You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/01/31 09:07:20 UTC
(camel) 04/16: CAMEL-19749: variables - Should also copy message headers into variable when using EIP variables
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch var-headers
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0731a28208698444e717edf72414fbaa5faee63a
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jan 29 10:59:38 2024 +0100
CAMEL-19749: variables - Should also copy message headers into variable when using EIP variables
---
.../java/org/apache/camel/processor/Enricher.java | 19 ++++-
.../org/apache/camel/processor/PollEnricher.java | 17 +++-
.../camel/processor/EnrichVariableHeadersTest.java | 94 ++++++++++++++++++++++
.../processor/PollEnrichVariableHeadersTest.java | 62 ++++++++++++++
.../org/apache/camel/support/ExchangeHelper.java | 6 +-
5 files changed, 187 insertions(+), 11 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
index bb8350971ec..9d641b01a05 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor;
+import java.util.Map;
+
import org.apache.camel.AggregationStrategy;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
@@ -26,6 +28,7 @@ import org.apache.camel.ExchangePattern;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Expression;
import org.apache.camel.spi.EndpointUtilizationStatistics;
+import org.apache.camel.spi.HeadersMapFactory;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ProcessorExchangeFactory;
import org.apache.camel.spi.RouteIdAware;
@@ -63,6 +66,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
private boolean ignoreInvalidEndpoint;
private boolean allowOptimisedComponents = true;
private boolean autoStartupComponents = true;
+ private HeadersMapFactory headersMapFactory;
private ProcessorExchangeFactory processorExchangeFactory;
private SendDynamicProcessor sendDynamicProcessor;
@@ -188,9 +192,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
// if we should store the received message body in a variable,
// then we need to preserve the original message body
Object body = null;
+ Map<String, Object> headers = null;
if (variableReceive != null) {
try {
body = exchange.getMessage().getBody();
+ // do a defensive copy of the headers
+ headers = headersMapFactory.newMap(exchange.getMessage().getHeaders());
} catch (Exception throwable) {
exchange.setException(throwable);
callback.done(true);
@@ -198,6 +205,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
}
}
final Object originalBody = body;
+ final Map<String, Object> originalHeaders = headers;
return sendDynamicProcessor.process(resourceExchange, new AsyncCallback() {
@Override
@@ -216,9 +224,9 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
if (aggregatedExchange != null) {
if (variableReceive != null) {
// result should be stored in variable instead of message body
- Object value = aggregatedExchange.getMessage().getBody();
- ExchangeHelper.setVariable(exchange, variableReceive, value);
- aggregatedExchange.getMessage().setBody(originalBody);
+ ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive);
+ exchange.getMessage().setBody(originalBody);
+ exchange.getMessage().setHeaders(originalHeaders);
}
// copy aggregation result onto original exchange (preserving pattern)
copyResultsWithoutCorrelationId(exchange, aggregatedExchange);
@@ -302,6 +310,11 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
ServiceHelper.buildService(processorExchangeFactory, sendDynamicProcessor);
}
+ @Override
+ protected void doInit() throws Exception {
+ headersMapFactory = camelContext.getCamelContextExtension().getHeadersMapFactory();
+ }
+
@Override
protected void doStart() throws Exception {
ServiceHelper.startService(processorExchangeFactory, aggregationStrategy, sendDynamicProcessor);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
index 21759c73f24..4005d83834c 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor;
+import java.util.Map;
+
import org.apache.camel.AggregationStrategy;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
@@ -32,6 +34,7 @@ import org.apache.camel.PollingConsumer;
import org.apache.camel.spi.ConsumerCache;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.HeadersMapFactory;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.RouteIdAware;
@@ -64,7 +67,8 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
private CamelContext camelContext;
private ConsumerCache consumerCache;
- protected volatile String scheme;
+ private HeadersMapFactory headersMapFactory;
+ private volatile String scheme;
private String id;
private String routeId;
private String variableReceive;
@@ -320,9 +324,12 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
// if we should store the received message body in a variable,
// then we need to preserve the original message body
Object originalBody = null;
+ Map<String, Object> originalHeaders = null;
if (variableReceive != null) {
try {
originalBody = exchange.getMessage().getBody();
+ // do a defensive copy of the headers
+ originalHeaders = headersMapFactory.newMap(exchange.getMessage().getHeaders());
} catch (Exception throwable) {
exchange.setException(throwable);
callback.done(true);
@@ -345,9 +352,9 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
if (aggregatedExchange != null) {
if (variableReceive != null) {
// result should be stored in variable instead of message body
- Object value = aggregatedExchange.getMessage().getBody();
- ExchangeHelper.setVariable(exchange, variableReceive, value);
- aggregatedExchange.getMessage().setBody(originalBody);
+ ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive);
+ exchange.getMessage().setBody(originalBody);
+ exchange.getMessage().setHeaders(originalHeaders);
}
// copy aggregation result onto original exchange (preserving pattern)
copyResultsPreservePattern(exchange, aggregatedExchange);
@@ -485,6 +492,8 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
scheme = ExchangeHelper.resolveScheme(u);
}
+ headersMapFactory = camelContext.getCamelContextExtension().getHeadersMapFactory();
+
ServiceHelper.initService(consumerCache, aggregationStrategy);
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableHeadersTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableHeadersTest.java
new file mode 100644
index 00000000000..229e5c86d4e
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableHeadersTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+public class EnrichVariableHeadersTest extends ContextTestSupport {
+
+ @Test
+ public void testSend() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("World");
+ getMockEndpoint("mock:before").expectedVariableReceived("hello", "Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedVariableReceived("hello", "Camel");
+ getMockEndpoint("mock:result").message(0).header("echo").isEqualTo("CamelCamel");
+
+ template.sendBody("direct:send", "World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testReceive() throws Exception {
+ getMockEndpoint("mock:after").expectedBodiesReceived("World");
+ getMockEndpoint("mock:after").expectedVariableReceived("bye", "Bye World");
+ getMockEndpoint("mock:after").message(0).header("echo").isNull();
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye World");
+ getMockEndpoint("mock:result").message(0).header("echo").isNull();
+
+ template.sendBody("direct:receive", "World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testSendAndReceive() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("World");
+ getMockEndpoint("mock:before").expectedVariableReceived("hello", "Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("World");
+ getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye Camel");
+ getMockEndpoint("mock:result").message(0).header("echo").isNull();
+
+ template.sendBody("direct:sendAndReceive", "World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:send")
+ .setVariable("hello", simple("Camel"))
+ .to("mock:before")
+ .enrich().constant("direct:foo").variableSend("hello")
+ .to("mock:result");
+
+ from("direct:receive")
+ .enrich().constant("direct:foo").variableReceive("bye")
+ .to("mock:after")
+ .setBody(simple("${variable:bye}"))
+ .to("mock:result");
+
+ from("direct:sendAndReceive")
+ .setVariable("hello", simple("Camel"))
+ .to("mock:before")
+ .enrich().constant("direct:foo").variableSend("hello").variableReceive("bye")
+ .to("mock:result");
+
+ from("direct:foo")
+ .setHeader("echo", simple("${body}${body}"))
+ .transform().simple("Bye ${body}");
+ }
+ };
+ }
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableHeadersTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableHeadersTest.java
new file mode 100644
index 00000000000..852e3b0cd13
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableHeadersTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Map;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class PollEnrichVariableHeadersTest extends ContextTestSupport {
+
+ @Test
+ public void testReceive() throws Exception {
+ template.sendBodyAndHeader("seda:foo", "Bye World", "echo", "CamelCamel");
+
+ getMockEndpoint("mock:after").expectedBodiesReceived("World");
+ getMockEndpoint("mock:after").expectedVariableReceived("bye", "Bye World");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye World");
+ getMockEndpoint("mock:result").message(0).header("echo").isNull();
+ getMockEndpoint("mock:result").whenAnyExchangeReceived(e -> {
+ Map m = e.getVariable("bye.headers", Map.class);
+ Assertions.assertNotNull(m);
+ Assertions.assertEquals(1, m.size());
+ Assertions.assertEquals("CamelCamel", m.get("echo"));
+ });
+
+ template.sendBody("direct:receive", "World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:receive")
+ .pollEnrich().constant("seda:foo").timeout(1000).variableReceive("bye")
+ .to("mock:after")
+ .setBody(simple("${variable:bye}"))
+ .to("mock:result");
+ }
+ };
+ }
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 85c34ecf86a..02df6ccce7b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -1094,8 +1094,7 @@ public final class ExchangeHelper {
name = StringHelper.after(name, ":");
repo.setVariable(name, value);
} else {
- exchange.setException(
- new IllegalArgumentException("VariableRepository with id: " + id + " does not exist"));
+ throw new IllegalArgumentException("VariableRepository with id: " + id + " does not exist");
}
} else {
exchange.setVariable(name, value);
@@ -1110,8 +1109,7 @@ public final class ExchangeHelper {
= exchange.getContext().getCamelContextExtension().getContextPlugin(VariableRepositoryFactory.class);
repo = factory.getVariableRepository(id);
if (repo == null) {
- exchange.setException(
- new IllegalArgumentException("VariableRepository with id: " + id + " does not exist"));
+ throw new IllegalArgumentException("VariableRepository with id: " + id + " does not exist");
}
name = StringHelper.after(name, ":");
}