You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/04/10 14:22:28 UTC

[camel] 02/03: CAMEL-12427: camel-netty4 - Add SPI to plugin custom correlation state for request/reply in producer

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 30f57c2b739a14b3cc2b0281855ab54a5a76b367
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 9 17:53:08 2018 +0200

    CAMEL-12427: camel-netty4 - Add SPI to plugin custom correlation state for request/reply in producer
---
 .../netty4/NettyCustomCorrelationManagerTest.java  | 122 +++++++++++++++++++++
 1 file changed, 122 insertions(+)

diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCustomCorrelationManagerTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCustomCorrelationManagerTest.java
new file mode 100644
index 0000000..35b2d6b
--- /dev/null
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCustomCorrelationManagerTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+public class NettyCustomCorrelationManagerTest extends BaseNettyTest {
+
+    private final MyCorrelationManager myManager = new MyCorrelationManager();
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myManager", myManager);
+        return jndi;
+    }
+
+    @Test
+    public void testCustomCorrelationManager() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        // the messages can be processed in any order
+        mock.expectedBodiesReceivedInAnyOrder("Bye A", "Bye B", "Bye C");
+        // the custom manager should be used
+        mock.allMessages().header("manager").isEqualTo(myManager);
+        // check that the request and reply are correlated correctly
+        mock.allMessages().predicate(exchange -> {
+            String request = exchange.getMessage().getHeader("request", String.class);
+            String reply = exchange.getMessage().getBody(String.class);
+            return reply.endsWith(request);
+        });
+
+        template.sendBodyAndHeader("seda:start", "A", "request", "A");
+        template.sendBodyAndHeader("seda:start", "B", "request", "B");
+        template.sendBodyAndHeader("seda:start", "C", "request", "C");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start")
+                    .log("before ${body}")
+                    .to("netty4:tcp://localhost:{{port}}?textline=true&sync=true&correlationManager=#myManager")
+                    .log("after ${body}")
+                    .to("mock:result");
+
+                from("netty4:tcp://localhost:{{port}}?textline=true&sync=true")
+                    .transform(body().prepend("Bye "));
+            }
+        };
+    }
+
+    private static final class MyCorrelationManager implements NettyCamelStateCorrelationManager {
+
+        private volatile NettyCamelState stateA;
+        private volatile NettyCamelState stateB;
+        private volatile NettyCamelState stateC;
+
+        @Override
+        public void putState(Channel channel, NettyCamelState state) {
+            String body = state.getExchange().getMessage().getBody(String.class);
+            if ("A".equals(body)) {
+                stateA = state;
+            } else if ("B".equals(body)) {
+                stateB = state;
+            } else if ("C".equals(body)) {
+                stateC = state;
+            }
+        }
+
+        @Override
+        public void removeState(ChannelHandlerContext ctx, Channel channel) {
+            // noop
+        }
+
+        @Override
+        public NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Object msg) {
+            String body = msg.toString();
+            if (body.endsWith("A")) {
+                stateA.getExchange().getMessage().setHeader("manager", this);
+                return stateA;
+            } else if (body.endsWith("B")) {
+                stateB.getExchange().getMessage().setHeader("manager", this);
+                return stateB;
+            } else if (body.endsWith("C")) {
+                stateC.getExchange().getMessage().setHeader("manager", this);
+                return stateC;
+            }
+            return null;
+        }
+
+        @Override
+        public NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Throwable cause) {
+            // noop
+            return null;
+        }
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
davsclaus@apache.org.