You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/04/10 14:22:28 UTC
[camel] 02/03: CAMEL-12427: camel-netty4 - Add SPI to plugin custom
correlation state for request/reply in producer
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-2.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 30f57c2b739a14b3cc2b0281855ab54a5a76b367
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 9 17:53:08 2018 +0200
CAMEL-12427: camel-netty4 - Add SPI to plugin custom correlation state for request/reply in producer
---
.../netty4/NettyCustomCorrelationManagerTest.java | 122 +++++++++++++++++++++
1 file changed, 122 insertions(+)
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCustomCorrelationManagerTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCustomCorrelationManagerTest.java
new file mode 100644
index 0000000..35b2d6b
--- /dev/null
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCustomCorrelationManagerTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+public class NettyCustomCorrelationManagerTest extends BaseNettyTest {
+
+    private final MyCorrelationManager myManager = new MyCorrelationManager();
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myManager", myManager); // referenced by the producer endpoint as #myManager
+        return jndi;
+    }
+
+    @Test
+    public void testCustomCorrelationManager() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        // the messages can be processed in any order
+        mock.expectedBodiesReceivedInAnyOrder("Bye A", "Bye B", "Bye C");
+        // the custom manager should be used
+        mock.allMessages().header("manager").isEqualTo(myManager);
+        // request and reply must be correlated; a missing value fails the check instead of throwing an NPE
+        mock.allMessages().predicate(exchange -> {
+            String request = exchange.getMessage().getHeader("request", String.class);
+            String reply = exchange.getMessage().getBody(String.class);
+            return request != null && reply != null && reply.endsWith(request);
+        });
+
+        template.sendBodyAndHeader("seda:start", "A", "request", "A");
+        template.sendBodyAndHeader("seda:start", "B", "request", "B");
+        template.sendBodyAndHeader("seda:start", "C", "request", "C");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start") // requester side: request/reply over TCP using the custom correlation manager
+                    .log("before ${body}")
+                    .to("netty4:tcp://localhost:{{port}}?textline=true&sync=true&correlationManager=#myManager")
+                    .log("after ${body}")
+                    .to("mock:result");
+
+                from("netty4:tcp://localhost:{{port}}?textline=true&sync=true") // responder side: prefixes the body with "Bye "
+                    .transform(body().prepend("Bye "));
+            }
+        };
+    }
+
+    /** Correlates replies to requests via the letter (A, B or C) at the end of the message text. */
+    private static final class MyCorrelationManager implements NettyCamelStateCorrelationManager {
+        private volatile NettyCamelState stateA;
+        private volatile NettyCamelState stateB;
+        private volatile NettyCamelState stateC;
+
+        @Override
+        public void putState(Channel channel, NettyCamelState state) {
+            String body = state.getExchange().getMessage().getBody(String.class); // request body picks the slot
+            if ("A".equals(body)) {
+                stateA = state;
+            } else if ("B".equals(body)) {
+                stateB = state;
+            } else if ("C".equals(body)) {
+                stateC = state;
+            }
+        }
+
+        @Override
+        public void removeState(ChannelHandlerContext ctx, Channel channel) {
+            // noop: stored states are never cleared by this manager
+        }
+
+        @Override
+        public NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Object msg) {
+            String reply = String.valueOf(msg); // null-safe: a null msg becomes "null" and matches no slot
+            NettyCamelState state = null;
+            if (reply.endsWith("A")) {
+                state = stateA;
+            } else if (reply.endsWith("B")) {
+                state = stateB;
+            } else if (reply.endsWith("C")) {
+                state = stateC;
+            }
+            if (state != null) { // the slot is still null when no matching request was stored
+                state.getExchange().getMessage().setHeader("manager", this);
+            }
+            return state;
+        }
+
+        @Override
+        public NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Throwable cause) {
+            return null; // noop: this manager never resolves a state from a failure
+        }
+    }
+}
--
To stop receiving notification emails like this one, please contact
davsclaus@apache.org.