You are viewing a plain text version of this content. The canonical link is not included in this plain-text copy.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/30 07:00:31 UTC

[camel-kafka-connector] branch error-handling created (now 55de3c4)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch error-handling
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


      at 55de3c4  Fixed CS

This branch includes the following new commits:

     new 9fa60d1  First attempt Camel Error Handling level
     new 55de3c4  Fixed CS

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector] 02/02: Fixed CS

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch error-handling
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 55de3c47c17eb640354183050f586fb29358091d
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Oct 30 08:00:08 2020 +0100

    Fixed CS
---
 .../utils/CamelKafkaConnectMain.java               | 26 +++++++++++-----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 3dbf045..7a43f0f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -126,8 +126,8 @@ public class CamelKafkaConnectMain extends SimpleMain {
         }
         
         public Builder withErrorHandler(String errorHandler) {
-        	this.errorHandler = errorHandler;
-        	return this;
+            this.errorHandler = errorHandler;
+            return this;
         }
         
         public Builder withMaxRedeliveries(int maxRedeliveries) {
@@ -158,17 +158,17 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     LOG.info("Creating Camel route from({})", from);
                     
                     if (!ObjectHelper.isEmpty(props.get(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF))) {
-                    	String errorHandler = props.get(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
-                    	switch (errorHandler) {
-						case "no":
-							rd.errorHandler(noErrorHandler());
-							break;
-						case "default":
-							rd.errorHandler(defaultErrorHandler().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
-							break;
-						default:
-							break;
-						}
+                        String errorHandler = props.get(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
+                        switch (errorHandler) {
+                            case "no":
+                                rd.errorHandler(noErrorHandler());
+                                break;
+                            case "default":
+                                rd.errorHandler(defaultErrorHandler().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
+                                break;
+                            default:
+                                break;
+                        }
                     }
 
                     //dataformats


[camel-kafka-connector] 01/02: First attempt Camel Error Handling level

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch error-handling
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 9fa60d144d047cd3bcb04c336c736dbc98ef6263
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Oct 29 22:59:07 2020 +0100

    First attempt Camel Error Handling level
---
 .../camel/kafkaconnector/CamelConnectorConfig.java | 12 ++++++++
 .../kafkaconnector/CamelSinkConnectorConfig.java   |  5 +++-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  7 ++++-
 .../kafkaconnector/CamelSourceConnectorConfig.java |  5 +++-
 .../camel/kafkaconnector/CamelSourceTask.java      |  8 +++++-
 .../utils/CamelKafkaConnectMain.java               | 32 ++++++++++++++++++++++
 6 files changed, 65 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
index ae67309..4271938 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -35,6 +35,18 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
     public static final String CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF = "camel.aggregation.timeout";
     public static final String CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC = "The timeout of the aggregation, to be used in combination with camel.beans.aggregate";
 
+    public static final String CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT = "default";
+    public static final String CAMEL_CONNECTOR_ERROR_HANDLER_CONF = "camel.error.handler";
+    public static final String CAMEL_CONNECTOR_ERROR_HANDLER_DOC = "The error handler to use: possible value are 'no' or 'default'";
+    
+    public static final int CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT = 0;
+    public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF = "camel.error.handler.max.redeliveries";
+    public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC = "The maximum redeliveries to be use in case of Default Error Handler";
+    
+    public static final Long CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT = 1000L;
+    public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF = "camel.error.handler.redelivery.delay";
+    public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC = "The initial redelivery delay in milliseconds in case of Default Error Handler";
+    
     protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
         super(definition, originals, configProviderProps, doLog);
     }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index 980aad1..e86e921 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -54,7 +54,10 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SINK_CONTENT_LOG_LEVEL_DOC)
         .define(CAMEL_CONNECTOR_AGGREGATE_CONF, Type.STRING, CAMEL_CONNECTOR_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_DOC)
         .define(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC)
-        .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC);
+        .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC)
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC)
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC)
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC);
 
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 08967cc..a38afea 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -82,7 +82,9 @@ public class CamelSinkTask extends SinkTask {
             final String unmarshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_UNMARSHAL_CONF);
             final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF);
             final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF);
-
+            final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF);
+            final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF);
+            final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
                 remoteUrl = TaskHelper.buildUrl(camelContext,
@@ -98,6 +100,9 @@ public class CamelSinkTask extends SinkTask {
                 .withMarshallDataFormat(marshaller)
                 .withAggregationSize(size)
                 .withAggregationTimeout(timeout)
+                .withErrorHandler(errorHandler)
+                .withMaxRedeliveries(maxRedeliveries)
+                .withRedeliveryDelay(redeliveryDelay)
                 .build(camelContext);
 
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index bdfc5c7..2ecfb45 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -90,7 +90,10 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC)
         .define(CAMEL_CONNECTOR_AGGREGATE_CONF, Type.STRING, CAMEL_CONNECTOR_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_DOC)
         .define(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC)
-        .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC);
+        .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC)
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC)
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC)
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC);
 
     public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index aef600f..165ffff 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -88,7 +88,10 @@ public class CamelSourceTask extends SourceTask {
             final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF);
             final int size = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF);
             final long timeout = config.getLong(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF);
-
+            final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF);
+            final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF);
+            final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
+            
             topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
             String localUrl = getLocalUrlWithPollingOptions(config);
@@ -107,6 +110,9 @@ public class CamelSourceTask extends SourceTask {
                 .withMarshallDataFormat(marshaller)
                 .withAggregationSize(size)
                 .withAggregationTimeout(timeout)
+                .withErrorHandler(errorHandler)
+                .withMaxRedeliveries(maxRedeliveries)
+                .withRedeliveryDelay(redeliveryDelay)
                 .build(camelContext);
 
             consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 36ec56a..3dbf045 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -91,6 +91,9 @@ public class CamelKafkaConnectMain extends SimpleMain {
         private String unmarshallDataFormat;
         private int aggregationSize;
         private long aggregationTimeout;
+        private String errorHandler;
+        private int maxRedeliveries;
+        private long redeliveryDelay;
 
         public Builder(String from, String to) {
             this.from = from;
@@ -121,6 +124,21 @@ public class CamelKafkaConnectMain extends SimpleMain {
             this.aggregationTimeout = aggregationTimeout;
             return this;
         }
+        
+        public Builder withErrorHandler(String errorHandler) {
+        	this.errorHandler = errorHandler;
+        	return this;
+        }
+        
+        public Builder withMaxRedeliveries(int maxRedeliveries) {
+            this.maxRedeliveries = maxRedeliveries;
+            return this;
+        }
+        
+        public Builder withRedeliveryDelay(long redeliveryDelay) {
+            this.redeliveryDelay = redeliveryDelay;
+            return this;
+        }
 
         public CamelKafkaConnectMain build(CamelContext camelContext) {
             CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
@@ -138,6 +156,20 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     //from
                     RouteDefinition rd = from(from);
                     LOG.info("Creating Camel route from({})", from);
+                    
+                    if (!ObjectHelper.isEmpty(props.get(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF))) {
+                    	String errorHandler = props.get(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
+                    	switch (errorHandler) {
+						case "no":
+							rd.errorHandler(noErrorHandler());
+							break;
+						case "default":
+							rd.errorHandler(defaultErrorHandler().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
+							break;
+						default:
+							break;
+						}
+                    }
 
                     //dataformats
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {