Posted to commits@camel.apache.org by ac...@apache.org on 2021/03/31 06:30:56 UTC

[camel] branch master created (now c9649c9)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


      at c9649c9  Fixed CS

This branch includes the following new commits:

     new 5c4b7e8  Polished
     new 0ffee7c  Camel-AWS2-Lambda: Producer operations refactoring - deleteEventSourceMapping
     new c9649c9  Fixed CS

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[camel] 03/03: Fixed CS

Posted by ac...@apache.org.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c9649c90d66b5c3dff66ab1a989ace2e746f28f9
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Mar 31 08:22:22 2021 +0200

    Fixed CS
---
 .../camel/component/aws2/lambda/Lambda2Producer.java | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/components/camel-aws/camel-aws2-lambda/src/main/java/org/apache/camel/component/aws2/lambda/Lambda2Producer.java b/components/camel-aws/camel-aws2-lambda/src/main/java/org/apache/camel/component/aws2/lambda/Lambda2Producer.java
index 506d6e4..5caae82 100644
--- a/components/camel-aws/camel-aws2-lambda/src/main/java/org/apache/camel/component/aws2/lambda/Lambda2Producer.java
+++ b/components/camel-aws/camel-aws2-lambda/src/main/java/org/apache/camel/component/aws2/lambda/Lambda2Producer.java
@@ -433,20 +433,20 @@ public class Lambda2Producer extends DefaultProducer {
     }
 
     private void deleteEventSourceMapping(LambdaClient lambdaClient, Exchange exchange) throws InvalidPayloadException {
-    	DeleteEventSourceMappingRequest request = null;
-    	DeleteEventSourceMappingResponse result;
+        DeleteEventSourceMappingRequest request = null;
+        DeleteEventSourceMappingResponse result;
         if (getConfiguration().isPojoRequest()) {
             request = exchange.getIn().getMandatoryBody(DeleteEventSourceMappingRequest.class);
         } else {
-                DeleteEventSourceMappingRequest.Builder builder = DeleteEventSourceMappingRequest.builder();
-                if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(Lambda2Constants.EVENT_SOURCE_UUID))) {
-                	builder.uuid(exchange.getIn().getHeader(Lambda2Constants.EVENT_SOURCE_UUID, String.class));
-                } else {
-                    throw new IllegalArgumentException("Event Source Arn must be specified");
-                }
+            DeleteEventSourceMappingRequest.Builder builder = DeleteEventSourceMappingRequest.builder();
+            if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(Lambda2Constants.EVENT_SOURCE_UUID))) {
+                builder.uuid(exchange.getIn().getHeader(Lambda2Constants.EVENT_SOURCE_UUID, String.class));
+            } else {
+                throw new IllegalArgumentException("Event Source Arn must be specified");
+            }
             request = builder.build();
-                
-            try {    
+
+            try {
                 result = lambdaClient.deleteEventSourceMapping(request);
             } catch (AwsServiceException ase) {
                 LOG.trace("deleteEventSourceMapping command returned the error code {}", ase.awsErrorDetails().errorCode());

[camel] 02/03: Camel-AWS2-Lambda: Producer operations refactoring - deleteEventSourceMapping

Posted by ac...@apache.org.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0ffee7cf9af5029c6f68b3cafdf24202f9746633
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Mar 31 08:19:02 2021 +0200

    Camel-AWS2-Lambda: Producer operations refactoring - deleteEventSourceMapping
---
 .../component/aws2/lambda/Lambda2Producer.java     | 26 ++++++++--------------
 1 file changed, 9 insertions(+), 17 deletions(-)

diff --git a/components/camel-aws/camel-aws2-lambda/src/main/java/org/apache/camel/component/aws2/lambda/Lambda2Producer.java b/components/camel-aws/camel-aws2-lambda/src/main/java/org/apache/camel/component/aws2/lambda/Lambda2Producer.java
index 58edc7d..506d6e4 100644
--- a/components/camel-aws/camel-aws2-lambda/src/main/java/org/apache/camel/component/aws2/lambda/Lambda2Producer.java
+++ b/components/camel-aws/camel-aws2-lambda/src/main/java/org/apache/camel/component/aws2/lambda/Lambda2Producer.java
@@ -433,29 +433,21 @@ public class Lambda2Producer extends DefaultProducer {
     }
 
     private void deleteEventSourceMapping(LambdaClient lambdaClient, Exchange exchange) throws InvalidPayloadException {
+    	DeleteEventSourceMappingRequest request = null;
+    	DeleteEventSourceMappingResponse result;
         if (getConfiguration().isPojoRequest()) {
-            Object payload = exchange.getIn().getMandatoryBody();
-            if (payload instanceof DeleteEventSourceMappingRequest) {
-                DeleteEventSourceMappingResponse result;
-                try {
-                    result = lambdaClient.deleteEventSourceMapping((DeleteEventSourceMappingRequest) payload);
-                } catch (AwsServiceException ase) {
-                    LOG.trace("deleteEventSourceMapping command returned the error code {}", ase.awsErrorDetails().errorCode());
-                    throw ase;
-                }
-                Message message = getMessageForResponse(exchange);
-                message.setBody(result);
-            }
+            request = exchange.getIn().getMandatoryBody(DeleteEventSourceMappingRequest.class);
         } else {
-            DeleteEventSourceMappingResponse result;
-            try {
-                DeleteEventSourceMappingRequest.Builder request = DeleteEventSourceMappingRequest.builder();
+                DeleteEventSourceMappingRequest.Builder builder = DeleteEventSourceMappingRequest.builder();
                 if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(Lambda2Constants.EVENT_SOURCE_UUID))) {
-                    request.uuid(exchange.getIn().getHeader(Lambda2Constants.EVENT_SOURCE_UUID, String.class));
+                	builder.uuid(exchange.getIn().getHeader(Lambda2Constants.EVENT_SOURCE_UUID, String.class));
                 } else {
                     throw new IllegalArgumentException("Event Source Arn must be specified");
                 }
-                result = lambdaClient.deleteEventSourceMapping(request.build());
+            request = builder.build();
+                
+            try {    
+                result = lambdaClient.deleteEventSourceMapping(request);
             } catch (AwsServiceException ase) {
                 LOG.trace("deleteEventSourceMapping command returned the error code {}", ase.awsErrorDetails().errorCode());
                 throw ase;

[camel] 01/03: Polished

Posted by ac...@apache.org.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5c4b7e8bf2e5676c3bda5e1aca3b8a7d07371fae
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Mar 31 07:46:02 2021 +0200

    Polished
---
 .../apache/camel/catalog/docs/sql-component.adoc   | 19 ++++++++++++++++++
 .../JdbcOrphanLockAwareIdempotentRepository.java   | 23 +++++++++++-----------
 .../modules/ROOT/pages/sql-component.adoc          | 19 ++++++++++++++++++
 3 files changed, 49 insertions(+), 12 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc
index a42a90e..af5a795 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc
@@ -574,6 +574,25 @@ the second one is the message id (`String`).
 The option `tableName` can be used to use the default SQL queries but with a different table name.
 However if you want to customize the SQL queries then you can configure each of them individually.
 
+=== Orphan Lock aware Jdbc IdempotentRepository
+
+One of the limitations of `org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository` is that it does not handle orphan locks resulting from a JVM crash or a non-graceful shutdown. This can result in unprocessed files/messages if this implementation is used with camel-file, camel-ftp, etc. If you need to handle orphan locks, use
+`org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository`. This repository keeps track of the locks held by an instance of the application. For each lock held, the application sends keep-alive signals to the lock repository, which update the createdAt column with the current timestamp. When an application instance tries to acquire a lock, there are three possibilities:
+
+* The lock entry does not exist. The lock is then provided using the base implementation of `JdbcMessageIdRepository`.
+
+* The lock already exists and createdAt > System.currentTimeMillis() - lockMaxAgeMillis. In this case it is assumed that an active instance holds the lock, and the lock is not provided to the requesting instance.
+
+* The lock already exists and createdAt <= System.currentTimeMillis() - lockMaxAgeMillis. In this case it is assumed that no active instance holds the lock, and the lock is provided to the requesting instance. The reasoning is that if the original holder were still running, it would have refreshed the createdAt timestamp through its keep-alive mechanism.
+
+This repository has two additional configuration parameters:
+
+[cols="1,1"]
+|===
+|Parameter | Description
+|lockMaxAgeMillis | The duration after which a lock is considered orphaned, i.e. if currentTimestamp - createdAt >= lockMaxAgeMillis then the lock is orphaned.
+|lockKeepAliveIntervalMillis | The interval at which keep-alive updates are made to the createdAt timestamp column.
+|===
 
 == Using the JDBC based aggregation repository
 
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
index a787e5a..a84cbff 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
@@ -45,8 +45,6 @@ import org.springframework.transaction.support.TransactionTemplate;
  * A lock is granted to an instance if either the entry for the lock attributes do not exists in the
  * CAMEL_MESSAGEPROCESSED table or if in case the instance holding the lock has crashed. This is determined if the
  * timestamp on the createdAt column is more than the lockMaxAge.
- *
- * *
  */
 public class JdbcOrphanLockAwareIdempotentRepository extends JdbcMessageIdRepository implements ShutdownableService {
 
@@ -144,16 +142,22 @@ public class JdbcOrphanLockAwareIdempotentRepository extends JdbcMessageIdReposi
             updateTimestampQuery = updateTimestampQuery.replaceFirst(DEFAULT_TABLENAME, getTableName());
         }
         executorServiceManager = context.getExecutorServiceManager();
-        executorService = executorServiceManager.newSingleThreadScheduledExecutor(this, this.getClass().getName());
-        /**
-         * Schedule a task which will keep updating the timestamp on the acquired locks at lockKeepAliveInterval so that
-         * the timestamp does not reaches lockMaxAge
-         */
+        executorService = executorServiceManager.newSingleThreadScheduledExecutor(this, this.getClass().getSimpleName());
+
+        // Schedule a task which keeps updating the timestamp on the acquired locks every lockKeepAliveInterval so that
+        // the timestamp does not reach lockMaxAge
         executorService.scheduleWithFixedDelay(new LockKeepAliveTask(), lockKeepAliveIntervalMillis,
                 lockKeepAliveIntervalMillis, TimeUnit.MILLISECONDS);
     }
 
     @Override
+    protected void doShutdown() throws Exception {
+        if (executorServiceManager != null && executorService != null) {
+            executorServiceManager.shutdownGraceful(executorService);
+        }
+    }
+
+    @Override
     protected int delete() {
         long stamp = sl.writeLock();
         try {
@@ -182,11 +186,6 @@ public class JdbcOrphanLockAwareIdempotentRepository extends JdbcMessageIdReposi
         }
     }
 
-    @Override
-    public void shutdown() {
-        executorServiceManager.shutdownGraceful(executorService);
-    }
-
     public Set<ProcessorNameAndMessageId> getProcessorNameMessageIdSet() {
         return processorNameMessageIdSet;
     }
diff --git a/docs/components/modules/ROOT/pages/sql-component.adoc b/docs/components/modules/ROOT/pages/sql-component.adoc
index 9a187de..9263b64 100644
--- a/docs/components/modules/ROOT/pages/sql-component.adoc
+++ b/docs/components/modules/ROOT/pages/sql-component.adoc
@@ -576,6 +576,25 @@ the second one is the message id (`String`).
 The option `tableName` can be used to use the default SQL queries but with a different table name.
 However if you want to customize the SQL queries then you can configure each of them individually.
 
+=== Orphan Lock aware Jdbc IdempotentRepository
+
+One of the limitations of `org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository` is that it does not handle orphan locks resulting from a JVM crash or a non-graceful shutdown. This can result in unprocessed files/messages if this implementation is used with camel-file, camel-ftp, etc. If you need to handle orphan locks, use
+`org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository`. This repository keeps track of the locks held by an instance of the application. For each lock held, the application sends keep-alive signals to the lock repository, which update the createdAt column with the current timestamp. When an application instance tries to acquire a lock, there are three possibilities:
+
+* The lock entry does not exist. The lock is then provided using the base implementation of `JdbcMessageIdRepository`.
+
+* The lock already exists and createdAt > System.currentTimeMillis() - lockMaxAgeMillis. In this case it is assumed that an active instance holds the lock, and the lock is not provided to the requesting instance.
+
+* The lock already exists and createdAt <= System.currentTimeMillis() - lockMaxAgeMillis. In this case it is assumed that no active instance holds the lock, and the lock is provided to the requesting instance. The reasoning is that if the original holder were still running, it would have refreshed the createdAt timestamp through its keep-alive mechanism.
+
+This repository has two additional configuration parameters:
+
+[cols="1,1"]
+|===
+|Parameter | Description
+|lockMaxAgeMillis | The duration after which a lock is considered orphaned, i.e. if currentTimestamp - createdAt >= lockMaxAgeMillis then the lock is orphaned.
+|lockKeepAliveIntervalMillis | The interval at which keep-alive updates are made to the createdAt timestamp column.
+|===
 
 == Using the JDBC based aggregation repository
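
Note on the sql-component.adoc change above: the lock-acquisition cases it describes can be sketched in plain Java. This is only an illustration, not the code of JdbcOrphanLockAwareIdempotentRepository. It follows the rule stated in the parameter table (a lock is orphaned when currentTimestamp - createdAt >= lockMaxAgeMillis). The class name OrphanLockSketch, the Decision enum and the decide method are hypothetical names introduced for this example, and the real repository makes this decision through SQL queries against the CAMEL_MESSAGEPROCESSED table rather than in memory.

```java
import java.util.OptionalLong;

// Hypothetical illustration of the orphan-lock rule; not part of Apache Camel.
public class OrphanLockSketch {

    /** Possible outcomes when an instance asks for a lock. */
    enum Decision { GRANT_NEW, DENY_ACTIVE, GRANT_ORPHANED }

    /**
     * Decides what to do with a lock request.
     *
     * @param createdAtMillis  createdAt of the existing lock row, or empty if no row exists
     * @param nowMillis        current time, e.g. System.currentTimeMillis()
     * @param lockMaxAgeMillis age at which a lock counts as orphaned
     */
    static Decision decide(OptionalLong createdAtMillis, long nowMillis, long lockMaxAgeMillis) {
        if (!createdAtMillis.isPresent()) {
            // Case 1: no lock entry exists, so the lock can be granted.
            return Decision.GRANT_NEW;
        }
        long age = nowMillis - createdAtMillis.getAsLong();
        if (age >= lockMaxAgeMillis) {
            // Case 3: the holder has not refreshed createdAt in time, so it is
            // assumed to have crashed and the lock is handed over.
            return Decision.GRANT_ORPHANED;
        }
        // Case 2: createdAt is recent, so an active instance is assumed to hold the lock.
        return Decision.DENY_ACTIVE;
    }

    public static void main(String[] args) {
        long now = 100_000L;
        long maxAge = 30_000L;
        System.out.println(decide(OptionalLong.empty(), now, maxAge));            // GRANT_NEW
        System.out.println(decide(OptionalLong.of(now - 5_000L), now, maxAge));   // DENY_ACTIVE
        System.out.println(decide(OptionalLong.of(now - 45_000L), now, maxAge));  // GRANT_ORPHANED
    }
}
```

For the rule to work, lockKeepAliveIntervalMillis presumably has to be comfortably smaller than lockMaxAgeMillis; otherwise a healthy holder could look orphaned between two keep-alive updates.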