You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/06/10 10:18:43 UTC

[camel] branch main updated: CAMEL-18127: added adapter auto-configuration for CouchDb

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a0cc8c18f2 CAMEL-18127: added adapter auto-configuration for CouchDb
4a0cc8c18f2 is described below

commit 4a0cc8c18f2a6e40e1101ff38ad1bb91540c2d2b
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jun 10 11:06:45 2022 +0200

    CAMEL-18127: added adapter auto-configuration for CouchDb
---
 .../apache/camel/catalog/components/couchdb.json   |  3 +-
 .../apache/camel/component/couchdb/couchdb.json    |  3 +-
 .../component/couchdb/CouchDbChangesetTracker.java | 25 ++------
 .../camel/component/couchdb/CouchDbConstants.java  |  3 +
 .../camel/component/couchdb/CouchDbConsumer.java   | 20 ++++++
 .../couchdb/consumer/CouchDbResumeAdapter.java     |  9 +--
 .../consumer/CouchDbResumeStrategyFactory.java     | 37 -----------
 .../consumer/DefaultCouchDbResumeAdapter.java      | 71 ++++++++++++++++++++++
 .../LatestUpdateSequenceResumeAdapter.java         | 39 ------------
 .../org/apache/camel/resume/adapter.properties     | 18 ++++++
 10 files changed, 125 insertions(+), 103 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json
index b2a37997bc2..eb561fe866e 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json
@@ -31,7 +31,8 @@
     "CouchDbSeq": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The couchdb changeset sequence number of the update \/ delete message", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_SEQ" },
     "CouchDbId": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The couchdb document id", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_DOC_ID" },
     "CouchDbRev": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The couchdb document revision", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_DOC_REV" },
-    "CouchDbMethod": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The method (delete \/ update)", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_METHOD" }
+    "CouchDbMethod": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The method (delete \/ update)", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_METHOD" },
+    "CamelCouchDbResumeAction": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The resume action to execute when resuming.", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#COUCHDB_RESUME_ACTION" }
   },
   "properties": {
     "protocol": { "kind": "path", "displayName": "Protocol", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "enum": [ "http", "https" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The protocol to use for communicating with the database." },
diff --git a/components/camel-couchdb/src/generated/resources/org/apache/camel/component/couchdb/couchdb.json b/components/camel-couchdb/src/generated/resources/org/apache/camel/component/couchdb/couchdb.json
index b2a37997bc2..eb561fe866e 100644
--- a/components/camel-couchdb/src/generated/resources/org/apache/camel/component/couchdb/couchdb.json
+++ b/components/camel-couchdb/src/generated/resources/org/apache/camel/component/couchdb/couchdb.json
@@ -31,7 +31,8 @@
     "CouchDbSeq": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The couchdb changeset sequence number of the update \/ delete message", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_SEQ" },
     "CouchDbId": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The couchdb document id", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_DOC_ID" },
     "CouchDbRev": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The couchdb document revision", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_DOC_REV" },
-    "CouchDbMethod": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The method (delete \/ update)", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_METHOD" }
+    "CouchDbMethod": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The method (delete \/ update)", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_METHOD" },
+    "CamelCouchDbResumeAction": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The resume action to execute when resuming.", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#COUCHDB_RESUME_ACTION" }
   },
   "properties": {
     "protocol": { "kind": "path", "displayName": "Protocol", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "enum": [ "http", "https" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The protocol to use for communicating with the database." },
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
index f398bc1e9b1..735e43f47ac 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
@@ -20,10 +20,6 @@ import java.time.Duration;
 
 import com.google.gson.JsonObject;
 import org.apache.camel.Exchange;
-import org.apache.camel.component.couchdb.consumer.CouchDbResumable;
-import org.apache.camel.component.couchdb.consumer.CouchDbResumeAdapter;
-import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategyFactory;
-import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.task.BlockingTask;
 import org.apache.camel.support.task.Tasks;
 import org.apache.camel.support.task.budget.Budgets;
@@ -51,24 +47,12 @@ public class CouchDbChangesetTracker implements Runnable {
     }
 
     private void initChanges(final String sequence) {
-        CouchDbResumable resumable = new CouchDbResumable(couchClient, sequence);
-
-        if (sequence == null) {
-            ResumeStrategy resumeStrategy = CouchDbResumeStrategyFactory.newResumeStrategy(this.consumer);
-
-            assert resumeStrategy != null;
-
-            CouchDbResumeAdapter adapter = resumeStrategy.getAdapter(CouchDbResumeAdapter.class);
-
-            if (adapter != null) {
-                adapter.setResumable(resumable);
-                adapter.resume();
-            }
+        String since = sequence;
+        if (null == since) {
+            since = couchClient.getLatestUpdateSequence();
         }
-
-        LOG.debug("Last sequence [{}]", resumable.getLastOffset());
         changes = couchClient.changes().style(endpoint.getStyle()).includeDocs(true)
-                .since(resumable.getLastOffset().getValue()).heartBeat(endpoint.getHeartbeat()).continuousChanges();
+                .since(since).heartBeat(endpoint.getHeartbeat()).continuousChanges();
     }
 
     @Override
@@ -157,5 +141,4 @@ public class CouchDbChangesetTracker implements Runnable {
     public void stop() {
         changes.stop();
     }
-
 }
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConstants.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConstants.java
index 0876e857405..c85763fae05 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConstants.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConstants.java
@@ -35,4 +35,7 @@ public interface CouchDbConstants {
     @Metadata(description = "The method (delete / update)", javaType = "String")
     String HEADER_METHOD = "CouchDbMethod";
 
+    @Metadata(label = "consumer", description = "The resume action to execute when resuming.", javaType = "String")
+    String COUCHDB_RESUME_ACTION = "CamelCouchDbResumeAction";
+
 }
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
index 4512004fff1..3e050726517 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
@@ -21,9 +21,14 @@ import java.util.concurrent.ExecutorService;
 import com.google.gson.JsonObject;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.couchdb.consumer.CouchDbResumeAdapter;
+import org.apache.camel.resume.ResumeAction;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+
+import static org.apache.camel.component.couchdb.CouchDbConstants.COUCHDB_RESUME_ACTION;
 
 public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<ResumeStrategy> {
 
@@ -62,12 +67,27 @@ public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<Resu
 
     @Override
     protected void doStart() throws Exception {
+        if (resumeStrategy != null) {
+            resumeStrategy.loadCache();
+
+            CouchDbResumeAdapter adapter = resumeStrategy.getAdapter(CouchDbResumeAdapter.class);
+            if (adapter != null) {
+                ResumeAction action = (ResumeAction) getEndpoint().getCamelContext().getRegistry()
+                        .lookupByName(COUCHDB_RESUME_ACTION);
+                ObjectHelper.notNull(action, "The resume action cannot be null", this);
+
+                adapter.setResumeAction(action);
+                adapter.resume();
+            }
+        }
+
         super.doStart();
 
         executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(),
                 1);
         task = new CouchDbChangesetTracker(endpoint, this, couchClient);
         executor.submit(task);
+
     }
 
     @Override
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
index f66a2ccd5a8..4c8f4c29914 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.component.couchdb.consumer;
 
+import org.apache.camel.resume.ResumeAction;
 import org.apache.camel.resume.ResumeAdapter;
 
 /**
@@ -24,9 +25,9 @@ import org.apache.camel.resume.ResumeAdapter;
  */
 public interface CouchDbResumeAdapter extends ResumeAdapter {
     /**
-     * Sets the resumable for the adapter
-     * 
-     * @param resumable the resumable instance
+     * Sets an action that will be executed during resume
+     *
+     * @param resumeAction the action to execute during resume
      */
-    void setResumable(CouchDbResumable resumable);
+    void setResumeAction(ResumeAction resumeAction);
 }
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java
deleted file mode 100644
index b8515b1ad6a..00000000000
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.couchdb.consumer;
-
-import org.apache.camel.component.couchdb.CouchDbConsumer;
-import org.apache.camel.processor.resume.TransientResumeStrategy;
-import org.apache.camel.resume.ResumeStrategy;
-
-public final class CouchDbResumeStrategyFactory {
-    private CouchDbResumeStrategyFactory() {
-    }
-
-    public static ResumeStrategy newResumeStrategy(CouchDbConsumer consumer) {
-        ResumeStrategy resumeStrategy = consumer.getResumeStrategy();
-
-        if (resumeStrategy == null) {
-            resumeStrategy = new TransientResumeStrategy(new LatestUpdateSequenceResumeAdapter());
-        }
-
-        return resumeStrategy;
-    }
-}
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/DefaultCouchDbResumeAdapter.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/DefaultCouchDbResumeAdapter.java
new file mode 100644
index 00000000000..4ecb97e369e
--- /dev/null
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/DefaultCouchDbResumeAdapter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.couchdb.consumer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.resume.Deserializable;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
+import org.apache.camel.resume.ResumeAction;
+import org.apache.camel.resume.cache.ResumeCache;
+
+public class DefaultCouchDbResumeAdapter implements CouchDbResumeAdapter, Cacheable, Deserializable {
+    private ResumeCache<Object> cache;
+    private ResumeAction resumeAction;
+
+    @Override
+    public void setResumeAction(ResumeAction resumeAction) {
+        this.resumeAction = resumeAction;
+    }
+
+    @Override
+    public void resume() {
+        cache.forEach(resumeAction::evalEntry);
+    }
+
+    private boolean add(Object key, Object offset) {
+        cache.add(key, offset);
+
+        return true;
+    }
+
+    @Override
+    public boolean add(OffsetKey<?> key, Offset<?> offset) {
+        return add(key.getValue(), offset.getValue());
+    }
+
+    @Override
+    public void setCache(ResumeCache<?> cache) {
+        this.cache = (ResumeCache<Object>) cache;
+    }
+
+    @Override
+    public ResumeCache<?> getCache() {
+        return cache;
+    }
+
+    @Override
+    public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
+        Object key = deserializeObject(keyBuffer);
+        Object value = deserializeObject(valueBuffer);
+
+        return add(key, value);
+    }
+}
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeAdapter.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeAdapter.java
deleted file mode 100644
index e9815b9171a..00000000000
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeAdapter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.couchdb.consumer;
-
-import org.apache.camel.component.couchdb.CouchDbClientWrapper;
-
-/**
- * A resume adapter for couchdb that resumes from the last update sequence
- */
-public final class LatestUpdateSequenceResumeAdapter implements CouchDbResumeAdapter {
-    private CouchDbResumable resumable;
-
-    @Override
-    public void setResumable(CouchDbResumable resumable) {
-        this.resumable = resumable;
-    }
-
-    @Override
-    public void resume() {
-        CouchDbClientWrapper clientWrapper = resumable.getClientWrapper();
-
-        resumable.updateLastOffset(clientWrapper.getLatestUpdateSequence());
-    }
-}
diff --git a/components/camel-couchdb/src/main/resources/org/apache/camel/resume/adapter.properties b/components/camel-couchdb/src/main/resources/org/apache/camel/resume/adapter.properties
new file mode 100644
index 00000000000..c8d679476bb
--- /dev/null
+++ b/components/camel-couchdb/src/main/resources/org/apache/camel/resume/adapter.properties
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+org.apache.camel.component.couchdb.consumer.DefaultCouchDbResumeAdapter