You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/04/21 08:02:39 UTC
[camel] branch main updated: CAMEL-19286: camel-ignite - Support the REPLACE operation in the Ignite Cache Producer (#9908)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 6e87bb72797 CAMEL-19286: camel-ignite - Support the REPLACE operation in the Ignite Cache Producer (#9908)
6e87bb72797 is described below
commit 6e87bb72797b297cef7402908fd0ba3e470d7465
Author: Kengo Seki <se...@apache.org>
AuthorDate: Fri Apr 21 17:02:32 2023 +0900
CAMEL-19286: camel-ignite - Support the REPLACE operation in the Ignite Cache Producer (#9908)
---
.../camel/component/ignite/IgniteConstants.java | 4 ++--
.../ignite/cache/IgniteCacheOperation.java | 3 ++-
.../ignite/cache/IgniteCacheProducer.java | 22 +++++++++++++++++++
.../camel/component/ignite/IgniteCacheTest.java | 25 ++++++++++++++++++++++
4 files changed, 51 insertions(+), 3 deletions(-)
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java
index 5c7dc92df2c..2ab7b75d15f 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java
@@ -55,8 +55,8 @@ public final class IgniteConstants {
"It does not allow you to dynamically change the cache against which a producer operation is performed. Use EIPs for that (e.g. recipient list, dynamic router).",
javaType = "String", applicableFor = SCHEME_CACHE)
public static final String IGNITE_CACHE_NAME = "CamelIgniteCacheName";
- @Metadata(label = "consumer",
- description = "This header carries the old cache value when passed in the incoming cache event.",
+ @Metadata(description = "(producer) The old cache value to be replaced when invoking the REPLACE operation. \n" +
+ "(consumer) This header carries the old cache value when passed in the incoming cache event.",
javaType = "Object", applicableFor = SCHEME_CACHE)
public static final String IGNITE_CACHE_OLD_VALUE = "CamelIgniteCacheOldValue";
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java
index 1ca66304be3..ce76db93250 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java
@@ -27,6 +27,7 @@ public enum IgniteCacheOperation {
SIZE,
REBALANCE,
QUERY,
- CLEAR
+ CLEAR,
+ REPLACE,
}
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
index ebcb4f5552b..afdab7542a0 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
@@ -85,6 +85,10 @@ public class IgniteCacheProducer extends DefaultAsyncProducer {
doRebalance(in, out);
break;
+ case REPLACE:
+ doReplace(in, out);
+ break;
+
default:
break;
}
@@ -196,6 +200,24 @@ public class IgniteCacheProducer extends DefaultAsyncProducer {
out.setBody(result);
}
+ private void doReplace(Message in, Message out) {
+ Object cacheKey = in.getHeader(IgniteConstants.IGNITE_CACHE_KEY);
+
+ if (cacheKey == null) {
+ throw new RuntimeCamelException(
+ "Cache REPLACE operation requires the cache key in the CamelIgniteCacheKey header");
+ }
+
+ Object oldValue = in.getHeader(IgniteConstants.IGNITE_CACHE_OLD_VALUE);
+ if (oldValue == null) {
+ cache.replace(cacheKey, in.getBody());
+ } else {
+ cache.replace(cacheKey, oldValue, in.getBody());
+ }
+
+ IgniteHelper.maybePropagateIncomingBody(endpoint, in, out);
+ }
+
private Object cacheKey(Message msg) {
Object cacheKey = msg.getHeader(IgniteConstants.IGNITE_CACHE_KEY);
if (cacheKey == null) {
diff --git a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheTest.java b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheTest.java
index b55984d89f3..6e077553b22 100644
--- a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheTest.java
+++ b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheTest.java
@@ -179,6 +179,31 @@ public class IgniteCacheTest extends AbstractIgniteTest {
}
+ @Test
+ public void testReplaceEntry() {
+ template.requestBodyAndHeader("ignite-cache:" + resourceUid + "?operation=REPLACE", "5678",
+ IgniteConstants.IGNITE_CACHE_KEY, "abcd");
+ IgniteCache<String, String> cache = ignite().getOrCreateCache(resourceUid);
+ Assertions.assertThat(cache.size(CachePeekMode.ALL)).isEqualTo(0);
+
+ cache.put("abcd", "1234");
+ template.requestBodyAndHeader("ignite-cache:" + resourceUid + "?operation=REPLACE", "5678",
+ IgniteConstants.IGNITE_CACHE_KEY, "abcd");
+ Assertions.assertThat(cache.size(CachePeekMode.ALL)).isEqualTo(1);
+ Assertions.assertThat(cache.get("abcd")).isEqualTo("5678");
+
+ Map<String, Object> headers
+ = Map.of(IgniteConstants.IGNITE_CACHE_KEY, "abcd", IgniteConstants.IGNITE_CACHE_OLD_VALUE, "1234");
+ template.requestBodyAndHeaders("ignite-cache:" + resourceUid + "?operation=REPLACE", "9", headers);
+ Assertions.assertThat(cache.size(CachePeekMode.ALL)).isEqualTo(1);
+ Assertions.assertThat(cache.get("abcd")).isEqualTo("5678");
+
+ headers = Map.of(IgniteConstants.IGNITE_CACHE_KEY, "abcd", IgniteConstants.IGNITE_CACHE_OLD_VALUE, "5678");
+ template.requestBodyAndHeaders("ignite-cache:" + resourceUid + "?operation=REPLACE", "9", headers);
+ Assertions.assertThat(cache.size(CachePeekMode.ALL)).isEqualTo(1);
+ Assertions.assertThat(cache.get("abcd")).isEqualTo("9");
+ }
+
@Test
public void testClearCache() {
IgniteCache<String, String> cache = ignite().getOrCreateCache(resourceUid);