You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/06 12:05:09 UTC
[camel] 16/18: CAMEL-16102: Source code generate @InvokeOnHeader for reflection free
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch on-header
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5bc7cb60f2d6ba88ddab36fefa708ccdc6ca130f
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Feb 6 11:37:18 2021 +0100
CAMEL-16102: Source code generate @InvokeOnHeader for reflection free
---
.../AtomixMapProducerInvokeOnHeaderFactory.java | 42 ++++----
...omixMessagingProducerInvokeOnHeaderFactory.java | 6 +-
...tomixMultiMapProducerInvokeOnHeaderFactory.java | 30 +++---
.../AtomixQueueProducerInvokeOnHeaderFactory.java | 28 ++---
.../AtomixSetProducerInvokeOnHeaderFactory.java | 18 ++--
.../AtomixValueProducerInvokeOnHeaderFactory.java | 12 +--
...l.component.atomix.client.map.AtomixMapProducer | 2 +-
...atomix.client.messaging.AtomixMessagingProducer | 2 +-
...t.atomix.client.multimap.AtomixMultiMapProducer | 2 +-
...mponent.atomix.client.queue.AtomixQueueProducer | 2 +-
...l.component.atomix.client.set.AtomixSetProducer | 2 +-
...mponent.atomix.client.value.AtomixValueProducer | 2 +-
.../client/AbstractAtomixClientProducer.java | 15 +--
.../atomix/client/map/AtomixMapProducer.java | 56 +++-------
.../client/messaging/AtomixMessagingProducer.java | 16 +--
.../client/multimap/AtomixMultiMapProducer.java | 118 ++++++++-------------
.../atomix/client/queue/AtomixQueueProducer.java | 44 ++------
.../atomix/client/set/AtomixSetProducer.java | 32 ++----
.../atomix/client/value/AtomixValueProducer.java | 24 +----
.../camel/support/HeaderSelectorProducer.java | 20 +++-
.../packaging/GenerateInvokeOnHeaderMojo.java | 12 ++-
21 files changed, 188 insertions(+), 297 deletions(-)
diff --git a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/map/AtomixMapProducerInvokeOnHeaderFactory.java b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/map/AtomixMapProducerInvokeOnHeaderFactory.java
index d8cc6e8..3480b94 100644
--- a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/map/AtomixMapProducerInvokeOnHeaderFactory.java
+++ b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/map/AtomixMapProducerInvokeOnHeaderFactory.java
@@ -16,30 +16,30 @@ public class AtomixMapProducerInvokeOnHeaderFactory implements InvokeOnHeaderStr
public Object invoke(Object obj, String key, Exchange exchange, AsyncCallback callback) throws Exception {
org.apache.camel.component.atomix.client.map.AtomixMapProducer target = (org.apache.camel.component.atomix.client.map.AtomixMapProducer) obj;
switch (key) {
- case "values":
- case "VALUES": return target.onValues(exchange.getMessage(), callback);
- case "is_empty":
- case "IS_EMPTY": return target.onIsEmpty(exchange.getMessage(), callback);
- case "replace":
- case "REPLACE": return target.onReplace(exchange.getMessage(), callback);
- case "put_if_absent":
- case "PUT_IF_ABSENT": return target.onPutIfAbsent(exchange.getMessage(), callback);
+ case "clear":
+ case "CLEAR": target.onClear(exchange.getMessage(), callback); return callback;
+ case "contains_key":
+ case "CONTAINS_KEY": target.onContainsKey(exchange.getMessage(), callback); return callback;
+ case "contains_value":
+ case "CONTAINS_VALUE": target.onContainsValue(exchange.getMessage(), callback); return callback;
+ case "entry_set":
+ case "ENTRY_SET": target.onEntrySet(exchange.getMessage(), callback); return callback;
case "get":
- case "GET": return target.onGet(exchange.getMessage(), callback);
+ case "GET": target.onGet(exchange.getMessage(), callback); return callback;
+ case "is_empty":
+ case "IS_EMPTY": target.onIsEmpty(exchange.getMessage(), callback); return callback;
case "put":
- case "PUT": return target.onPut(exchange.getMessage(), callback);
- case "entry_set":
- case "ENTRY_SET": return target.onEntrySet(exchange.getMessage(), callback);
- case "size":
- case "SIZE": return target.onSize(exchange.getMessage(), callback);
- case "clear":
- case "CLEAR": return target.onClear(exchange.getMessage(), callback);
+ case "PUT": target.onPut(exchange.getMessage(), callback); return callback;
+ case "put_if_absent":
+ case "PUT_IF_ABSENT": target.onPutIfAbsent(exchange.getMessage(), callback); return callback;
case "remove":
- case "REMOVE": return target.onRemove(exchange.getMessage(), callback);
- case "contains_value":
- case "CONTAINS_VALUE": return target.onContainsValue(exchange.getMessage(), callback);
- case "contains_key":
- case "CONTAINS_KEY": return target.onContainsKey(exchange.getMessage(), callback);
+ case "REMOVE": target.onRemove(exchange.getMessage(), callback); return callback;
+ case "replace":
+ case "REPLACE": target.onReplace(exchange.getMessage(), callback); return callback;
+ case "size":
+ case "SIZE": target.onSize(exchange.getMessage(), callback); return callback;
+ case "values":
+ case "VALUES": target.onValues(exchange.getMessage(), callback); return callback;
default: return null;
}
}
diff --git a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducerInvokeOnHeaderFactory.java b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducerInvokeOnHeaderFactory.java
index 445c845..995bae3 100644
--- a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducerInvokeOnHeaderFactory.java
+++ b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducerInvokeOnHeaderFactory.java
@@ -16,10 +16,10 @@ public class AtomixMessagingProducerInvokeOnHeaderFactory implements InvokeOnHea
public Object invoke(Object obj, String key, Exchange exchange, AsyncCallback callback) throws Exception {
org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducer target = (org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducer) obj;
switch (key) {
- case "direct":
- case "DIRECT": return target.onDirect(exchange.getMessage(), callback);
case "broadcast":
- case "BROADCAST": return target.onBroadcast(exchange.getMessage(), callback);
+ case "BROADCAST": target.onBroadcast(exchange.getMessage(), callback); return callback;
+ case "direct":
+ case "DIRECT": target.onDirect(exchange.getMessage(), callback); return callback;
default: return null;
}
}
diff --git a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducerInvokeOnHeaderFactory.java b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducerInvokeOnHeaderFactory.java
index 5597dca..8056159 100644
--- a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducerInvokeOnHeaderFactory.java
+++ b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducerInvokeOnHeaderFactory.java
@@ -16,22 +16,26 @@ public class AtomixMultiMapProducerInvokeOnHeaderFactory implements InvokeOnHead
public Object invoke(Object obj, String key, Exchange exchange, AsyncCallback callback) throws Exception {
org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducer target = (org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducer) obj;
switch (key) {
- case "is_empty":
- case "IS_EMPTY": return target.onIsEmpty(exchange.getMessage(), callback);
- case "size":
- case "SIZE": return target.onSize(exchange.getMessage(), callback);
+ case "clear":
+ case "CLEAR": target.onClear(exchange.getMessage(), callback); return callback;
+ case "contains_entry":
+ case "CONTAINS_ENTRY": target.onContainsEntry(exchange.getMessage(), callback); return callback;
case "contains_key":
- case "CONTAINS_KEY": return target.onContainsKey(exchange.getMessage(), callback);
- case "put":
- case "PUT": return target.onPut(exchange.getMessage(), callback);
- case "remove_value":
- case "REMOVE_VALUE": return target.onRemoveValue(exchange.getMessage(), callback);
+ case "CONTAINS_KEY": target.onContainsKey(exchange.getMessage(), callback); return callback;
+ case "contains_value":
+ case "CONTAINS_VALUE": target.onContainsValue(exchange.getMessage(), callback); return callback;
case "get":
- case "GET": return target.onGet(exchange.getMessage(), callback);
- case "clear":
- case "CLEAR": return target.onClear(exchange.getMessage(), callback);
+ case "GET": target.onGet(exchange.getMessage(), callback); return callback;
+ case "is_empty":
+ case "IS_EMPTY": target.onIsEmpty(exchange.getMessage(), callback); return callback;
+ case "put":
+ case "PUT": target.onPut(exchange.getMessage(), callback); return callback;
case "remove":
- case "REMOVE": return target.onRemove(exchange.getMessage(), callback);
+ case "REMOVE": target.onRemove(exchange.getMessage(), callback); return callback;
+ case "remove_value":
+ case "REMOVE_VALUE": target.onRemoveValue(exchange.getMessage(), callback); return callback;
+ case "size":
+ case "SIZE": target.onSize(exchange.getMessage(), callback); return callback;
default: return null;
}
}
diff --git a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducerInvokeOnHeaderFactory.java b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducerInvokeOnHeaderFactory.java
index 3f60ee6..e62c138 100644
--- a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducerInvokeOnHeaderFactory.java
+++ b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducerInvokeOnHeaderFactory.java
@@ -16,24 +16,24 @@ public class AtomixQueueProducerInvokeOnHeaderFactory implements InvokeOnHeaderS
public Object invoke(Object obj, String key, Exchange exchange, AsyncCallback callback) throws Exception {
org.apache.camel.component.atomix.client.queue.AtomixQueueProducer target = (org.apache.camel.component.atomix.client.queue.AtomixQueueProducer) obj;
switch (key) {
- case "poll":
- case "POLL": return target.onPoll(exchange.getMessage(), callback);
+ case "add":
+ case "ADD": target.onAdd(exchange.getMessage(), callback); return callback;
case "clear":
- case "CLEAR": return target.onClear(exchange.getMessage(), callback);
- case "size":
- case "SIZE": return target.onSize(exchange.getMessage(), callback);
+ case "CLEAR": target.onClear(exchange.getMessage(), callback); return callback;
case "contains":
- case "CONTAINS": return target.onContains(exchange.getMessage(), callback);
- case "remove":
- case "REMOVE": return target.onRemove(exchange.getMessage(), callback);
- case "peek":
- case "PEEK": return target.onPeek(exchange.getMessage(), callback);
- case "add":
- case "ADD": return target.onAdd(exchange.getMessage(), callback);
+ case "CONTAINS": target.onContains(exchange.getMessage(), callback); return callback;
case "is_empty":
- case "IS_EMPTY": return target.onIsEmpty(exchange.getMessage(), callback);
+ case "IS_EMPTY": target.onIsEmpty(exchange.getMessage(), callback); return callback;
case "offer":
- case "OFFER": return target.onOffer(exchange.getMessage(), callback);
+ case "OFFER": target.onOffer(exchange.getMessage(), callback); return callback;
+ case "peek":
+ case "PEEK": target.onPeek(exchange.getMessage(), callback); return callback;
+ case "poll":
+ case "POLL": target.onPoll(exchange.getMessage(), callback); return callback;
+ case "remove":
+ case "REMOVE": target.onRemove(exchange.getMessage(), callback); return callback;
+ case "size":
+ case "SIZE": target.onSize(exchange.getMessage(), callback); return callback;
default: return null;
}
}
diff --git a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/set/AtomixSetProducerInvokeOnHeaderFactory.java b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/set/AtomixSetProducerInvokeOnHeaderFactory.java
index d432ba5..9311986 100644
--- a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/set/AtomixSetProducerInvokeOnHeaderFactory.java
+++ b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/set/AtomixSetProducerInvokeOnHeaderFactory.java
@@ -16,18 +16,18 @@ public class AtomixSetProducerInvokeOnHeaderFactory implements InvokeOnHeaderStr
public Object invoke(Object obj, String key, Exchange exchange, AsyncCallback callback) throws Exception {
org.apache.camel.component.atomix.client.set.AtomixSetProducer target = (org.apache.camel.component.atomix.client.set.AtomixSetProducer) obj;
switch (key) {
+ case "add":
+ case "ADD": target.onAdd(exchange.getMessage(), callback); return callback;
case "clear":
- case "CLEAR": return target.onClear(exchange.getMessage(), callback);
+ case "CLEAR": target.onClear(exchange.getMessage(), callback); return callback;
case "contains":
- case "CONTAINS": return target.onContains(exchange.getMessage(), callback);
- case "size":
- case "SIZE": return target.onSize(exchange.getMessage(), callback);
- case "add":
- case "ADD": return target.onAdd(exchange.getMessage(), callback);
- case "remove":
- case "REMOVE": return target.onRemove(exchange.getMessage(), callback);
+ case "CONTAINS": target.onContains(exchange.getMessage(), callback); return callback;
case "is_empty":
- case "IS_EMPTY": return target.onIsEmpty(exchange.getMessage(), callback);
+ case "IS_EMPTY": target.onIsEmpty(exchange.getMessage(), callback); return callback;
+ case "remove":
+ case "REMOVE": target.onRemove(exchange.getMessage(), callback); return callback;
+ case "size":
+ case "SIZE": target.onSize(exchange.getMessage(), callback); return callback;
default: return null;
}
}
diff --git a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/value/AtomixValueProducerInvokeOnHeaderFactory.java b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/value/AtomixValueProducerInvokeOnHeaderFactory.java
index ce51057..2aef392 100644
--- a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/value/AtomixValueProducerInvokeOnHeaderFactory.java
+++ b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/value/AtomixValueProducerInvokeOnHeaderFactory.java
@@ -16,14 +16,14 @@ public class AtomixValueProducerInvokeOnHeaderFactory implements InvokeOnHeaderS
public Object invoke(Object obj, String key, Exchange exchange, AsyncCallback callback) throws Exception {
org.apache.camel.component.atomix.client.value.AtomixValueProducer target = (org.apache.camel.component.atomix.client.value.AtomixValueProducer) obj;
switch (key) {
- case "set":
- case "SET": return target.onSet(exchange.getMessage(), callback);
case "compare_and_set":
- case "COMPARE_AND_SET": return target.onCompareAndSet(exchange.getMessage(), callback);
- case "get_and_set":
- case "GET_AND_SET": return target.onGetAndSet(exchange.getMessage(), callback);
+ case "COMPARE_AND_SET": target.onCompareAndSet(exchange.getMessage(), callback); return callback;
case "get":
- case "GET": return target.onGet(exchange.getMessage(), callback);
+ case "GET": target.onGet(exchange.getMessage(), callback); return callback;
+ case "get_and_set":
+ case "GET_AND_SET": target.onGetAndSet(exchange.getMessage(), callback); return callback;
+ case "set":
+ case "SET": target.onSet(exchange.getMessage(), callback); return callback;
default: return null;
}
}
diff --git a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.map.AtomixMapProducer b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.map.AtomixMapProducer
index 457bff1..ad0a22c 100644
--- a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.map.AtomixMapProducer
+++ b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.map.AtomixMapProducer
@@ -1,2 +1,2 @@
# Generated by camel build tools - do NOT edit this file!
-class=org.apache.camel.component.atomix.client.map.AtomixMapProducer.AtomixMapProducerInvokeOnHeaderFactory
+class=org.apache.camel.component.atomix.client.map.AtomixMapProducerInvokeOnHeaderFactory
diff --git a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducer b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducer
index ab52975..845cc5b 100644
--- a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducer
+++ b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducer
@@ -1,2 +1,2 @@
# Generated by camel build tools - do NOT edit this file!
-class=org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducer.AtomixMessagingProducerInvokeOnHeaderFactory
+class=org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducerInvokeOnHeaderFactory
diff --git a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducer b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducer
index 0ffdfe3..d4e57ec 100644
--- a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducer
+++ b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducer
@@ -1,2 +1,2 @@
# Generated by camel build tools - do NOT edit this file!
-class=org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducer.AtomixMultiMapProducerInvokeOnHeaderFactory
+class=org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducerInvokeOnHeaderFactory
diff --git a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.queue.AtomixQueueProducer b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.queue.AtomixQueueProducer
index 4c242db..67df94d 100644
--- a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.queue.AtomixQueueProducer
+++ b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.queue.AtomixQueueProducer
@@ -1,2 +1,2 @@
# Generated by camel build tools - do NOT edit this file!
-class=org.apache.camel.component.atomix.client.queue.AtomixQueueProducer.AtomixQueueProducerInvokeOnHeaderFactory
+class=org.apache.camel.component.atomix.client.queue.AtomixQueueProducerInvokeOnHeaderFactory
diff --git a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.set.AtomixSetProducer b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.set.AtomixSetProducer
index 0c00cc8..03bb4d2 100644
--- a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.set.AtomixSetProducer
+++ b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.set.AtomixSetProducer
@@ -1,2 +1,2 @@
# Generated by camel build tools - do NOT edit this file!
-class=org.apache.camel.component.atomix.client.set.AtomixSetProducer.AtomixSetProducerInvokeOnHeaderFactory
+class=org.apache.camel.component.atomix.client.set.AtomixSetProducerInvokeOnHeaderFactory
diff --git a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.value.AtomixValueProducer b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.value.AtomixValueProducer
index 0bbabf2..2ca7b91 100644
--- a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.value.AtomixValueProducer
+++ b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.value.AtomixValueProducer
@@ -1,2 +1,2 @@
# Generated by camel build tools - do NOT edit this file!
-class=org.apache.camel.component.atomix.client.value.AtomixValueProducer.AtomixValueProducerInvokeOnHeaderFactory
+class=org.apache.camel.component.atomix.client.value.AtomixValueProducerInvokeOnHeaderFactory
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java
index c2d766a..f39d217 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java
@@ -16,35 +16,28 @@
*/
package org.apache.camel.component.atomix.client;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import io.atomix.resource.Resource;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Message;
-import org.apache.camel.component.atomix.AtomixAsyncMessageProcessor;
import org.apache.camel.support.HeaderSelectorProducer;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT;
-import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME;
+import static org.apache.camel.component.atomix.client.AtomixClientConstants.*;
public abstract class AbstractAtomixClientProducer<E extends AbstractAtomixClientEndpoint, R extends Resource>
extends HeaderSelectorProducer {
private static final Logger LOG = LoggerFactory.getLogger(AbstractAtomixClientProducer.class);
- private final Map<String, AtomixAsyncMessageProcessor> processors;
private ConcurrentMap<String, R> resources;
- protected AbstractAtomixClientProducer(E endpoint) {
- super(endpoint, get);
-
- this.processors = new HashMap<>();
+ protected AbstractAtomixClientProducer(E endpoint, String defaultHeader) {
+ super(endpoint, RESOURCE_ACTION, defaultHeader);
this.resources = new ConcurrentHashMap<>();
}
@@ -82,8 +75,6 @@ public abstract class AbstractAtomixClientProducer<E extends AbstractAtomixClien
return resources.computeIfAbsent(resourceName, name -> createResource(name));
}
- protected abstract String getProcessorKey(Message message);
-
protected abstract String getResourceName(Message message);
protected abstract R createResource(String name);
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapProducer.java
index 3d8eac4..ca79ee5 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapProducer.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapProducer.java
@@ -26,7 +26,6 @@ import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer;
import org.apache.camel.spi.InvokeOnHeader;
import org.apache.camel.util.ObjectHelper;
-import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_DEFAULT_VALUE;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_KEY;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME;
@@ -39,7 +38,7 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix
private final AtomixMapConfiguration configuration;
protected AtomixMapProducer(AtomixMapEndpoint endpoint) {
- super(endpoint);
+ super(endpoint, endpoint.getConfiguration().getDefaultAction().name());
this.configuration = endpoint.getConfiguration();
}
@@ -53,7 +52,7 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix
}
@InvokeOnHeader("PUT")
- boolean onPut(Message message, AsyncCallback callback) throws Exception {
+ void onPut(Message message, AsyncCallback callback) throws Exception {
final DistributedMap<Object, Object> map = getResource(message);
final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class);
final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
@@ -69,12 +68,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix
map.put(key, val).thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("PUT_IF_ABSENT")
- boolean onPutIfAbsent(Message message, AsyncCallback callback) throws Exception {
+ void onPutIfAbsent(Message message, AsyncCallback callback) throws Exception {
final DistributedMap<Object, Object> map = getResource(message);
final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class);
final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
@@ -90,12 +87,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix
map.putIfAbsent(key, val).thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("GET")
- boolean onGet(Message message, AsyncCallback callback) throws Exception {
+ void onGet(Message message, AsyncCallback callback) throws Exception {
final DistributedMap<Object, Object> map = getResource(message);
final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class);
final Object defaultValue = message.getHeader(RESOURCE_DEFAULT_VALUE);
@@ -121,22 +116,18 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix
result -> processResult(message, callback, result));
}
}
-
- return false;
}
@InvokeOnHeader("CLEAR")
- boolean onClear(Message message, AsyncCallback callback) throws Exception {
+ void onClear(Message message, AsyncCallback callback) throws Exception {
final DistributedMap<Object, Object> map = getResource(message);
map.clear().thenAccept(
result -> processResult(message, callback, result));
-
- return false;
}
@InvokeOnHeader("SIZE")
- boolean onSize(Message message, AsyncCallback callback) throws Exception {
+ void onSize(Message message, AsyncCallback callback) throws Exception {
final DistributedMap<Object, Object> map = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -148,12 +139,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix
map.size().thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("IS_EMPTY")
- boolean onIsEmpty(Message message, AsyncCallback callback) throws Exception {
+ void onIsEmpty(Message message, AsyncCallback callback) throws Exception {
final DistributedMap<Object, Object> map = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -165,12 +154,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix
map.isEmpty().thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("ENTRY_SET")
- boolean onEntrySet(Message message, AsyncCallback callback) throws Exception {
+ void onEntrySet(Message message, AsyncCallback callback) throws Exception {
final DistributedMap<Object, Object> map = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -182,12 +169,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix
map.entrySet().thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("VALUES")
- boolean onValues(Message message, AsyncCallback callback) throws Exception {
+ void onValues(Message message, AsyncCallback callback) throws Exception {
final DistributedMap<Object, Object> map = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -199,12 +184,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix
map.values().thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("CONTAINS_KEY")
- boolean onContainsKey(Message message, AsyncCallback callback) throws Exception {
+ void onContainsKey(Message message, AsyncCallback callback) throws Exception {
final DistributedMap<Object, Object> map = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -219,12 +202,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix
map.containsKey(key).thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("CONTAINS_VALUE")
- boolean onContainsValue(Message message, AsyncCallback callback) throws Exception {
+ void onContainsValue(Message message, AsyncCallback callback) throws Exception {
final DistributedMap<Object, Object> map = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -239,12 +220,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix
map.containsValue(value).thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("REMOVE")
- boolean onRemove(Message message, AsyncCallback callback) throws Exception {
+ void onRemove(Message message, AsyncCallback callback) throws Exception {
final DistributedMap<Object, Object> map = getResource(message);
final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class);
final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
@@ -258,12 +237,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix
map.remove(key).thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("REPLACE")
- boolean onReplace(Message message, AsyncCallback callback) throws Exception {
+ void onReplace(Message message, AsyncCallback callback) throws Exception {
final DistributedMap<Object, Object> map = getResource(message);
final long ttl = getResourceTtl(message);
final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class);
@@ -290,8 +267,6 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix
result -> processResult(message, callback, result));
}
}
-
- return false;
}
// *********************************
@@ -299,11 +274,6 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix
// *********************************
@Override
- protected String getProcessorKey(Message message) {
- return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class);
- }
-
- @Override
protected String getResourceName(Message message) {
return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class);
}
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducer.java
index 1dacf9e..a062824 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducer.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducer.java
@@ -28,7 +28,6 @@ import org.apache.camel.util.ObjectHelper;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.BROADCAST_TYPE;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.CHANNEL_NAME;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.MEMBER_NAME;
-import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE;
import static org.apache.camel.component.atomix.client.messaging.AtomixMessaging.OPTIONS_BROADCAST;
@@ -39,7 +38,7 @@ public final class AtomixMessagingProducer extends AbstractAtomixClientProducer<
private final AtomixMessagingConfiguration configuration;
protected AtomixMessagingProducer(AtomixMessagingEndpoint endpoint) {
- super(endpoint);
+ super(endpoint, endpoint.getConfiguration().getDefaultAction().name());
this.configuration = endpoint.getConfiguration();
}
@@ -48,7 +47,7 @@ public final class AtomixMessagingProducer extends AbstractAtomixClientProducer<
// *********************************
@InvokeOnHeader("DIRECT")
- boolean onDirect(Message message, AsyncCallback callback) throws Exception {
+ void onDirect(Message message, AsyncCallback callback) throws Exception {
final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
final String memberName = message.getHeader(MEMBER_NAME, configuration::getMemberName, String.class);
final String channelName = message.getHeader(CHANNEL_NAME, configuration::getChannelName, String.class);
@@ -63,12 +62,10 @@ public final class AtomixMessagingProducer extends AbstractAtomixClientProducer<
producer.send(value).thenAccept(
result -> processResult(message, callback, result));
-
- return false;
}
@InvokeOnHeader("BROADCAST")
- boolean onBroadcast(Message message, AsyncCallback callback) throws Exception {
+ void onBroadcast(Message message, AsyncCallback callback) throws Exception {
final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
final String channelName = message.getHeader(CHANNEL_NAME, configuration::getChannelName, String.class);
final AtomixMessaging.BroadcastType type
@@ -86,8 +83,6 @@ public final class AtomixMessagingProducer extends AbstractAtomixClientProducer<
producer.send(value).thenRun(
() -> processResult(message, callback, null));
-
- return false;
}
// *********************************
@@ -95,11 +90,6 @@ public final class AtomixMessagingProducer extends AbstractAtomixClientProducer<
// *********************************
@Override
- protected String getProcessorKey(Message message) {
- return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class);
- }
-
- @Override
protected String getResourceName(Message message) {
return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class);
}
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java
index 4e2972e..4d89c2f 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java
@@ -26,7 +26,6 @@ import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer;
import org.apache.camel.spi.InvokeOnHeader;
import org.apache.camel.util.ObjectHelper;
-import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_KEY;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY;
@@ -37,7 +36,7 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A
private final AtomixMultiMapConfiguration configuration;
protected AtomixMultiMapProducer(AtomixMultiMapEndpoint endpoint) {
- super(endpoint);
+ super(endpoint, endpoint.getConfiguration().getDefaultAction().name());
this.configuration = endpoint.getConfiguration();
}
@@ -51,7 +50,7 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A
}
@InvokeOnHeader("PUT")
- boolean onPut(Message message, AsyncCallback callback) throws Exception {
+ void onPut(Message message, AsyncCallback callback) throws Exception {
final DistributedMultiMap<Object, Object> map = getResource(message);
final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class);
final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
@@ -67,12 +66,10 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A
map.put(key, val).thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("GET")
- boolean onGet(Message message, AsyncCallback callback) throws Exception {
+ void onGet(Message message, AsyncCallback callback) throws Exception {
final DistributedMultiMap<Object, Object> map = getResource(message);
final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class);
final ReadConsistency consistency
@@ -87,22 +84,18 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A
map.get(key).thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("CLEAR")
- boolean onClear(Message message, AsyncCallback callback) throws Exception {
+ void onClear(Message message, AsyncCallback callback) throws Exception {
final DistributedMultiMap<Object, Object> map = getResource(message);
map.clear().thenAccept(
result -> processResult(message, callback, result));
-
- return false;
}
@InvokeOnHeader("SIZE")
- boolean onSize(Message message, AsyncCallback callback) throws Exception {
+ void onSize(Message message, AsyncCallback callback) throws Exception {
final DistributedMultiMap<Object, Object> map = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -125,12 +118,10 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A
result -> processResult(message, callback, result));
}
}
-
- return false;
}
@InvokeOnHeader("IS_EMPTY")
- boolean onIsEmpty(Message message, AsyncCallback callback) throws Exception {
+ void onIsEmpty(Message message, AsyncCallback callback) throws Exception {
final DistributedMultiMap<Object, Object> map = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -142,12 +133,10 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A
map.isEmpty().thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("CONTAINS_KEY")
- boolean onContainsKey(Message message, AsyncCallback callback) throws Exception {
+ void onContainsKey(Message message, AsyncCallback callback) throws Exception {
final DistributedMultiMap<Object, Object> map = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -162,56 +151,48 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A
map.containsKey(key).thenAccept(
result -> processResult(message, callback, result));
}
+ }
+
+ @InvokeOnHeader("CONTAINS_VALUE")
+ void onContainsValue(Message message, AsyncCallback callback) throws Exception {
+ final DistributedMultiMap<Object, Object> map = getResource(message);
+ final ReadConsistency consistency
+ = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
+ final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
- return false;
+ ObjectHelper.notNull(value, RESOURCE_VALUE);
+
+ if (consistency != null) {
+ map.containsValue(value, consistency).thenAccept(
+ result -> processResult(message, callback, result));
+ } else {
+ map.containsValue(value).thenAccept(
+ result -> processResult(message, callback, result));
+ }
}
- // @InvokeOnHeader("CONTAINS_VALUE")
- // boolean onContainsValue(Message message, AsyncCallback callback) throws Exception {
- // final DistributedMultiMap<Object, Object> map = getResource(message);
- // final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
- // final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
- //
- // ObjectHelper.notNull(value, RESOURCE_VALUE);
- //
- // if (consistency != null) {
- // map.containsValue(value, consistency).thenAccept(
- // result -> processResult(message, callback, result)
- // );
- // } else {
- // map.containsValue(value).thenAccept(
- // result -> processResult(message, callback, result)
- // );
- // }
- //
- // return false;
- // }
-
- // @InvokeOnHeader("CONTAINS_ENTRY")
- // boolean onContainsEntry(Message message, AsyncCallback callback) throws Exception {
- // final DistributedMultiMap<Object, Object> map = getResource(message);
- // final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
- // final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class);
- // final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
- //
- // ObjectHelper.notNull(key, RESOURCE_VALUE);
- // ObjectHelper.notNull(value, RESOURCE_KEY);
- //
- // if (consistency != null) {
- // map.containsEntry(key, value, consistency).thenAccept(
- // result -> processResult(message, callback, result)
- // );
- // } else {
- // map.containsEntry(key, value).thenAccept(
- // result -> processResult(message, callback, result)
- // );
- // }
- //
- // return false;
- // }
+    /**
+     * Handles the {@code CONTAINS_ENTRY} action: asks the distributed multi-map whether it
+     * holds the given key/value pair and hands the outcome to {@code processResult}.
+     *
+     * The key is taken from the {@code RESOURCE_KEY} header and the value from the
+     * {@code RESOURCE_VALUE} header; for both, the message body is the supplied default.
+     * The read consistency is taken from the {@code RESOURCE_READ_CONSISTENCY} header, with
+     * the endpoint configuration as the supplied default; when it is {@code null} the
+     * lookup is issued without an explicit consistency level.
+     *
+     * @param  message   the message carrying the headers/body that describe the entry
+     * @param  callback  passed on to {@code processResult} once the lookup completes
+     * @throws Exception if the resource cannot be obtained or a required header is missing
+     */
+    @InvokeOnHeader("CONTAINS_ENTRY")
+    void onContainsEntry(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMultiMap<Object, Object> map = getResource(message);
+        final ReadConsistency consistency
+                = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
+        final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class);
+        final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
+
+        // Validate each argument under its own header name, so that a failure
+        // points at the header that is actually missing.
+        ObjectHelper.notNull(key, RESOURCE_KEY);
+        ObjectHelper.notNull(value, RESOURCE_VALUE);
+
+        if (consistency != null) {
+            map.containsEntry(key, value, consistency).thenAccept(
+                    result -> processResult(message, callback, result));
+        } else {
+            map.containsEntry(key, value).thenAccept(
+                    result -> processResult(message, callback, result));
+        }
+    }
@InvokeOnHeader("REMOVE")
- boolean onRemove(Message message, AsyncCallback callback) throws Exception {
+ void onRemove(Message message, AsyncCallback callback) throws Exception {
final DistributedMultiMap<Object, Object> map = getResource(message);
final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class);
final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
@@ -225,12 +206,10 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A
map.remove(key).thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("REMOVE_VALUE")
- boolean onRemoveValue(Message message, AsyncCallback callback) throws Exception {
+ void onRemoveValue(Message message, AsyncCallback callback) throws Exception {
final DistributedMultiMap<Object, Object> map = getResource(message);
final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
@@ -238,8 +217,6 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A
map.removeValue(value).thenAccept(
result -> processResult(message, callback, result));
-
- return false;
}
// *********************************
@@ -247,11 +224,6 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A
// *********************************
@Override
- protected String getProcessorKey(Message message) {
- return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class);
- }
-
- @Override
protected String getResourceName(Message message) {
return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class);
}
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java
index 5c3f21a..2415fdb 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java
@@ -24,7 +24,6 @@ import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer;
import org.apache.camel.spi.InvokeOnHeader;
import org.apache.camel.util.ObjectHelper;
-import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE;
@@ -33,7 +32,7 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom
private final AtomixQueueConfiguration configuration;
protected AtomixQueueProducer(AtomixQueueEndpoint endpoint) {
- super(endpoint);
+ super(endpoint, endpoint.getConfiguration().getDefaultAction().name());
this.configuration = endpoint.getConfiguration();
}
@@ -42,7 +41,7 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom
// *********************************
@InvokeOnHeader("ADD")
- boolean onAdd(Message message, AsyncCallback callback) throws Exception {
+ void onAdd(Message message, AsyncCallback callback) throws Exception {
final DistributedQueue<Object> queue = getResource(message);
final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
@@ -50,12 +49,10 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom
queue.add(val).thenAccept(
result -> processResult(message, callback, result));
-
- return false;
}
@InvokeOnHeader("OFFER")
- boolean onOffer(Message message, AsyncCallback callback) throws Exception {
+ void onOffer(Message message, AsyncCallback callback) throws Exception {
final DistributedQueue<Object> queue = getResource(message);
final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
@@ -63,42 +60,34 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom
queue.offer(val).thenAccept(
result -> processResult(message, callback, result));
-
- return false;
}
@InvokeOnHeader("PEEK")
- boolean onPeek(Message message, AsyncCallback callback) throws Exception {
+ void onPeek(Message message, AsyncCallback callback) throws Exception {
final DistributedQueue<Object> queue = getResource(message);
queue.peek().thenAccept(
result -> processResult(message, callback, result));
-
- return false;
}
@InvokeOnHeader("POLL")
- boolean onPoll(Message message, AsyncCallback callback) throws Exception {
+ void onPoll(Message message, AsyncCallback callback) throws Exception {
final DistributedQueue<Object> queue = getResource(message);
queue.poll().thenAccept(
result -> processResult(message, callback, result));
-
- return false;
}
@InvokeOnHeader("CLEAR")
- boolean onClear(Message message, AsyncCallback callback) throws Exception {
+ void onClear(Message message, AsyncCallback callback) throws Exception {
final DistributedQueue<Object> queue = getResource(message);
queue.clear().thenAccept(
result -> processResult(message, callback, result));
-
- return false;
}
@InvokeOnHeader("CONTAINS")
- boolean onContains(Message message, AsyncCallback callback) throws Exception {
+ void onContains(Message message, AsyncCallback callback) throws Exception {
final DistributedQueue<Object> queue = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -113,12 +102,10 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom
queue.contains(value).thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("IS_EMPTY")
- boolean onIsEmpty(Message message, AsyncCallback callback) throws Exception {
+ void onIsEmpty(Message message, AsyncCallback callback) throws Exception {
final DistributedQueue<Object> queue = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -130,12 +117,10 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom
queue.isEmpty().thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("REMOVE")
- boolean onRemove(Message message, AsyncCallback callback) throws Exception {
+ void onRemove(Message message, AsyncCallback callback) throws Exception {
final DistributedQueue<Object> queue = getResource(message);
final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
@@ -146,12 +131,10 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom
queue.remove(value).thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("SIZE")
- boolean onSize(Message message, AsyncCallback callback) throws Exception {
+ void onSize(Message message, AsyncCallback callback) throws Exception {
final DistributedQueue<Object> queue = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -163,8 +146,6 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom
queue.size().thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
// *********************************
@@ -172,11 +153,6 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom
// *********************************
@Override
- protected String getProcessorKey(Message message) {
- return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class);
- }
-
- @Override
protected String getResourceName(Message message) {
return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class);
}
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java
index fef2eea..f10c1e4 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java
@@ -26,7 +26,6 @@ import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer;
import org.apache.camel.spi.InvokeOnHeader;
import org.apache.camel.util.ObjectHelper;
-import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_TTL;
@@ -36,7 +35,7 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix
private final AtomixSetConfiguration configuration;
protected AtomixSetProducer(AtomixSetEndpoint endpoint) {
- super(endpoint);
+ super(endpoint, endpoint.getConfiguration().getDefaultAction().name());
this.configuration = endpoint.getConfiguration();
}
@@ -50,7 +49,7 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix
}
@InvokeOnHeader("ADD")
- boolean onAdd(Message message, AsyncCallback callback) throws Exception {
+ void onAdd(Message message, AsyncCallback callback) throws Exception {
final DistributedSet<Object> set = getResource(message);
final long ttl = getResourceTtl(message);
final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
@@ -64,22 +63,18 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix
set.add(val).thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("CLEAR")
- boolean onClear(Message message, AsyncCallback callback) throws Exception {
+ void onClear(Message message, AsyncCallback callback) throws Exception {
final DistributedSet<Object> set = getResource(message);
set.clear().thenAccept(
result -> processResult(message, callback, result));
-
- return false;
}
@InvokeOnHeader("CONTAINS")
- boolean onContains(Message message, AsyncCallback callback) throws Exception {
+ void onContains(Message message, AsyncCallback callback) throws Exception {
final DistributedSet<Object> set = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -94,12 +89,10 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix
set.contains(value).thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("IS_EMPTY")
- boolean onIsEmpty(Message message, AsyncCallback callback) throws Exception {
+ void onIsEmpty(Message message, AsyncCallback callback) throws Exception {
final DistributedSet<Object> set = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -111,12 +104,10 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix
set.isEmpty().thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("REMOVE")
- boolean onRemove(Message message, AsyncCallback callback) throws Exception {
+ void onRemove(Message message, AsyncCallback callback) throws Exception {
final DistributedSet<Object> set = getResource(message);
final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
@@ -124,12 +115,10 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix
set.remove(value).thenAccept(
result -> processResult(message, callback, result));
-
- return false;
}
@InvokeOnHeader("SIZE")
- boolean onSize(Message message, AsyncCallback callback) throws Exception {
+ void onSize(Message message, AsyncCallback callback) throws Exception {
final DistributedSet<Object> set = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -141,8 +130,6 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix
set.size().thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
// *********************************
@@ -150,11 +137,6 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix
// *********************************
@Override
- protected String getProcessorKey(Message message) {
- return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class);
- }
-
- @Override
protected String getResourceName(Message message) {
return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class);
}
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java
index f5a5dfa..f827ecd 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java
@@ -26,7 +26,6 @@ import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer;
import org.apache.camel.spi.InvokeOnHeader;
import org.apache.camel.util.ObjectHelper;
-import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_OLD_VALUE;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY;
@@ -37,7 +36,7 @@ public final class AtomixValueProducer extends AbstractAtomixClientProducer<Atom
private final AtomixValueConfiguration configuration;
protected AtomixValueProducer(AtomixValueEndpoint endpoint) {
- super(endpoint);
+ super(endpoint, endpoint.getConfiguration().getDefaultAction().name());
this.configuration = endpoint.getConfiguration();
}
@@ -51,7 +50,7 @@ public final class AtomixValueProducer extends AbstractAtomixClientProducer<Atom
}
@InvokeOnHeader("SET")
- boolean onSet(Message message, AsyncCallback callback) throws Exception {
+ void onSet(Message message, AsyncCallback callback) throws Exception {
final DistributedValue<Object> value = getResource(message);
final long ttl = getResourceTtl(message);
final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
@@ -65,12 +64,10 @@ public final class AtomixValueProducer extends AbstractAtomixClientProducer<Atom
value.set(val).thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("GET")
- boolean onGet(Message message, AsyncCallback callback) throws Exception {
+ void onGet(Message message, AsyncCallback callback) throws Exception {
final DistributedValue<Object> value = getResource(message);
final ReadConsistency consistency
= message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
@@ -82,12 +79,10 @@ public final class AtomixValueProducer extends AbstractAtomixClientProducer<Atom
value.get().thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("GET_AND_SET")
- boolean onGetAndSet(Message message, AsyncCallback callback) throws Exception {
+ void onGetAndSet(Message message, AsyncCallback callback) throws Exception {
final DistributedValue<Object> value = getResource(message);
final long ttl = getResourceTtl(message);
final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
@@ -101,12 +96,10 @@ public final class AtomixValueProducer extends AbstractAtomixClientProducer<Atom
value.getAndSet(val).thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
@InvokeOnHeader("COMPARE_AND_SET")
- boolean onCompareAndSet(Message message, AsyncCallback callback) throws Exception {
+ void onCompareAndSet(Message message, AsyncCallback callback) throws Exception {
final DistributedValue<Object> value = getResource(message);
final long ttl = getResourceTtl(message);
final Object newVal = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
@@ -122,8 +115,6 @@ public final class AtomixValueProducer extends AbstractAtomixClientProducer<Atom
value.compareAndSet(oldVal, newVal).thenAccept(
result -> processResult(message, callback, result));
}
-
- return false;
}
// *********************************
@@ -131,11 +122,6 @@ public final class AtomixValueProducer extends AbstractAtomixClientProducer<Atom
// *********************************
@Override
- protected String getProcessorKey(Message message) {
- return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class);
- }
-
- @Override
protected String getResourceName(Message message) {
return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class);
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/HeaderSelectorProducer.java b/core/camel-support/src/main/java/org/apache/camel/support/HeaderSelectorProducer.java
index 523923e..fa65e9a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/HeaderSelectorProducer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/HeaderSelectorProducer.java
@@ -162,6 +162,7 @@ public abstract class HeaderSelectorProducer extends DefaultAsyncProducer implem
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
+ boolean sync = true;
try {
String header = headerSupplier.get();
String action = exchange.getIn().getHeader(header, String.class);
@@ -178,17 +179,28 @@ public abstract class HeaderSelectorProducer extends DefaultAsyncProducer implem
if (answer == null && parentStrategy != null) {
answer = parentStrategy.invoke(target, action, exchange, callback);
}
- LOGGER.trace("Invoked @InvokeOnHeader method: {} -> {}", action, answer);
+ if (answer == callback) {
+ // the method was invoked asynchronously (it owns the callback), so we should return false
+ sync = false;
+ answer = null;
+ }
+ if (sync) {
+ LOGGER.trace("Invoked @InvokeOnHeader method: {} -> {}", action, answer);
+ } else {
+ LOGGER.trace("Invoked @InvokeOnHeader method: {} is continuing asynchronously", action);
+ }
if (answer != null) {
exchange.getMessage().setBody(answer);
}
-
} catch (Exception e) {
exchange.setException(e);
}
- callback.done(true);
- return true;
+ if (sync) {
+ // the callback was not in use, so we must call done on it here
+ callback.done(true);
+ }
+ return sync;
}
}
diff --git a/tooling/maven/camel-package-maven-plugin/src/main/java/org/apache/camel/maven/packaging/GenerateInvokeOnHeaderMojo.java b/tooling/maven/camel-package-maven-plugin/src/main/java/org/apache/camel/maven/packaging/GenerateInvokeOnHeaderMojo.java
index a78b90a..35e3eda 100644
--- a/tooling/maven/camel-package-maven-plugin/src/main/java/org/apache/camel/maven/packaging/GenerateInvokeOnHeaderMojo.java
+++ b/tooling/maven/camel-package-maven-plugin/src/main/java/org/apache/camel/maven/packaging/GenerateInvokeOnHeaderMojo.java
@@ -221,22 +221,30 @@ public class GenerateInvokeOnHeaderMojo extends AbstractGeneratorMojo {
if (!models.isEmpty()) {
w.write(" switch (key) {\n");
for (InvokeOnHeaderModel option : models) {
+ boolean sync = true;
String invoke = "target." + option.getMethodName() + "(";
if (!option.getArgs().isEmpty()) {
StringJoiner sj = new StringJoiner(", ");
for (String arg : option.getArgs()) {
String ba = bindArg(arg);
+ // if the callback is in use then we are no longer synchronous
+ sync &= !ba.equals("callback");
sj.add(ba);
}
invoke += sj.toString();
}
+ String ret = "null";
+ if (!sync) {
+ // return the callback instance in async mode to signal that the callback is in use
+ ret = "callback";
+ }
invoke += ")";
if (!option.getKey().toLowerCase().equals(option.getKey())) {
w.write(String.format(" case \"%s\":\n", option.getKey().toLowerCase()));
}
- if (option.getReturnType().equals("VOID")) {
- w.write(String.format(" case \"%s\": %s; return null;\n", option.getKey(), invoke));
+ if (!sync || option.getReturnType().equals("VOID")) {
+ w.write(String.format(" case \"%s\": %s; return %s;\n", option.getKey(), invoke, ret));
} else {
w.write(String.format(" case \"%s\": return %s;\n", option.getKey(), invoke));
}