You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/31 05:42:46 UTC

[camel] branch master updated: Added new Jdbc based IdempotentRepository which addresses the problem of orphan locks (#5278)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new c87b3f8  Added new Jdbc based IdempotentRepository which addresses the problem of orphan locks (#5278)
c87b3f8 is described below

commit c87b3f83d1808cf3503effbc5b4c8e08c63e22be
Author: Samrat Dhillon <sa...@gmail.com>
AuthorDate: Wed Mar 31 01:42:19 2021 -0400

    Added new Jdbc based IdempotentRepository which addresses the problem of orphan locks (#5278)
    
    * Added new Jdbc based IdempotentRepository which addresses the problem of orhpan lock that can be left behind by jvm crashes
    
    * making default table name protected so that it can be acessed by other implementations
    
    * Renaming the class. Using Camel ExecutorServiceManager to schedule the keep alive and added documentation
    
    Co-authored-by: Samrat Dhillon <sa...@innovapost.com>
---
 .../camel-sql/src/main/docs/sql-component.adoc     |  19 ++
 .../idempotent/jdbc/JdbcMessageIdRepository.java   |   3 +-
 .../JdbcOrphanLockAwareIdempotentRepository.java   | 283 +++++++++++++++++++++
 ...dbcOrphanLockAwareIdempotentRepositoryTest.java | 103 ++++++++
 .../sql/idempotentWithOrphanLockRemoval.sql        |  14 +
 5 files changed, 421 insertions(+), 1 deletion(-)

diff --git a/components/camel-sql/src/main/docs/sql-component.adoc b/components/camel-sql/src/main/docs/sql-component.adoc
index a42a90e..af5a795 100644
--- a/components/camel-sql/src/main/docs/sql-component.adoc
+++ b/components/camel-sql/src/main/docs/sql-component.adoc
@@ -574,6 +574,25 @@ the second one is the message id (`String`).
 The option `tableName` can be used to use the default SQL queries but with a different table name.
 However if you want to customize the SQL queries then you can configure each of them individually.
 
+=== Orphan Lock aware Jdbc IdempotentRepository 
+
+One of the limitations of `org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository` is that it does not handle orphan locks resulting from JVM crash or non graceful shutdown. This can result in unprocessed files/messages if this is implementation is used with camel-file, camel-ftp etc. if you need to address orphan locks processing then use
+`org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository`.  This repository keeps track of the locks held by an instance of the application. For each lock held, the application will send keep alive signals to the lock repository resulting in updating the createdAt column with the current Timestamp. When an application instance tries to acquire a lock if the, then there are three possibilities exist : 
+
+* lock entry does not exist then the lock is provided using the base implementation of `JdbcMessageIdRepository`. 
+
+* lock already exists and the createdAt < System.currentTimeMillis() - lockMaxAgeMillis. In this case it is assumed that an active instance has the lock and the lock is not provided to the new instance requesting the lock
+
+* lock already exists and the createdAt > = System.currentTimeMillis() - lockMaxAgeMillis. In this case it is assumed that there is no active instance which has the lock and the lock is provided to the requesting instance. The reason behind is that if the original instance which had the lock, if it was still running, it would have updated the Timestamp on createdAt using its keepAlive mechanism
+
+This repository has two additional configuration parameters 
+
+[cols="1,1"]
+|===
+|Parameter | Description
+|lockMaxAgeMillis | This refers to the duration after which the lock is considered orphaned i.e. if the currentTimestamp - createdAt >= lockMaxAgeMillis then lock is orphaned.
+|lockKeepAliveIntervalMillis | The frequency at which keep alive updates are done to createdAt Timestamp column.
+|===
 
 == Using the JDBC based aggregation repository
 
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
index a0e9db2..225a5a3 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
@@ -31,7 +31,7 @@ import org.springframework.transaction.support.TransactionTemplate;
  */
 public class JdbcMessageIdRepository extends AbstractJdbcMessageIdRepository {
 
-    private static final String DEFAULT_TABLENAME = "CAMEL_MESSAGEPROCESSED";
+    protected static final String DEFAULT_TABLENAME = "CAMEL_MESSAGEPROCESSED";
 
     private boolean createTableIfNotExists = true;
     private String tableName;
@@ -79,6 +79,7 @@ public class JdbcMessageIdRepository extends AbstractJdbcMessageIdRepository {
         super.doStart();
 
         transactionTemplate.execute(new TransactionCallback<Boolean>() {
+            @Override
             public Boolean doInTransaction(TransactionStatus status) {
                 try {
                     // we will receive an exception if the table doesn't exists or we cannot access it
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
new file mode 100644
index 0000000..a787e5a
--- /dev/null
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.jdbc;
+
+import java.sql.Timestamp;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.StampedLock;
+import java.util.stream.Collectors;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ShutdownableService;
+import org.apache.camel.spi.ExecutorServiceManager;
+import org.springframework.dao.DuplicateKeyException;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.support.TransactionTemplate;
+
+/**
+ * Implementation of {@link AbstractJdbcMessageIdRepository} which handles orphan locks resulting from jvm crash.
+ *
+ * When an instance of the application acquires a lock on the idempotent repository, the lock attributes are added to a
+ * HashSet. While the lock is help by the instance, the instance keeps updating the createdAt column with the current
+ * timestamp indicating the instance holding the lock is active.
+ *
+ * A lock is granted to an instance if either the entry for the lock attributes do not exists in the
+ * CAMEL_MESSAGEPROCESSED table or if in case the instance holding the lock has crashed. This is determined if the
+ * timestamp on the createdAt column is more than the lockMaxAge.
+ *
+ * *
+ */
+public class JdbcOrphanLockAwareIdempotentRepository extends JdbcMessageIdRepository implements ShutdownableService {
+
+    private final StampedLock sl = new StampedLock();
+
+    private final Set<ProcessorNameAndMessageId> processorNameMessageIdSet = new HashSet<>();
+
+    private ExecutorServiceManager executorServiceManager;
+
+    private ScheduledExecutorService executorService;
+
+    private CamelContext context;
+
+    /** Max age of read lock in milliseconds **/
+    private long lockMaxAgeMillis;
+
+    /** intervals after which keep alive is sent for the locks held by an instance **/
+    private long lockKeepAliveIntervalMillis;
+
+    private String updateTimestampQuery
+            = "UPDATE CAMEL_MESSAGEPROCESSED SET createdAt =? WHERE processorName =? AND messageId = ?";
+
+    public JdbcOrphanLockAwareIdempotentRepository(CamelContext camelContext) {
+        super();
+        this.context = camelContext;
+    }
+
+    public JdbcOrphanLockAwareIdempotentRepository(DataSource dataSource, String processorName, CamelContext camelContext) {
+        super(dataSource, processorName);
+        this.context = camelContext;
+    }
+
+    public JdbcOrphanLockAwareIdempotentRepository(DataSource dataSource, TransactionTemplate transactionTemplate,
+                                                   String processorName, CamelContext camelContext) {
+        super(dataSource, transactionTemplate, processorName);
+        this.context = camelContext;
+    }
+
+    public JdbcOrphanLockAwareIdempotentRepository(JdbcTemplate jdbcTemplate,
+                                                   TransactionTemplate transactionTemplate, CamelContext camelContext) {
+        super(jdbcTemplate, transactionTemplate);
+        this.context = camelContext;
+    }
+
+    @Override
+    protected int queryForInt(String key) {
+        /**
+         * If the update timestamp time is more than lockMaxAge then assume that the lock is orphan and the process
+         * which had acquired the lock has died
+         */
+        String orphanLockRecoverQueryString = getQueryString() + " AND createdAt >= ?";
+        Timestamp xMillisAgo = new Timestamp(System.currentTimeMillis() - lockMaxAgeMillis);
+        return jdbcTemplate.queryForObject(orphanLockRecoverQueryString, Integer.class, processorName, key,
+                xMillisAgo);
+    }
+
+    @Override
+    protected int delete(String key) {
+        long stamp = sl.writeLock();
+        try {
+            int result = super.delete(key);
+            processorNameMessageIdSet.remove(new ProcessorNameAndMessageId(processorName, key));
+            return result;
+        } finally {
+            sl.unlockWrite(stamp);
+        }
+
+    }
+
+    @Override
+    protected int insert(String key) {
+        Timestamp currentTimestamp = new Timestamp(System.currentTimeMillis());
+        long stamp = sl.writeLock();
+        try {
+            int result = jdbcTemplate.update(getInsertString(), processorName, key, currentTimestamp);
+            processorNameMessageIdSet.add(new ProcessorNameAndMessageId(processorName, key));
+            return result;
+        } catch (DuplicateKeyException e) {
+            //Update in case of orphan lock where a process dies without releasing exist lock
+            return jdbcTemplate.update(getUpdateTimestampQuery(), currentTimestamp, processorName, key);
+        } finally {
+            sl.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    protected void doInit() throws Exception {
+        if (lockMaxAgeMillis <= lockKeepAliveIntervalMillis) {
+            throw new IllegalStateException("value of lockMaxAgeMillis cannot be <= lockKeepAliveIntervalMillis");
+        }
+        Objects.requireNonNull(this.context, () -> "context cannot be null");
+
+        super.doInit();
+        if (getTableName() != null) {
+            updateTimestampQuery = updateTimestampQuery.replaceFirst(DEFAULT_TABLENAME, getTableName());
+        }
+        executorServiceManager = context.getExecutorServiceManager();
+        executorService = executorServiceManager.newSingleThreadScheduledExecutor(this, this.getClass().getName());
+        /**
+         * Schedule a task which will keep updating the timestamp on the acquired locks at lockKeepAliveInterval so that
+         * the timestamp does not reaches lockMaxAge
+         */
+        executorService.scheduleWithFixedDelay(new LockKeepAliveTask(), lockKeepAliveIntervalMillis,
+                lockKeepAliveIntervalMillis, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    protected int delete() {
+        long stamp = sl.writeLock();
+        try {
+            int result = super.delete();
+            processorNameMessageIdSet.clear();
+            return result;
+        } finally {
+            sl.unlockWrite(stamp);
+        }
+
+    }
+
+    void keepAlive() {
+        Timestamp currentTimestamp = new Timestamp(System.currentTimeMillis());
+        long stamp = sl.readLock();
+        try {
+            List<Object[]> args = processorNameMessageIdSet.stream()
+                    .map(processorNameMessageId -> new Object[] {
+                            currentTimestamp, processorNameMessageId.processorName, processorNameMessageId.messageId })
+                    .collect(Collectors.toList());
+            transactionTemplate.execute(status -> jdbcTemplate.batchUpdate(getUpdateTimestampQuery(), args));
+        } catch (Exception e) {
+            log.error("failed updating createdAt in keepAlive due to ", e);
+        } finally {
+            sl.unlockRead(stamp);
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        executorServiceManager.shutdownGraceful(executorService);
+    }
+
+    public Set<ProcessorNameAndMessageId> getProcessorNameMessageIdSet() {
+        return processorNameMessageIdSet;
+    }
+
+    public String getUpdateTimestampQuery() {
+        return updateTimestampQuery;
+    }
+
+    public void setUpdateTimestampQuery(String updateTimestampQuery) {
+        this.updateTimestampQuery = updateTimestampQuery;
+    }
+
+    public long getLockMaxAgeMillis() {
+        return lockMaxAgeMillis;
+    }
+
+    public void setLockMaxAgeMillis(long lockMaxAgeMillis) {
+        this.lockMaxAgeMillis = lockMaxAgeMillis;
+    }
+
+    public long getLockKeepAliveIntervalMillis() {
+        return lockKeepAliveIntervalMillis;
+    }
+
+    public void setLockKeepAliveIntervalMillis(long lockKeepAliveIntervalMillis) {
+        this.lockKeepAliveIntervalMillis = lockKeepAliveIntervalMillis;
+    }
+
+    class LockKeepAliveTask implements Runnable {
+
+        @Override
+        public void run() {
+            keepAlive();
+        }
+    }
+
+    static class ProcessorNameAndMessageId {
+        private final String processorName;
+        private final String messageId;
+
+        public ProcessorNameAndMessageId(String processorName, String messageId) {
+            this.processorName = processorName;
+            this.messageId = messageId;
+        }
+
+        public String getProcessorName() {
+            return processorName;
+        }
+
+        public String getMessageId() {
+            return messageId;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((messageId == null) ? 0 : messageId.hashCode());
+            result = prime * result + ((processorName == null) ? 0 : processorName.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            ProcessorNameAndMessageId other = (ProcessorNameAndMessageId) obj;
+            if (messageId == null) {
+                if (other.messageId != null) {
+                    return false;
+                }
+            } else if (!messageId.equals(other.messageId)) {
+                return false;
+            }
+            if (processorName == null) {
+                if (other.processorName != null) {
+                    return false;
+                }
+            } else if (!processorName.equals(other.processorName)) {
+                return false;
+            }
+            return true;
+        }
+    }
+
+}
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepositoryTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepositoryTest.java
new file mode 100644
index 0000000..5a9479e
--- /dev/null
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepositoryTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.jdbc;
+
+import java.sql.Timestamp;
+
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository.ProcessorNameAndMessageId;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@TestInstance(Lifecycle.PER_CLASS)
+public class JdbcOrphanLockAwareIdempotentRepositoryTest {
+
+    private static final String APP_NAME = "APP_1";
+
+    private EmbeddedDatabase dataSource;
+
+    private JdbcOrphanLockAwareIdempotentRepository jdbcMessageIdRepository;
+
+    @BeforeAll
+    public void setup() throws Exception {
+        dataSource = new EmbeddedDatabaseBuilder()
+                .setType(EmbeddedDatabaseType.HSQL)
+                .addScript("classpath:sql/idempotentWithOrphanLockRemoval.sql")
+                .generateUniqueName(true)
+                .build();
+        jdbcMessageIdRepository = new JdbcOrphanLockAwareIdempotentRepository(dataSource, APP_NAME, new DefaultCamelContext());
+        jdbcMessageIdRepository.setLockMaxAgeMillis(3000_00L);
+        jdbcMessageIdRepository.setLockKeepAliveIntervalMillis(3000L);
+        jdbcMessageIdRepository.doInit();
+    }
+
+    @Test
+    public void testLockNotGrantedForCurrentTimeStamp() {
+        assertTrue(jdbcMessageIdRepository.contains("FILE_1"));
+    }
+
+    @Test
+    public void testLockNotGrantedForCurrentTimeStampPlus2Min() {
+        assertTrue(jdbcMessageIdRepository.contains("FILE_2"));
+    }
+
+    @Test
+    public void testLockGrantedForCurrentTimeStampPlus5Min() {
+        assertFalse(jdbcMessageIdRepository.contains("FILE_3"));
+    }
+
+    @Test
+    public void testLockKeepAliveWorks() {
+        assertFalse(jdbcMessageIdRepository.contains("FILE_4"));
+        jdbcMessageIdRepository.insert("FILE_4");
+        assertTrue(jdbcMessageIdRepository.contains("FILE_4"));
+        JdbcTemplate template = new JdbcTemplate(dataSource);
+        Timestamp timestamp = new Timestamp(System.currentTimeMillis() - 5 * 60 * 1000L);
+        template.update("UPDATE CAMEL_MESSAGEPROCESSED SET createdAT = ? WHERE processorName = ? AND messageId = ?", timestamp,
+                APP_NAME, "FILE_4");
+        assertFalse(jdbcMessageIdRepository.contains("FILE_4"));
+        jdbcMessageIdRepository.keepAlive();
+        assertTrue(jdbcMessageIdRepository.contains("FILE_4"));
+    }
+
+    @Test
+    public void testInsertQueryDelete() {
+        assertFalse(jdbcMessageIdRepository.contains("FILE_5"));
+        assertFalse(jdbcMessageIdRepository.getProcessorNameMessageIdSet()
+                .contains(new ProcessorNameAndMessageId(APP_NAME, "FILE_5")));
+
+        jdbcMessageIdRepository.add("FILE_5");
+
+        assertTrue(jdbcMessageIdRepository.getProcessorNameMessageIdSet()
+                .contains(new ProcessorNameAndMessageId(APP_NAME, "FILE_5")));
+        assertTrue(jdbcMessageIdRepository.contains("FILE_5"));
+        jdbcMessageIdRepository.remove("FILE_5");
+        assertFalse(jdbcMessageIdRepository.contains("FILE_5"));
+        assertFalse(jdbcMessageIdRepository.getProcessorNameMessageIdSet()
+                .contains(new ProcessorNameAndMessageId(APP_NAME, "FILE_5")));
+    }
+
+}
diff --git a/components/camel-sql/src/test/resources/sql/idempotentWithOrphanLockRemoval.sql b/components/camel-sql/src/test/resources/sql/idempotentWithOrphanLockRemoval.sql
new file mode 100644
index 0000000..ce108eb
--- /dev/null
+++ b/components/camel-sql/src/test/resources/sql/idempotentWithOrphanLockRemoval.sql
@@ -0,0 +1,14 @@
+-- Add DDL to create tables, views, indexes, etc needed by tests. These should match the expected database structure as it will appear in production.
+SET DATABASE SQL SYNTAX PGS TRUE; -- tells HSQLDB that this schema uses MYSQL syntax
+SET PROPERTY "sql.enforce_strict_size" FALSE;
+
+CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP);
+
+ALTER TABLE CAMEL_MESSAGEPROCESSED ADD PRIMARY KEY (processorName, messageId);
+
+
+INSERT INTO CAMEL_MESSAGEPROCESSED VALUES ('APP_1', 'FILE_1', CURRENT_TIMESTAMP);
+
+INSERT INTO CAMEL_MESSAGEPROCESSED VALUES ('APP_1', 'FILE_2',TIMESTAMPADD(SQL_TSI_MINUTE, -2, CURRENT_TIMESTAMP));
+
+INSERT INTO CAMEL_MESSAGEPROCESSED VALUES ('APP_1', 'FILE_3',TIMESTAMPADD(SQL_TSI_MINUTE, -5, CURRENT_TIMESTAMP));