You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/04/09 15:55:11 UTC

[camel] branch master updated (08cc384 -> 71f5d66)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 08cc384  Use log debug
     new 86c201d  CAMEL-12427: camel-netty4 - Add SPI to plugin custom correlation state for request/reply in producer
     new 71f5d66  CAMEL-12427: camel-netty4 - Add SPI to plugin custom correlation state for request/reply in producer

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../component/netty4/http/NettyHttpEndpoint.java   |   2 +-
 .../src/main/docs/netty4-component.adoc            |   3 +-
 ... DefaultNettyCamelStateCorrelationManager.java} |  25 +++--
 .../netty4/NettyCamelStateCorrelationManager.java  |  65 +++++++++++
 .../camel/component/netty4/NettyConfiguration.java |  17 +++
 .../camel/component/netty4/NettyProducer.java      |  45 +++-----
 .../netty4/handlers/ClientChannelHandler.java      |  67 +++++------
 .../netty4/NettyCustomCorrelationManagerTest.java  | 122 +++++++++++++++++++++
 .../springboot/NettyComponentConfiguration.java    |  22 ++++
 9 files changed, 297 insertions(+), 71 deletions(-)
 copy components/camel-netty4/src/main/java/org/apache/camel/component/netty4/{DefaultChannelHandlerFactory.java => DefaultNettyCamelStateCorrelationManager.java} (54%)
 create mode 100644 components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyCamelStateCorrelationManager.java
 create mode 100644 components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCustomCorrelationManagerTest.java

-- 
To stop receiving notification emails like this one, please contact
davsclaus@apache.org.

[camel] 01/02: CAMEL-12427: camel-netty4 - Add SPI to plugin custom correlation state for request/reply in producer

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 86c201d0b97ba204461de565c1ef54516fc9ddc5
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 9 17:26:09 2018 +0200

    CAMEL-12427: camel-netty4 - Add SPI to plugin custom correlation state for request/reply in producer
---
 .../component/netty4/http/NettyHttpEndpoint.java   |  2 +-
 .../src/main/docs/netty4-component.adoc            |  3 +-
 .../DefaultNettyCamelStateCorrelationManager.java  | 48 ++++++++++++++++
 .../netty4/NettyCamelStateCorrelationManager.java  | 65 +++++++++++++++++++++
 .../camel/component/netty4/NettyConfiguration.java | 17 ++++++
 .../camel/component/netty4/NettyProducer.java      | 45 +++++----------
 .../netty4/handlers/ClientChannelHandler.java      | 67 +++++++++++-----------
 .../springboot/NettyComponentConfiguration.java    | 22 +++++++
 8 files changed, 205 insertions(+), 64 deletions(-)

diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java
index 320ca18..b499ff4 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
 @UriEndpoint(firstVersion = "2.14.0", scheme = "netty4-http", extendsScheme = "netty4", title = "Netty4 HTTP",
         syntax = "netty4-http:protocol:host:port/path", consumerClass = NettyHttpConsumer.class, label = "http", lenientProperties = true,
         excludeProperties = "textline,delimiter,autoAppendDelimiter,decoderMaxLineLength,encoding,allowDefaultCodec,udpConnectionlessSending,networkInterface"
-                + ",clientMode,reconnect,reconnectInterval,useByteBuf,udpByteArrayCodec,broadcast")
+                + ",clientMode,reconnect,reconnectInterval,useByteBuf,udpByteArrayCodec,broadcast,correlationManager")
 public class NettyHttpEndpoint extends NettyEndpoint implements AsyncEndpoint, HeaderFilterStrategyAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(NettyHttpEndpoint.class);
diff --git a/components/camel-netty4/src/main/docs/netty4-component.adoc b/components/camel-netty4/src/main/docs/netty4-component.adoc
index d1c909b..4d8c61e 100644
--- a/components/camel-netty4/src/main/docs/netty4-component.adoc
+++ b/components/camel-netty4/src/main/docs/netty4-component.adoc
@@ -100,7 +100,7 @@ with the following path and query parameters:
 |===
 
 
-==== Query Parameters (71 parameters):
+==== Query Parameters (72 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -133,6 +133,7 @@ with the following path and query parameters:
 | *connectTimeout* (producer) | Time to wait for a socket connection to be available. Value is in millis. | 10000 | int
 | *requestTimeout* (producer) | Allows to use a timeout for the Netty producer when calling a remote server. By default no timeout is in use. The value is in milli seconds, so eg 30000 is 30 seconds. The requestTimeout is using Netty's ReadTimeoutHandler to trigger the timeout. |  | long
 | *clientInitializerFactory* (producer) | To use a custom ClientInitializerFactory |  | ClientInitializer Factory
+| *correlationManager* (producer) | To use a custom correlation manager to manage how request and reply messages are mapped when using request/reply with the netty producer. This should only be used if you have a way to map requests together with replies, such as when there are correlation ids in both the request and reply messages. This can be used if you want to multiplex concurrent messages on the same channel (aka connection) in netty. When doing this you must have a way to correlate the [...]
 | *lazyChannelCreation* (producer) | Channels can be lazily created to avoid exceptions, if the remote server is not up and running when the Camel producer is started. | true | boolean
 | *producerPoolEnabled* (producer) | Whether producer pool is enabled or not. Important: Do not turn this off, as the pooling is needed for handling concurrency and reliable request/reply. | true | boolean
 | *producerPoolMaxActive* (producer) | Sets the cap on the number of objects that can be allocated by the pool (checked out to clients, or idle awaiting checkout) at a given time. Use a negative value for no limit. | -1 | int
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultNettyCamelStateCorrelationManager.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultNettyCamelStateCorrelationManager.java
new file mode 100644
index 0000000..2a909bb
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultNettyCamelStateCorrelationManager.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+
+public class DefaultNettyCamelStateCorrelationManager implements NettyCamelStateCorrelationManager {
+
+    private final Map<Channel, NettyCamelState> cache = new ConcurrentHashMap<Channel, NettyCamelState>();
+
+    @Override
+    public void putState(Channel channel, NettyCamelState state) {
+        cache.put(channel, state);
+    }
+
+    @Override
+    public void removeState(ChannelHandlerContext ctx, Channel channel) {
+        cache.remove(channel);
+    }
+
+    @Override
+    public NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Object msg) {
+        return cache.get(channel);
+    }
+
+    @Override
+    public NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Throwable cause) {
+        return cache.get(channel);
+    }
+}
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyCamelStateCorrelationManager.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyCamelStateCorrelationManager.java
new file mode 100644
index 0000000..cd76196
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyCamelStateCorrelationManager.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+
+/**
+ * To manage and correlate state of {@link NettyCamelState} when doing request/reply via {@link NettyProducer}.
+ * <p/>
+ * This SPI allows custom implementations to correlate the request and replies.
+ */
+public interface NettyCamelStateCorrelationManager {
+
+    /**
+     * Puts the state.
+     * <p/>
+     * You can get access to the Camel message from the {@link NettyCamelState} instance.
+     *
+     * @param channel the channel
+     * @param state   the Camel state to be stored
+     */
+    void putState(Channel channel, NettyCamelState state);
+
+    /**
+     * Removes the state when the channel is inactive.
+     *
+     * @param ctx netty channel handler context
+     * @param channel the channel
+     */
+    void removeState(ChannelHandlerContext ctx, Channel channel);
+
+    /**
+     * Gets the state when a response message has been received.
+     *
+     * @param ctx netty channel handler context
+     * @param channel the channel
+     * @param msg the response message
+     */
+    NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Object msg);
+
+    /**
+     * Gets the state when some error occurred.
+     *
+     * @param ctx netty channel handler context
+     * @param channel the channel
+     * @param cause the error
+     */
+    NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Throwable cause);
+
+}
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
index c63ab65..98e9298 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
@@ -110,6 +110,8 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
     private boolean udpByteArrayCodec;
     @UriParam(label = "common")
     private boolean reuseChannel;
+    @UriParam(label = "producer,advanced")
+    private NettyCamelStateCorrelationManager correlationManager;
 
     /**
      * Returns a copy of this configuration
@@ -655,6 +657,21 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
         this.reuseChannel = reuseChannel;
     }
 
+    public NettyCamelStateCorrelationManager getCorrelationManager() {
+        return correlationManager;
+    }
+
+    /**
+     * To use a custom correlation manager to manage how request and reply messages are mapped when using request/reply with the netty producer.
+     * This should only be used if you have a way to map requests together with replies, such as when there are correlation ids in both the request
+     * and reply messages. This can be used if you want to multiplex concurrent messages on the same channel (aka connection) in netty. When doing
+     * this you must have a way to correlate the request and reply messages so you can store the right reply on the inflight Camel Exchange before
+     * it continues being routed.
+     */
+    public void setCorrelationManager(NettyCamelStateCorrelationManager correlationManager) {
+        this.correlationManager = correlationManager;
+    }
+
     private static <T> void addToHandlersList(List<T> configured, List<T> handlers, Class<T> handlerType) {
         if (handlers != null) {
             for (T handler : handlers) {
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
index dcbcc03..203f9c0 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -18,8 +18,6 @@ package org.apache.camel.component.netty4;
 
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -64,7 +62,7 @@ public class NettyProducer extends DefaultAsyncProducer {
     private CamelLogger noReplyLogger;
     private EventLoopGroup workerGroup;
     private ObjectPool<ChannelFuture> pool;
-    private Map<Channel, NettyCamelState> nettyCamelStatesMap = new ConcurrentHashMap<Channel, NettyCamelState>();
+    private NettyCamelStateCorrelationManager correlationManager;
 
     public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) {
         super(nettyEndpoint);
@@ -87,6 +85,10 @@ public class NettyProducer extends DefaultAsyncProducer {
         return context;
     }
 
+    public NettyCamelStateCorrelationManager getCorrelationManager() {
+        return correlationManager;
+    }
+
     protected boolean isTcp() {
         return configuration.getProtocol().equalsIgnoreCase("tcp");
     }
@@ -94,6 +96,13 @@ public class NettyProducer extends DefaultAsyncProducer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
+
+        if (configuration.getCorrelationManager() != null) {
+            correlationManager = configuration.getCorrelationManager();
+        } else {
+            correlationManager = new DefaultNettyCamelStateCorrelationManager();
+        }
+
         if (configuration.getWorkerGroup() == null) {
             // create new pool which we should shutdown when stopping as its not shared
             workerGroup = new NettyWorkerPoolBuilder()
@@ -301,7 +310,7 @@ public class NettyProducer extends DefaultAsyncProducer {
         }
 
         // setup state as attachment on the channel, so we can access the state later when needed
-        putState(channel, new NettyCamelState(producerCallback, exchange));
+        correlationManager.putState(channel, new NettyCamelState(producerCallback, exchange));
         // here we need to setup the remote address information here
         InetSocketAddress remoteAddress = null;
         if (!isTcp()) {
@@ -372,28 +381,6 @@ public class NettyProducer extends DefaultAsyncProducer {
         return body;
     }
 
-    /**
-     * To get the {@link NettyCamelState} from the given channel.
-     */
-    public NettyCamelState getState(Channel channel) {
-        return nettyCamelStatesMap.get(channel);
-    }
-
-    /**
-     * To remove the {@link NettyCamelState} stored on the channel,
-     * when no longer needed
-     */
-    public void removeState(Channel channel) {
-        nettyCamelStatesMap.remove(channel);
-    }
-
-    /**
-     * Put the {@link NettyCamelState} into the map use the given channel as the key
-     */
-    public void putState(Channel channel, NettyCamelState state) {
-        nettyCamelStatesMap.put(channel, state);
-    }
-
     protected EventLoopGroup getWorkerGroup() {
         // prefer using explicit configured thread pools
         EventLoopGroup wg = configuration.getWorkerGroup();
@@ -420,7 +407,7 @@ public class NettyProducer extends DefaultAsyncProducer {
             clientBootstrap.option(ChannelOption.SO_REUSEADDR, configuration.isReuseAddress());
             clientBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.getConnectTimeout());
 
-            //TODO need to check it later
+            //TODO need to check it later;
             // set any additional netty options
             /*
             if (configuration.getOptions() != null) {
@@ -526,7 +513,6 @@ public class NettyProducer extends DefaultAsyncProducer {
         this.configuration = configuration;
     }
 
-
     public ChannelGroup getAllChannels() {
         return allChannels;
     }
@@ -656,8 +642,7 @@ public class NettyProducer extends DefaultAsyncProducer {
     }
 
     /**
-     * This class is used to release body in case when some error occured and body was not handed over
-     * to netty
+     * This class is used to release body in case when some error occurred and body was not handed over to netty
      */
     private static final class BodyReleaseCallback implements AsyncCallback {
         private volatile Object body;
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index b9a2a17..77f498e 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -75,8 +75,9 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
             LOG.debug("Closing channel as an exception was thrown from Netty", cause);
         }
 
-        Exchange exchange = getExchange(ctx);
-        AsyncCallback callback = getAsyncCallback(ctx);
+        NettyCamelState state = getState(ctx, cause);
+        Exchange exchange = state != null ? state.getExchange() : null;
+        AsyncCallback callback = state != null ? state.getCallback() : null;
 
         // the state may not be set
         if (exchange != null && callback != null) {
@@ -102,35 +103,38 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
             LOG.trace("Channel closed: {}", ctx.channel());
         }
 
-        Exchange exchange = getExchange(ctx);
-        AsyncCallback callback = getAsyncCallback(ctx);
+        NettyCamelState state = getState(ctx, null);
+        Exchange exchange = state != null ? state.getExchange() : null;
+        AsyncCallback callback = state != null ? state.getCallback() : null;
 
         // remove state
-        producer.removeState(ctx.channel());
+        producer.getCorrelationManager().removeState(ctx, ctx.channel());
 
         // to keep track of open sockets
         producer.getAllChannels().remove(ctx.channel());
 
-        // this channel is maybe closing graceful and the exchange is already done
-        // and if so we should not trigger an exception
-        boolean doneUoW = exchange.getUnitOfWork() == null;
-
-        NettyConfiguration configuration = producer.getConfiguration();
-        if (configuration.isSync() && !doneUoW && !messageReceived && !exceptionHandled) {
-            // To avoid call the callback.done twice
-            exceptionHandled = true;
-            // session was closed but no message received. This could be because the remote server had an internal error
-            // and could not return a response. We should count down to stop waiting for a response            
-            String address = configuration != null ? configuration.getAddress() : "";
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Channel closed but no message received from address: {}", address);
-            }
-            // don't fail the exchange if we actually specify to disconnect
-            if (!configuration.isDisconnect()) {
-                exchange.setException(new CamelExchangeException("No response received from remote server: " + address, exchange));
+        if (exchange != null) {
+            // this channel may be closing gracefully and the exchange is already done
+            // and if so we should not trigger an exception
+            boolean doneUoW = exchange.getUnitOfWork() == null;
+
+            NettyConfiguration configuration = producer.getConfiguration();
+            if (configuration.isSync() && !doneUoW && !messageReceived && !exceptionHandled) {
+                // To avoid calling callback.done twice
+                exceptionHandled = true;
+                // session was closed but no message received. This could be because the remote server had an internal error
+                // and could not return a response. We should count down to stop waiting for a response
+                String address = configuration.getAddress();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Channel closed but no message received from address: {}", address);
+                }
+                // don't fail the exchange if we actually specify to disconnect
+                if (!configuration.isDisconnect()) {
+                    exchange.setException(new CamelExchangeException("No response received from remote server: " + address, exchange));
+                }
+                // signal callback
+                callback.done(false);
             }
-            // signal callback
-            callback.done(false);
         }
         
         // make sure the event can be processed by other handlers
@@ -151,12 +155,13 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
             ctx.pipeline().remove(handler);
         }
 
-        Exchange exchange = getExchange(ctx);
+        NettyCamelState state = getState(ctx, msg);
+        Exchange exchange = state != null ? state.getExchange() : null;
         if (exchange == null) {
             // we just ignore the received message as the channel is closed
             return;
         }
-        AsyncCallback callback = getAsyncCallback(ctx);
+        AsyncCallback callback = state.getCallback();
 
         Message message;
         try {
@@ -246,14 +251,12 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
         }
     }
 
-    private Exchange getExchange(ChannelHandlerContext ctx) {
-        NettyCamelState state = producer.getState(ctx.channel());
-        return state != null ? state.getExchange() : null;
+    private NettyCamelState getState(ChannelHandlerContext ctx, Object msg) {
+        return producer.getCorrelationManager().getState(ctx, ctx.channel(), msg);
     }
 
-    private AsyncCallback getAsyncCallback(ChannelHandlerContext ctx) {
-        NettyCamelState state = producer.getState(ctx.channel());
-        return state != null ? state.getCallback() : null;
+    private NettyCamelState getState(ChannelHandlerContext ctx, Throwable cause) {
+        return producer.getCorrelationManager().getState(ctx, ctx.channel(), cause);
     }
 
 }
diff --git a/platforms/spring-boot/components-starter/camel-netty4-starter/src/main/java/org/apache/camel/component/netty4/springboot/NettyComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-netty4-starter/src/main/java/org/apache/camel/component/netty4/springboot/NettyComponentConfiguration.java
index 35704d8..0125ee4 100644
--- a/platforms/spring-boot/components-starter/camel-netty4-starter/src/main/java/org/apache/camel/component/netty4/springboot/NettyComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-netty4-starter/src/main/java/org/apache/camel/component/netty4/springboot/NettyComponentConfiguration.java
@@ -27,6 +27,7 @@ import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.EventExecutorGroup;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.component.netty4.ClientInitializerFactory;
+import org.apache.camel.component.netty4.NettyCamelStateCorrelationManager;
 import org.apache.camel.component.netty4.NettyComponent;
 import org.apache.camel.component.netty4.NettyServerBootstrapFactory;
 import org.apache.camel.component.netty4.ServerInitializerFactory;
@@ -314,6 +315,18 @@ public class NettyComponentConfiguration
          */
         private Boolean reuseChannel = false;
         /**
+         * To use a custom correlation manager to manage how request and reply
+         * messages are mapped when using request/reply with the netty producer.
+         * This should only be used if you have a way to map requests together
+         * with replies, such as when there are correlation ids in both the request
+         * and reply messages. This can be used if you want to multiplex
+         * concurrent messages on the same channel (aka connection) in netty.
+         * When doing this you must have a way to correlate the request and
+         * reply messages so you can store the right reply on the inflight Camel
+         * Exchange before it continues being routed.
+         */
+        private NettyCamelStateCorrelationManager correlationManager;
+        /**
          * The protocol to use which can be tcp or udp.
          */
         private String protocol;
@@ -781,6 +794,15 @@ public class NettyComponentConfiguration
             this.reuseChannel = reuseChannel;
         }
 
+        public NettyCamelStateCorrelationManager getCorrelationManager() {
+            return correlationManager;
+        }
+
+        public void setCorrelationManager(
+                NettyCamelStateCorrelationManager correlationManager) {
+            this.correlationManager = correlationManager;
+        }
+
         public String getProtocol() {
             return protocol;
         }

-- 
To stop receiving notification emails like this one, please contact
davsclaus@apache.org.

[camel] 02/02: CAMEL-12427: camel-netty4 - Add SPI to plugin custom correlation state for request/reply in producer

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 71f5d667ed7c9e5dd316c81dd19d159766c50c0f
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 9 17:53:08 2018 +0200

    CAMEL-12427: camel-netty4 - Add SPI to plugin custom correlation state for request/reply in producer
---
 .../netty4/NettyCustomCorrelationManagerTest.java  | 122 +++++++++++++++++++++
 1 file changed, 122 insertions(+)

diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCustomCorrelationManagerTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCustomCorrelationManagerTest.java
new file mode 100644
index 0000000..35b2d6b
--- /dev/null
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCustomCorrelationManagerTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+public class NettyCustomCorrelationManagerTest extends BaseNettyTest {
+
+    private final MyCorrelationManager myManager = new MyCorrelationManager();
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myManager", myManager);
+        return jndi;
+    }
+
+    @Test
+    public void testCustomCorrelationManager() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        // the messages can be processed in any order
+        mock.expectedBodiesReceivedInAnyOrder("Bye A", "Bye B", "Bye C");
+        // the custom manager should be used
+        mock.allMessages().header("manager").isEqualTo(myManager);
+        // check that the request and reply are correlated correctly
+        mock.allMessages().predicate(exchange -> {
+            String request = exchange.getMessage().getHeader("request", String.class);
+            String reply = exchange.getMessage().getBody(String.class);
+            return reply.endsWith(request);
+        });
+
+        template.sendBodyAndHeader("seda:start", "A", "request", "A");
+        template.sendBodyAndHeader("seda:start", "B", "request", "B");
+        template.sendBodyAndHeader("seda:start", "C", "request", "C");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start")
+                    .log("before ${body}")
+                    .to("netty4:tcp://localhost:{{port}}?textline=true&sync=true&correlationManager=#myManager")
+                    .log("after ${body}")
+                    .to("mock:result");
+
+                from("netty4:tcp://localhost:{{port}}?textline=true&sync=true")
+                    .transform(body().prepend("Bye "));
+            }
+        };
+    }
+
+    private static final class MyCorrelationManager implements NettyCamelStateCorrelationManager {
+
+        private volatile NettyCamelState stateA;
+        private volatile NettyCamelState stateB;
+        private volatile NettyCamelState stateC;
+
+        @Override
+        public void putState(Channel channel, NettyCamelState state) {
+            String body = state.getExchange().getMessage().getBody(String.class);
+            if ("A".equals(body)) {
+                stateA = state;
+            } else if ("B".equals(body)) {
+                stateB = state;
+            } else if ("C".equals(body)) {
+                stateC = state;
+            }
+        }
+
+        @Override
+        public void removeState(ChannelHandlerContext ctx, Channel channel) {
+            // noop
+        }
+
+        @Override
+        public NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Object msg) {
+            String body = msg.toString();
+            if (body.endsWith("A")) {
+                stateA.getExchange().getMessage().setHeader("manager", this);
+                return stateA;
+            } else if (body.endsWith("B")) {
+                stateB.getExchange().getMessage().setHeader("manager", this);
+                return stateB;
+            } else if (body.endsWith("C")) {
+                stateC.getExchange().getMessage().setHeader("manager", this);
+                return stateC;
+            }
+            return null;
+        }
+
+        @Override
+        public NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Throwable cause) {
+            // noop
+            return null;
+        }
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
davsclaus@apache.org.