You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/06/10 10:18:43 UTC
[camel] branch main updated: CAMEL-18127: added adapter auto-configuration for CouchDb
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 4a0cc8c18f2 CAMEL-18127: added adapter auto-configuration for CouchDb
4a0cc8c18f2 is described below
commit 4a0cc8c18f2a6e40e1101ff38ad1bb91540c2d2b
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jun 10 11:06:45 2022 +0200
CAMEL-18127: added adapter auto-configuration for CouchDb
---
.../apache/camel/catalog/components/couchdb.json | 3 +-
.../apache/camel/component/couchdb/couchdb.json | 3 +-
.../component/couchdb/CouchDbChangesetTracker.java | 25 ++------
.../camel/component/couchdb/CouchDbConstants.java | 3 +
.../camel/component/couchdb/CouchDbConsumer.java | 20 ++++++
.../couchdb/consumer/CouchDbResumeAdapter.java | 9 +--
.../consumer/CouchDbResumeStrategyFactory.java | 37 -----------
.../consumer/DefaultCouchDbResumeAdapter.java | 71 ++++++++++++++++++++++
.../LatestUpdateSequenceResumeAdapter.java | 39 ------------
.../org/apache/camel/resume/adapter.properties | 18 ++++++
10 files changed, 125 insertions(+), 103 deletions(-)
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json
index b2a37997bc2..eb561fe866e 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json
@@ -31,7 +31,8 @@
"CouchDbSeq": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The couchdb changeset sequence number of the update \/ delete message", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_SEQ" },
"CouchDbId": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The couchdb document id", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_DOC_ID" },
"CouchDbRev": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The couchdb document revision", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_DOC_REV" },
- "CouchDbMethod": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The method (delete \/ update)", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_METHOD" }
+ "CouchDbMethod": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The method (delete \/ update)", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_METHOD" },
+ "CamelCouchDbResumeAction": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The resume action to execute when resuming.", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#COUCHDB_RESUME_ACTION" }
},
"properties": {
"protocol": { "kind": "path", "displayName": "Protocol", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "enum": [ "http", "https" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The protocol to use for communicating with the database." },
diff --git a/components/camel-couchdb/src/generated/resources/org/apache/camel/component/couchdb/couchdb.json b/components/camel-couchdb/src/generated/resources/org/apache/camel/component/couchdb/couchdb.json
index b2a37997bc2..eb561fe866e 100644
--- a/components/camel-couchdb/src/generated/resources/org/apache/camel/component/couchdb/couchdb.json
+++ b/components/camel-couchdb/src/generated/resources/org/apache/camel/component/couchdb/couchdb.json
@@ -31,7 +31,8 @@
"CouchDbSeq": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The couchdb changeset sequence number of the update \/ delete message", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_SEQ" },
"CouchDbId": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The couchdb document id", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_DOC_ID" },
"CouchDbRev": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The couchdb document revision", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_DOC_REV" },
- "CouchDbMethod": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The method (delete \/ update)", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_METHOD" }
+ "CouchDbMethod": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The method (delete \/ update)", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#HEADER_METHOD" },
+ "CamelCouchDbResumeAction": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The resume action to execute when resuming.", "constantName": "org.apache.camel.component.couchdb.CouchDbConstants#COUCHDB_RESUME_ACTION" }
},
"properties": {
"protocol": { "kind": "path", "displayName": "Protocol", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "enum": [ "http", "https" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The protocol to use for communicating with the database." },
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
index f398bc1e9b1..735e43f47ac 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
@@ -20,10 +20,6 @@ import java.time.Duration;
import com.google.gson.JsonObject;
import org.apache.camel.Exchange;
-import org.apache.camel.component.couchdb.consumer.CouchDbResumable;
-import org.apache.camel.component.couchdb.consumer.CouchDbResumeAdapter;
-import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategyFactory;
-import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.task.BlockingTask;
import org.apache.camel.support.task.Tasks;
import org.apache.camel.support.task.budget.Budgets;
@@ -51,24 +47,12 @@ public class CouchDbChangesetTracker implements Runnable {
}
private void initChanges(final String sequence) {
- CouchDbResumable resumable = new CouchDbResumable(couchClient, sequence);
-
- if (sequence == null) {
- ResumeStrategy resumeStrategy = CouchDbResumeStrategyFactory.newResumeStrategy(this.consumer);
-
- assert resumeStrategy != null;
-
- CouchDbResumeAdapter adapter = resumeStrategy.getAdapter(CouchDbResumeAdapter.class);
-
- if (adapter != null) {
- adapter.setResumable(resumable);
- adapter.resume();
- }
+ String since = sequence;
+ if (null == since) {
+ since = couchClient.getLatestUpdateSequence();
}
-
- LOG.debug("Last sequence [{}]", resumable.getLastOffset());
changes = couchClient.changes().style(endpoint.getStyle()).includeDocs(true)
- .since(resumable.getLastOffset().getValue()).heartBeat(endpoint.getHeartbeat()).continuousChanges();
+ .since(since).heartBeat(endpoint.getHeartbeat()).continuousChanges();
}
@Override
@@ -157,5 +141,4 @@ public class CouchDbChangesetTracker implements Runnable {
public void stop() {
changes.stop();
}
-
}
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConstants.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConstants.java
index 0876e857405..c85763fae05 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConstants.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConstants.java
@@ -35,4 +35,7 @@ public interface CouchDbConstants {
@Metadata(description = "The method (delete / update)", javaType = "String")
String HEADER_METHOD = "CouchDbMethod";
+ @Metadata(label = "consumer", description = "The resume action to execute when resuming.", javaType = "String")
+ String COUCHDB_RESUME_ACTION = "CamelCouchDbResumeAction";
+
}
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
index 4512004fff1..3e050726517 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
@@ -21,9 +21,14 @@ import java.util.concurrent.ExecutorService;
import com.google.gson.JsonObject;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.component.couchdb.consumer.CouchDbResumeAdapter;
+import org.apache.camel.resume.ResumeAction;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+
+import static org.apache.camel.component.couchdb.CouchDbConstants.COUCHDB_RESUME_ACTION;
public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<ResumeStrategy> {
@@ -62,12 +67,27 @@ public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<Resu
@Override
protected void doStart() throws Exception {
+ if (resumeStrategy != null) {
+ resumeStrategy.loadCache();
+
+ CouchDbResumeAdapter adapter = resumeStrategy.getAdapter(CouchDbResumeAdapter.class);
+ if (adapter != null) {
+ ResumeAction action = (ResumeAction) getEndpoint().getCamelContext().getRegistry()
+ .lookupByName(COUCHDB_RESUME_ACTION);
+ ObjectHelper.notNull(action, "The resume action cannot be null", this);
+
+ adapter.setResumeAction(action);
+ adapter.resume();
+ }
+ }
+
super.doStart();
executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(),
1);
task = new CouchDbChangesetTracker(endpoint, this, couchClient);
executor.submit(task);
+
}
@Override
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
index f66a2ccd5a8..4c8f4c29914 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.couchdb.consumer;
+import org.apache.camel.resume.ResumeAction;
import org.apache.camel.resume.ResumeAdapter;
/**
@@ -24,9 +25,9 @@ import org.apache.camel.resume.ResumeAdapter;
*/
public interface CouchDbResumeAdapter extends ResumeAdapter {
/**
- * Sets the resumable for the adapter
- *
- * @param resumable the resumable instance
+ * Sets an action that will be executed during resume
+ *
+ * @param resumeAction the action to execute during resume
*/
- void setResumable(CouchDbResumable resumable);
+ void setResumeAction(ResumeAction resumeAction);
}
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java
deleted file mode 100644
index b8515b1ad6a..00000000000
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.couchdb.consumer;
-
-import org.apache.camel.component.couchdb.CouchDbConsumer;
-import org.apache.camel.processor.resume.TransientResumeStrategy;
-import org.apache.camel.resume.ResumeStrategy;
-
-public final class CouchDbResumeStrategyFactory {
- private CouchDbResumeStrategyFactory() {
- }
-
- public static ResumeStrategy newResumeStrategy(CouchDbConsumer consumer) {
- ResumeStrategy resumeStrategy = consumer.getResumeStrategy();
-
- if (resumeStrategy == null) {
- resumeStrategy = new TransientResumeStrategy(new LatestUpdateSequenceResumeAdapter());
- }
-
- return resumeStrategy;
- }
-}
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/DefaultCouchDbResumeAdapter.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/DefaultCouchDbResumeAdapter.java
new file mode 100644
index 00000000000..4ecb97e369e
--- /dev/null
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/DefaultCouchDbResumeAdapter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.couchdb.consumer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.resume.Deserializable;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
+import org.apache.camel.resume.ResumeAction;
+import org.apache.camel.resume.cache.ResumeCache;
+
+public class DefaultCouchDbResumeAdapter implements CouchDbResumeAdapter, Cacheable, Deserializable {
+ private ResumeCache<Object> cache;
+ private ResumeAction resumeAction;
+
+ @Override
+ public void setResumeAction(ResumeAction resumeAction) {
+ this.resumeAction = resumeAction;
+ }
+
+ @Override
+ public void resume() {
+ cache.forEach(resumeAction::evalEntry);
+ }
+
+ private boolean add(Object key, Object offset) {
+ cache.add(key, offset);
+
+ return true;
+ }
+
+ @Override
+ public boolean add(OffsetKey<?> key, Offset<?> offset) {
+ return add(key.getValue(), offset.getValue());
+ }
+
+ @Override
+ public void setCache(ResumeCache<?> cache) {
+ this.cache = (ResumeCache<Object>) cache;
+ }
+
+ @Override
+ public ResumeCache<?> getCache() {
+ return cache;
+ }
+
+ @Override
+ public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
+ Object key = deserializeObject(keyBuffer);
+ Object value = deserializeObject(valueBuffer);
+
+ return add(key, value);
+ }
+}
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeAdapter.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeAdapter.java
deleted file mode 100644
index e9815b9171a..00000000000
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeAdapter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.couchdb.consumer;
-
-import org.apache.camel.component.couchdb.CouchDbClientWrapper;
-
-/**
- * A resume adapter for couchdb that resumes from the last update sequence
- */
-public final class LatestUpdateSequenceResumeAdapter implements CouchDbResumeAdapter {
- private CouchDbResumable resumable;
-
- @Override
- public void setResumable(CouchDbResumable resumable) {
- this.resumable = resumable;
- }
-
- @Override
- public void resume() {
- CouchDbClientWrapper clientWrapper = resumable.getClientWrapper();
-
- resumable.updateLastOffset(clientWrapper.getLatestUpdateSequence());
- }
-}
diff --git a/components/camel-couchdb/src/main/resources/org/apache/camel/resume/adapter.properties b/components/camel-couchdb/src/main/resources/org/apache/camel/resume/adapter.properties
new file mode 100644
index 00000000000..c8d679476bb
--- /dev/null
+++ b/components/camel-couchdb/src/main/resources/org/apache/camel/resume/adapter.properties
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+org.apache.camel.component.couchdb.consumer.DefaultCouchDbResumeAdapter