You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/06/10 14:25:19 UTC

[camel] branch main updated (4a0cc8c18f2 -> 406b4c14d62)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from 4a0cc8c18f2 CAMEL-18127: added adapter auto-configuration for CouchDb
     new cf6c97a7a30 CAMEL-18127: fixed incorrect value for CASSANDRA_RESUME_ACTION constant
     new 73b32e9325e CAMEL-18127: reuse resume actions across multiple components
     new f13c75dd416 CAMEL-18127: make part of the resume logic reusable
     new 406b4c14d62 CAMEL-18127: use a common resume adapter when possible

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/camel/catalog/components/cql.json   |  2 +-
 .../org/apache/camel/component/cassandra/cql.json  |  2 +-
 .../component/cassandra/CassandraConstants.java    |  2 +-
 .../component/cassandra/CassandraConsumer.java     | 18 +----
 .../support/DefaultCassandraResumeAdapter.java     | 71 ----------------
 .../org/apache/camel/resume/adapter.properties     |  2 +-
 .../CassandraComponentResumeStrategyIT.java        |  4 +-
 .../component/couchbase/CouchbaseConsumer.java     | 20 +----
 .../couchbase/CouchbaseResumeAdapter.java          | 34 --------
 .../org/apache/camel/resume/adapter.properties     |  2 +-
 .../integration/ConsumeResumeStrategyIT.java       |  4 +-
 .../camel/component/couchdb/CouchDbConsumer.java   | 18 +----
 .../couchdb/consumer/CouchDbResumable.java         | 61 --------------
 .../couchdb/consumer/CouchDbResumeAdapter.java     | 33 --------
 .../consumer/DefaultCouchDbResumeAdapter.java      | 71 ----------------
 .../org/apache/camel/resume/adapter.properties     |  2 +-
 .../org/apache/camel/resume/ResumeActionAware.java | 17 ++--
 .../support/resume/ResumeActionAwareAdapter.java   | 14 +++-
 .../camel/support/resume/ResumeStrategyHelper.java | 94 ++++++++++++++++++++++
 19 files changed, 131 insertions(+), 340 deletions(-)
 delete mode 100644 components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java
 delete mode 100644 components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeAdapter.java
 delete mode 100644 components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
 delete mode 100644 components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
 delete mode 100644 components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/DefaultCouchDbResumeAdapter.java
 rename components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java => core/camel-api/src/main/java/org/apache/camel/resume/ResumeActionAware.java (61%)
 rename components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/consumer/support/DefaultCouchbaseResumeAdapter.java => core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java (78%)
 create mode 100644 core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeStrategyHelper.java


[camel] 04/04: CAMEL-18127: use a common resume adapter when possible

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 406b4c14d6233f24ed4d797fd0197ab2dcbcdd59
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jun 10 14:58:53 2022 +0200

    CAMEL-18127: use a common resume adapter when possible
---
 .../org/apache/camel/resume/adapter.properties     |  2 +-
 .../support/DefaultCouchbaseResumeAdapter.java     | 72 ----------------------
 .../org/apache/camel/resume/adapter.properties     |  2 +-
 .../consumer/DefaultCouchDbResumeAdapter.java      | 72 ----------------------
 .../org/apache/camel/resume/adapter.properties     |  2 +-
 .../support/resume/ResumeActionAwareAdapter.java   | 10 ++-
 6 files changed, 11 insertions(+), 149 deletions(-)

diff --git a/components/camel-cassandraql/src/main/resources/org/apache/camel/resume/adapter.properties b/components/camel-cassandraql/src/main/resources/org/apache/camel/resume/adapter.properties
index 59898d3e1ac..336c53d8298 100644
--- a/components/camel-cassandraql/src/main/resources/org/apache/camel/resume/adapter.properties
+++ b/components/camel-cassandraql/src/main/resources/org/apache/camel/resume/adapter.properties
@@ -16,4 +16,4 @@
 ## ---------------------------------------------------------------------------
 
 
-adapterClass=org.apache.camel.component.cassandra.consumer.support.DefaultCassandraResumeAdapter
+adapterClass=org.apache.camel.support.resume.ResumeActionAwareAdapter
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/consumer/support/DefaultCouchbaseResumeAdapter.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/consumer/support/DefaultCouchbaseResumeAdapter.java
deleted file mode 100644
index 8162bf5710d..00000000000
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/consumer/support/DefaultCouchbaseResumeAdapter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.couchbase.consumer.support;
-
-import java.nio.ByteBuffer;
-
-import org.apache.camel.resume.Cacheable;
-import org.apache.camel.resume.Deserializable;
-import org.apache.camel.resume.Offset;
-import org.apache.camel.resume.OffsetKey;
-import org.apache.camel.resume.ResumeAction;
-import org.apache.camel.resume.ResumeActionAware;
-import org.apache.camel.resume.cache.ResumeCache;
-
-public class DefaultCouchbaseResumeAdapter implements ResumeActionAware, Cacheable, Deserializable {
-    private ResumeAction resumeAction;
-    private ResumeCache<Object> cache;
-
-    @Override
-    public void setResumeAction(ResumeAction resumeAction) {
-        this.resumeAction = resumeAction;
-    }
-
-    @Override
-    public void resume() {
-        cache.forEach(resumeAction::evalEntry);
-    }
-
-    private boolean add(Object key, Object offset) {
-        cache.add(key, offset);
-
-        return true;
-    }
-
-    @Override
-    public boolean add(OffsetKey<?> key, Offset<?> offset) {
-        return add(key.getValue(), offset.getValue());
-    }
-
-    @Override
-    public void setCache(ResumeCache<?> cache) {
-        this.cache = (ResumeCache<Object>) cache;
-    }
-
-    @Override
-    public ResumeCache<?> getCache() {
-        return cache;
-    }
-
-    @Override
-    public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
-        Object key = deserializeObject(keyBuffer);
-        Object value = deserializeObject(valueBuffer);
-
-        return add(key, value);
-    }
-}
diff --git a/components/camel-couchbase/src/main/resources/org/apache/camel/resume/adapter.properties b/components/camel-couchbase/src/main/resources/org/apache/camel/resume/adapter.properties
index 5a7ec013b1e..336c53d8298 100644
--- a/components/camel-couchbase/src/main/resources/org/apache/camel/resume/adapter.properties
+++ b/components/camel-couchbase/src/main/resources/org/apache/camel/resume/adapter.properties
@@ -16,4 +16,4 @@
 ## ---------------------------------------------------------------------------
 
 
-adapterClass=org.apache.camel.component.couchbase.consumer.support.DefaultCouchbaseResumeAdapter
+adapterClass=org.apache.camel.support.resume.ResumeActionAwareAdapter
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/DefaultCouchDbResumeAdapter.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/DefaultCouchDbResumeAdapter.java
deleted file mode 100644
index 4b7c0550c88..00000000000
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/DefaultCouchDbResumeAdapter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.couchdb.consumer;
-
-import java.nio.ByteBuffer;
-
-import org.apache.camel.resume.Cacheable;
-import org.apache.camel.resume.Deserializable;
-import org.apache.camel.resume.Offset;
-import org.apache.camel.resume.OffsetKey;
-import org.apache.camel.resume.ResumeAction;
-import org.apache.camel.resume.ResumeActionAware;
-import org.apache.camel.resume.cache.ResumeCache;
-
-public class DefaultCouchDbResumeAdapter implements ResumeActionAware, Cacheable, Deserializable {
-    private ResumeCache<Object> cache;
-    private ResumeAction resumeAction;
-
-    @Override
-    public void setResumeAction(ResumeAction resumeAction) {
-        this.resumeAction = resumeAction;
-    }
-
-    @Override
-    public void resume() {
-        cache.forEach(resumeAction::evalEntry);
-    }
-
-    private boolean add(Object key, Object offset) {
-        cache.add(key, offset);
-
-        return true;
-    }
-
-    @Override
-    public boolean add(OffsetKey<?> key, Offset<?> offset) {
-        return add(key.getValue(), offset.getValue());
-    }
-
-    @Override
-    public void setCache(ResumeCache<?> cache) {
-        this.cache = (ResumeCache<Object>) cache;
-    }
-
-    @Override
-    public ResumeCache<?> getCache() {
-        return cache;
-    }
-
-    @Override
-    public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
-        Object key = deserializeObject(keyBuffer);
-        Object value = deserializeObject(valueBuffer);
-
-        return add(key, value);
-    }
-}
diff --git a/components/camel-couchdb/src/main/resources/org/apache/camel/resume/adapter.properties b/components/camel-couchdb/src/main/resources/org/apache/camel/resume/adapter.properties
index c8d679476bb..80310b84de3 100644
--- a/components/camel-couchdb/src/main/resources/org/apache/camel/resume/adapter.properties
+++ b/components/camel-couchdb/src/main/resources/org/apache/camel/resume/adapter.properties
@@ -15,4 +15,4 @@
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
 
-org.apache.camel.component.couchdb.consumer.DefaultCouchDbResumeAdapter
+adapterClass=org.apache.camel.support.resume.ResumeActionAwareAdapter
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java b/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java
similarity index 80%
rename from components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java
rename to core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java
index 3115aeec590..0bc07685ed7 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.cassandra.consumer.support;
+package org.apache.camel.support.resume;
 
 import java.nio.ByteBuffer;
 
@@ -27,7 +27,13 @@ import org.apache.camel.resume.ResumeAction;
 import org.apache.camel.resume.ResumeActionAware;
 import org.apache.camel.resume.cache.ResumeCache;
 
-public class DefaultCassandraResumeAdapter implements ResumeActionAware, Cacheable, Deserializable {
+/**
+ * A simple resume adapter that supports caching, deserialization and actions. This is usually suitable for supporting
+ * resume operations that have simple cache storage requirements, but delegate the resume action to the integrations
+ * (e.g., when resuming from database components, where the resume operation can only be determined by the
+ * integration itself).
+ */
+public class ResumeActionAwareAdapter implements ResumeActionAware, Cacheable, Deserializable {
     private ResumeCache<Object> cache;
     private ResumeAction resumeAction;
 


[camel] 03/04: CAMEL-18127: make part of the resume logic reusable

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f13c75dd416edfffffee0032bc03cd9debafd002
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jun 10 14:49:11 2022 +0200

    CAMEL-18127: make part of the resume logic reusable
---
 .../component/cassandra/CassandraConsumer.java     | 21 +----
 .../component/couchbase/CouchbaseConsumer.java     | 24 +-----
 .../camel/component/couchdb/CouchDbConsumer.java   | 21 +----
 .../camel/support/resume/ResumeStrategyHelper.java | 94 ++++++++++++++++++++++
 4 files changed, 100 insertions(+), 60 deletions(-)

diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
index 0f87544b632..e8216140ce4 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
@@ -22,13 +22,10 @@ import com.datastax.oss.driver.api.core.cql.ResultSet;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.resume.ResumeAction;
-import org.apache.camel.resume.ResumeActionAware;
-import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.ScheduledPollConsumer;
-import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.support.resume.ResumeStrategyHelper;
 
 import static org.apache.camel.component.cassandra.CassandraConstants.CASSANDRA_RESUME_ACTION;
 
@@ -88,21 +85,7 @@ public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAw
             preparedStatement = getEndpoint().prepareStatement();
         }
 
-        if (resumeStrategy != null) {
-            resumeStrategy.loadCache();
-
-            ResumeAdapter resumeAdapter = resumeStrategy.getAdapter(ResumeAdapter.class);
-            if (resumeAdapter != null) {
-                if (resumeAdapter instanceof ResumeActionAware) {
-                    ResumeAction action = (ResumeAction) getEndpoint().getCamelContext().getRegistry()
-                            .lookupByName(CASSANDRA_RESUME_ACTION);
-                    ObjectHelper.notNull(action, "The resume action cannot be null", this);
-
-                    ((ResumeActionAware) resumeAdapter).setResumeAction(action);
-                }
-                resumeAdapter.resume();
-            }
-        }
+        ResumeStrategyHelper.resume(getEndpoint().getCamelContext(), this, resumeStrategy, CASSANDRA_RESUME_ACTION);
 
         super.doStart();
     }
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
index 1bf14d23ae5..f82621ca63a 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
+++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
@@ -26,13 +26,10 @@ import com.couchbase.client.java.view.ViewResult;
 import com.couchbase.client.java.view.ViewRow;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.resume.ResumeAction;
-import org.apache.camel.resume.ResumeActionAware;
-import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.DefaultScheduledPollConsumer;
-import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.support.resume.ResumeStrategyHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,24 +97,7 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R
     protected void doStart() throws Exception {
         super.doStart();
 
-        if (resumeStrategy != null) {
-            LOG.debug("Loading the resume cache");
-            resumeStrategy.loadCache();
-
-            LOG.info("Couchbase consumer running with resume strategy enabled");
-
-            ResumeAdapter resumeAdapter = resumeStrategy.getAdapter(ResumeAdapter.class);
-            if (resumeAdapter != null) {
-                if (resumeAdapter instanceof ResumeActionAware) {
-                    ResumeAction action = (ResumeAction) getEndpoint().getCamelContext().getRegistry()
-                            .lookupByName(COUCHBASE_RESUME_ACTION);
-                    ObjectHelper.notNull(action, "The resume action cannot be null", this);
-
-                    ((ResumeActionAware) resumeAdapter).setResumeAction(action);
-                }
-                resumeAdapter.resume();
-            }
-        }
+        ResumeStrategyHelper.resume(getEndpoint().getCamelContext(), this, resumeStrategy, COUCHBASE_RESUME_ACTION);
     }
 
     @Override
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
index ff34cc704fd..5ce86edd3dc 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
@@ -21,13 +21,10 @@ import java.util.concurrent.ExecutorService;
 import com.google.gson.JsonObject;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.resume.ResumeAction;
-import org.apache.camel.resume.ResumeActionAware;
-import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.DefaultConsumer;
-import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.support.resume.ResumeStrategyHelper;
 
 import static org.apache.camel.component.couchdb.CouchDbConstants.COUCHDB_RESUME_ACTION;
 
@@ -68,21 +65,7 @@ public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<Resu
 
     @Override
     protected void doStart() throws Exception {
-        if (resumeStrategy != null) {
-            resumeStrategy.loadCache();
-
-            ResumeAdapter adapter = resumeStrategy.getAdapter(ResumeAdapter.class);
-            if (adapter != null) {
-                if (adapter instanceof ResumeActionAware) {
-                    ResumeAction action = (ResumeAction) getEndpoint().getCamelContext().getRegistry()
-                            .lookupByName(COUCHDB_RESUME_ACTION);
-                    ObjectHelper.notNull(action, "The resume action cannot be null", this);
-
-                    ((ResumeActionAware) adapter).setResumeAction(action);
-                }
-                adapter.resume();
-            }
-        }
+        ResumeStrategyHelper.resume(getEndpoint().getCamelContext(), this, resumeStrategy, COUCHDB_RESUME_ACTION);
 
         super.doStart();
 
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeStrategyHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeStrategyHelper.java
new file mode 100644
index 00000000000..33b3fba7264
--- /dev/null
+++ b/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeStrategyHelper.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.support.resume;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.resume.ResumeAction;
+import org.apache.camel.resume.ResumeActionAware;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class for the resume strategy
+ */
+public final class ResumeStrategyHelper {
+    private static final Logger LOG = LoggerFactory.getLogger(ResumeStrategyHelper.class);
+
+    private ResumeStrategyHelper() {
+
+    }
+
+    /**
+     * Executes the resume operation
+     * 
+     * @param  context        a camel context on which the registry will be searched for the resume action
+     * @param  on             the calling instance for the resume operation
+     * @param  resumeStrategy the instance of the {@link ResumeStrategy} to perform the resume
+     * @param  actionName     an action name that maps to a {@link ResumeAction} object in the registry
+     * @throws Exception      if the strategy is unable to load the cache
+     */
+    public static void resume(
+            CamelContext context, Object on, ResumeStrategy resumeStrategy,
+            String actionName)
+            throws Exception {
+        resume(context, on, resumeStrategy, actionName, ResumeAdapter.class);
+    }
+
+    /**
+     * Executes the resume operation
+     * 
+     * @param  context        a camel context on which the registry will be searched for the resume action
+     * @param  on             the calling instance for the resume operation
+     * @param  resumeStrategy the instance of the {@link ResumeStrategy} to perform the resume
+     * @param  actionName     an action name that maps to a {@link ResumeAction} object in the registry
+     * @param  adapterClass   the class of the {@link ResumeAdapter} to obtain from the resume strategy
+     * @throws Exception      if the strategy is unable to load the cache
+     */
+    public static <T extends ResumeAdapter> void resume(
+            CamelContext context, Object on, ResumeStrategy resumeStrategy,
+            String actionName, Class<T> adapterClass)
+            throws Exception {
+        if (resumeStrategy == null) {
+            LOG.debug("Skipping resume operation because there's no resume strategy defined");
+
+            return;
+        }
+
+        LOG.debug("Loading the resume cache");
+        resumeStrategy.loadCache();
+
+        T resumeAdapter = resumeStrategy.getAdapter(adapterClass);
+        if (resumeAdapter == null) {
+            LOG.warn("The resume cannot be executed because no resume adapter was provided");
+
+            return;
+        }
+
+        if (resumeAdapter instanceof ResumeActionAware) {
+            ResumeAction action = (ResumeAction) context.getRegistry().lookupByName(actionName);
+            ObjectHelper.notNull(action, "The resume action cannot be null", on);
+
+            ((ResumeActionAware) resumeAdapter).setResumeAction(action);
+        }
+
+        resumeAdapter.resume();
+    }
+}


[camel] 01/04: CAMEL-18127: fixed incorrect value for CASSANDRA_RESUME_ACTION constant

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit cf6c97a7a30c30930ac52db522970e49d6b948ea
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jun 10 11:36:39 2022 +0200

    CAMEL-18127: fixed incorrect value for CASSANDRA_RESUME_ACTION constant
---
 .../java/org/apache/camel/component/cassandra/CassandraConstants.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java
index 4918b1359f1..1886309b758 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java
@@ -30,7 +30,7 @@ public final class CassandraConstants {
     public static final String CQL_QUERY = "CamelCqlQuery";
 
     @Metadata(label = "consumer", description = "The resume action to execute when resuming.", javaType = "String")
-    public static final String CASSANDRA_RESUME_ACTION = "CamelCqlResumeQuery";
+    public static final String CASSANDRA_RESUME_ACTION = "CamelCqlResumeAction";
 
     private CassandraConstants() {
     }


[camel] 02/04: CAMEL-18127: reuse resume actions across multiple components

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 73b32e9325ea2e2f8e6197ed7d89da34c5baa88e
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jun 10 11:48:17 2022 +0200

    CAMEL-18127: reuse resume actions across multiple components
---
 .../org/apache/camel/catalog/components/cql.json   |  2 +-
 .../org/apache/camel/component/cassandra/cql.json  |  2 +-
 .../component/cassandra/CassandraConsumer.java     | 15 +++---
 .../consumer/support/CassandraResumeAdapter.java   | 34 ------------
 .../support/DefaultCassandraResumeAdapter.java     |  3 +-
 .../CassandraComponentResumeStrategyIT.java        |  4 +-
 .../component/couchbase/CouchbaseConsumer.java     | 14 +++--
 .../couchbase/CouchbaseResumeAdapter.java          | 34 ------------
 .../support/DefaultCouchbaseResumeAdapter.java     |  4 +-
 .../integration/ConsumeResumeStrategyIT.java       |  4 +-
 .../camel/component/couchdb/CouchDbConsumer.java   | 15 +++---
 .../couchdb/consumer/CouchDbResumable.java         | 61 ----------------------
 .../consumer/DefaultCouchDbResumeAdapter.java      |  3 +-
 .../org/apache/camel/resume/ResumeActionAware.java | 18 ++++---
 14 files changed, 51 insertions(+), 162 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json
index 5e44352680f..ca865430054 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json
@@ -28,7 +28,7 @@
   },
   "headers": {
     "CamelCqlQuery": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The CQL query to execute.", "constantName": "org.apache.camel.component.cassandra.CassandraConstants#CQL_QUERY" },
-    "CamelCqlResumeQuery": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The resume action to execute when resuming.", "constantName": "org.apache.camel.component.cassandra.CassandraConstants#CASSANDRA_RESUME_ACTION" }
+    "CamelCqlResumeAction": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The resume action to execute when resuming.", "constantName": "org.apache.camel.component.cassandra.CassandraConstants#CASSANDRA_RESUME_ACTION" }
   },
   "properties": {
     "beanRef": { "kind": "path", "displayName": "Bean Ref", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "beanRef is defined using bean:id" },
diff --git a/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json b/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json
index 5e44352680f..ca865430054 100644
--- a/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json
+++ b/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json
@@ -28,7 +28,7 @@
   },
   "headers": {
     "CamelCqlQuery": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The CQL query to execute.", "constantName": "org.apache.camel.component.cassandra.CassandraConstants#CQL_QUERY" },
-    "CamelCqlResumeQuery": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The resume action to execute when resuming.", "constantName": "org.apache.camel.component.cassandra.CassandraConstants#CASSANDRA_RESUME_ACTION" }
+    "CamelCqlResumeAction": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The resume action to execute when resuming.", "constantName": "org.apache.camel.component.cassandra.CassandraConstants#CASSANDRA_RESUME_ACTION" }
   },
   "properties": {
     "beanRef": { "kind": "path", "displayName": "Bean Ref", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "beanRef is defined using bean:id" },
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
index 9113e53545b..0f87544b632 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
@@ -22,8 +22,9 @@ import com.datastax.oss.driver.api.core.cql.ResultSet;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter;
 import org.apache.camel.resume.ResumeAction;
+import org.apache.camel.resume.ResumeActionAware;
+import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.ScheduledPollConsumer;
@@ -90,13 +91,15 @@ public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAw
         if (resumeStrategy != null) {
             resumeStrategy.loadCache();
 
-            CassandraResumeAdapter resumeAdapter = resumeStrategy.getAdapter(CassandraResumeAdapter.class);
+            ResumeAdapter resumeAdapter = resumeStrategy.getAdapter(ResumeAdapter.class);
             if (resumeAdapter != null) {
-                ResumeAction action = (ResumeAction) getEndpoint().getCamelContext().getRegistry()
-                        .lookupByName(CASSANDRA_RESUME_ACTION);
-                ObjectHelper.notNull(action, "The resume action cannot be null", this);
+                if (resumeAdapter instanceof ResumeActionAware) {
+                    ResumeAction action = (ResumeAction) getEndpoint().getCamelContext().getRegistry()
+                            .lookupByName(CASSANDRA_RESUME_ACTION);
+                    ObjectHelper.notNull(action, "The resume action cannot be null", this);
 
-                resumeAdapter.setResumeAction(action);
+                    ((ResumeActionAware) resumeAdapter).setResumeAction(action);
+                }
                 resumeAdapter.resume();
             }
         }
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
deleted file mode 100644
index 78283658fe5..00000000000
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.cassandra.consumer.support;
-
-import org.apache.camel.resume.ResumeAction;
-import org.apache.camel.resume.ResumeAdapter;
-
-/**
- * Provides a resume adapter for Cassandra consumers
- */
-public interface CassandraResumeAdapter extends ResumeAdapter {
-
-    /**
-     * Sets an action that will be executed during resume
-     *
-     * @param resumeAction the action to execute during resume
-     */
-    void setResumeAction(ResumeAction resumeAction);
-}
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java
index dee8ad5e380..3115aeec590 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java
@@ -24,9 +24,10 @@ import org.apache.camel.resume.Deserializable;
 import org.apache.camel.resume.Offset;
 import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.ResumeAction;
+import org.apache.camel.resume.ResumeActionAware;
 import org.apache.camel.resume.cache.ResumeCache;
 
-public class DefaultCassandraResumeAdapter implements CassandraResumeAdapter, Cacheable, Deserializable {
+public class DefaultCassandraResumeAdapter implements ResumeActionAware, Cacheable, Deserializable {
     private ResumeCache<Object> cache;
     private ResumeAction resumeAction;
 
diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
index 7a2d2aef525..cbae2c98cc7 100644
--- a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
+++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
@@ -23,10 +23,10 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.apache.camel.resume.ResumeAction;
+import org.apache.camel.resume.ResumeActionAware;
 import org.junit.jupiter.api.Test;
 
 import static org.apache.camel.component.cassandra.CassandraConstants.CASSANDRA_RESUME_ACTION;
@@ -34,7 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class CassandraComponentResumeStrategyIT extends BaseCassandra {
 
-    private static class TestCassandraResumeAdapter implements CassandraResumeAdapter {
+    private static class TestCassandraResumeAdapter implements ResumeActionAware {
         private boolean resumeCalled;
         private boolean resumeActionNotNull;
 
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
index 1de38c3ac39..1bf14d23ae5 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
+++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
@@ -27,6 +27,8 @@ import com.couchbase.client.java.view.ViewRow;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.resume.ResumeAction;
+import org.apache.camel.resume.ResumeActionAware;
+import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.DefaultScheduledPollConsumer;
@@ -104,13 +106,15 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R
 
             LOG.info("Couchbase consumer running with resume strategy enabled");
 
-            CouchbaseResumeAdapter resumeAdapter = resumeStrategy.getAdapter(CouchbaseResumeAdapter.class);
+            ResumeAdapter resumeAdapter = resumeStrategy.getAdapter(ResumeAdapter.class);
             if (resumeAdapter != null) {
-                ResumeAction action = (ResumeAction) getEndpoint().getCamelContext().getRegistry()
-                        .lookupByName(COUCHBASE_RESUME_ACTION);
-                ObjectHelper.notNull(action, "The resume action cannot be null", this);
+                if (resumeAdapter instanceof ResumeActionAware) {
+                    ResumeAction action = (ResumeAction) getEndpoint().getCamelContext().getRegistry()
+                            .lookupByName(COUCHBASE_RESUME_ACTION);
+                    ObjectHelper.notNull(action, "The resume action cannot be null", this);
 
-                resumeAdapter.setResumeAction(action);
+                    ((ResumeActionAware) resumeAdapter).setResumeAction(action);
+                }
                 resumeAdapter.resume();
             }
         }
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeAdapter.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeAdapter.java
deleted file mode 100644
index 56bd4747b23..00000000000
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeAdapter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.couchbase;
-
-import org.apache.camel.resume.ResumeAction;
-import org.apache.camel.resume.ResumeAdapter;
-
-/**
- * Allow implementing resume adapters for couchbase consumers
- */
-public interface CouchbaseResumeAdapter extends ResumeAdapter {
-
-    /**
-     * Sets an action to be executed whenever the resume happens
-     * 
-     * @param resumeAction
-     */
-    void setResumeAction(ResumeAction resumeAction);
-}
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/consumer/support/DefaultCouchbaseResumeAdapter.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/consumer/support/DefaultCouchbaseResumeAdapter.java
index e96222c6e2e..8162bf5710d 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/consumer/support/DefaultCouchbaseResumeAdapter.java
+++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/consumer/support/DefaultCouchbaseResumeAdapter.java
@@ -19,15 +19,15 @@ package org.apache.camel.component.couchbase.consumer.support;
 
 import java.nio.ByteBuffer;
 
-import org.apache.camel.component.couchbase.CouchbaseResumeAdapter;
 import org.apache.camel.resume.Cacheable;
 import org.apache.camel.resume.Deserializable;
 import org.apache.camel.resume.Offset;
 import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.ResumeAction;
+import org.apache.camel.resume.ResumeActionAware;
 import org.apache.camel.resume.cache.ResumeCache;
 
-public class DefaultCouchbaseResumeAdapter implements CouchbaseResumeAdapter, Cacheable, Deserializable {
+public class DefaultCouchbaseResumeAdapter implements ResumeActionAware, Cacheable, Deserializable {
     private ResumeAction resumeAction;
     private ResumeCache<Object> cache;
 
diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
index 46ca0deb8cc..c57dab3585a 100644
--- a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
+++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
@@ -21,10 +21,10 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.couchbase.CouchbaseResumeAdapter;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.apache.camel.resume.ResumeAction;
+import org.apache.camel.resume.ResumeActionAware;
 import org.apache.camel.support.resume.Resumables;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -34,7 +34,7 @@ import static org.apache.camel.component.couchbase.CouchbaseConstants.COUCHBASE_
 import static org.awaitility.Awaitility.await;
 
 public class ConsumeResumeStrategyIT extends CouchbaseIntegrationTestBase {
-    static class TestCouchbaseResumeAdapter implements CouchbaseResumeAdapter {
+    static class TestCouchbaseResumeAdapter implements ResumeActionAware {
         volatile boolean setResumeActionCalled;
         volatile boolean resumeActionNotNull;
         volatile boolean resumeCalled;
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
index 3e050726517..ff34cc704fd 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
@@ -21,8 +21,9 @@ import java.util.concurrent.ExecutorService;
 import com.google.gson.JsonObject;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.component.couchdb.consumer.CouchDbResumeAdapter;
 import org.apache.camel.resume.ResumeAction;
+import org.apache.camel.resume.ResumeActionAware;
+import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.DefaultConsumer;
@@ -70,13 +71,15 @@ public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<Resu
         if (resumeStrategy != null) {
             resumeStrategy.loadCache();
 
-            CouchDbResumeAdapter adapter = resumeStrategy.getAdapter(CouchDbResumeAdapter.class);
+            ResumeAdapter adapter = resumeStrategy.getAdapter(ResumeAdapter.class);
             if (adapter != null) {
-                ResumeAction action = (ResumeAction) getEndpoint().getCamelContext().getRegistry()
-                        .lookupByName(COUCHDB_RESUME_ACTION);
-                ObjectHelper.notNull(action, "The resume action cannot be null", this);
+                if (adapter instanceof ResumeActionAware) {
+                    ResumeAction action = (ResumeAction) getEndpoint().getCamelContext().getRegistry()
+                            .lookupByName(COUCHDB_RESUME_ACTION);
+                    ObjectHelper.notNull(action, "The resume action cannot be null", this);
 
-                adapter.setResumeAction(action);
+                    ((ResumeActionAware) adapter).setResumeAction(action);
+                }
                 adapter.resume();
             }
         }
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
deleted file mode 100644
index bd81936e55d..00000000000
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.couchdb.consumer;
-
-import org.apache.camel.component.couchdb.CouchDbClientWrapper;
-import org.apache.camel.resume.Offset;
-import org.apache.camel.resume.OffsetKey;
-import org.apache.camel.resume.Resumable;
-import org.apache.camel.support.resume.OffsetKeys;
-import org.apache.camel.support.resume.Offsets;
-
-/**
- * Wraps the resume data for CouchDb
- */
-public class CouchDbResumable implements Resumable {
-    private final CouchDbClientWrapper clientWrapper;
-    private String offset;
-
-    public CouchDbResumable(CouchDbClientWrapper clientWrapper, String offset) {
-        this.clientWrapper = clientWrapper;
-        this.offset = offset;
-    }
-
-    public void updateLastOffset(String offset) {
-        this.offset = offset;
-    }
-
-    @Override
-    public Offset<String> getLastOffset() {
-        return Offsets.of(offset);
-    }
-
-    /**
-     * Gets the client wrapper. Fine for local access, but should be restricted for global access on the API
-     * 
-     * @return the client wrapper
-     */
-    CouchDbClientWrapper getClientWrapper() {
-        return clientWrapper;
-    }
-
-    @Override
-    public OffsetKey<?> getOffsetKey() {
-        return OffsetKeys.empty();
-    }
-}
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/DefaultCouchDbResumeAdapter.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/DefaultCouchDbResumeAdapter.java
index 4ecb97e369e..4b7c0550c88 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/DefaultCouchDbResumeAdapter.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/DefaultCouchDbResumeAdapter.java
@@ -24,9 +24,10 @@ import org.apache.camel.resume.Deserializable;
 import org.apache.camel.resume.Offset;
 import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.ResumeAction;
+import org.apache.camel.resume.ResumeActionAware;
 import org.apache.camel.resume.cache.ResumeCache;
 
-public class DefaultCouchDbResumeAdapter implements CouchDbResumeAdapter, Cacheable, Deserializable {
+public class DefaultCouchDbResumeAdapter implements ResumeActionAware, Cacheable, Deserializable {
     private ResumeCache<Object> cache;
     private ResumeAction resumeAction;
 
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeActionAware.java
similarity index 61%
rename from components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/ResumeActionAware.java
index 4c8f4c29914..f5fa89d91e4 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeActionAware.java
@@ -15,15 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.couchdb.consumer;
-
-import org.apache.camel.resume.ResumeAction;
-import org.apache.camel.resume.ResumeAdapter;
+package org.apache.camel.resume;
 
 /**
- * Defines a resumable adapter usable by the CouchDB component
+ * Provides an interface for adapters and other resume-related code to
+ * allow them to offer a way to set actions to be executed during the resume
+ * process. This is most likely to be used in situations where the resume
+ * adapter does not have the information required to resume because the
+ * resume logic is too broad (e.g., a database component trying to resume
+ * operations cannot know in advance which SQL statement to execute).
+ *
+ * This provides a way for integrations to inject that part of the logic
+ * into the resume API.
  */
-public interface CouchDbResumeAdapter extends ResumeAdapter {
+public interface ResumeActionAware extends ResumeAdapter {
+
     /**
      * Sets an action that will be executed during resume
      *