You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/04/12 13:23:29 UTC

[camel] 02/02: CAMEL-12438: camel-netty4 - Add timeout support for SPI correlation manager

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ac3466a867836fd774e911fb0f2583685092af83
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Apr 12 15:06:49 2018 +0200

    CAMEL-12438: camel-netty4 - Add timeout support for SPI correlation manager
---
 .../src/main/docs/netty4-component.adoc            |   5 +-
 .../camel/component/netty4/NettyConfiguration.java |   3 +
 .../camel/component/netty4/NettyProducer.java      |   8 +
 .../netty4/TimeoutCorrelationManagerSupport.java   | 243 +++++++++++++++++++++
 .../springboot/NettyComponentConfiguration.java    |   5 +
 5 files changed, 263 insertions(+), 1 deletion(-)

diff --git a/components/camel-netty4/src/main/docs/netty4-component.adoc b/components/camel-netty4/src/main/docs/netty4-component.adoc
index f66a582..28d961e 100644
--- a/components/camel-netty4/src/main/docs/netty4-component.adoc
+++ b/components/camel-netty4/src/main/docs/netty4-component.adoc
@@ -120,7 +120,7 @@ with the following path and query parameters:
 | *connectTimeout* (producer) | Time to wait for a socket connection to be available. Value is in millis. | 10000 | int
 | *requestTimeout* (producer) | Allows to use a timeout for the Netty producer when calling a remote server. By default no timeout is in use. The value is in milli seconds, so eg 30000 is 30 seconds. The requestTimeout is using Netty's ReadTimeoutHandler to trigger the timeout. |  | long
 | *clientInitializerFactory* (producer) | To use a custom ClientInitializerFactory |  | ClientInitializer Factory
-| *correlationManager* (producer) | To use a custom correlation manager to manage how request and reply messages are mapped when using request/reply with the netty producer. This should only be used if you have a way to map requests together with replies such as if there is correlation ids in both the request and reply messages. This can be used if you want to multiplex concurrent messages on the same channel (aka connection) in netty. When doing this you must have a way to correlate the [...]
+| *correlationManager* (producer) | To use a custom correlation manager to manage how request and reply messages are mapped when using request/reply with the netty producer. This should only be used if you have a way to map requests together with replies such as if there is correlation ids in both the request and reply messages. This can be used if you want to multiplex concurrent messages on the same channel (aka connection) in netty. When doing this you must have a way to correlate the [...]
 | *lazyChannelCreation* (producer) | Channels can be lazily created to avoid exceptions, if the remote server is not up and running when the Camel producer is started. | true | boolean
 | *producerPoolEnabled* (producer) | Whether producer pool is enabled or not. Important: If you turn this off then a single shared connection is used for the producer, also if you are doing request/reply. That means there is a potential issue with interleaved responses if replies comes back out-of-order. Therefore you need to have a correlation id in both the request and reply messages so you can properly correlate the replies to the Camel callback that is responsible for continue proces [...]
 | *producerPoolMaxActive* (producer) | Sets the cap on the number of objects that can be allocated by the pool (checked out to clients, or idle awaiting checkout) at a given time. Use a negative value for no limit. | -1 | int
@@ -683,6 +683,9 @@ correlate the replies to the Camel callback that is responsible for continue pro
 To do this you need to implement `NettyCamelStateCorrelationManager` as correlation manager and configure
 it via the `correlationManager=#myManager` option.
 
+NOTE: We recommend extending the `TimeoutCorrelationManagerSupport` when you build custom correlation managers.
+This provides support for timeout and other complexities you otherwise would need to implement as well.
+
 === See Also
 
 * <<netty-http-component,Netty HTTP>>
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
index a2086ef..32cf5dc 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
@@ -675,6 +675,9 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
      * this you must have a way to correlate the request and reply messages so you can store the right reply on the inflight Camel Exchange before
      * its continued routed.
      * <p/>
+     * We recommend extending the {@link TimeoutCorrelationManagerSupport} when you build custom correlation managers.
+     * This provides support for timeout and other complexities you otherwise would need to implement as well.
+     * <p/>
      * See also the <tt>producerPoolEnabled</tt> option for more details.
      */
     public void setCorrelationManager(NettyCamelStateCorrelationManager correlationManager) {
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
index 0da5508..025d34b 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -40,6 +40,7 @@ import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.ImmediateEventExecutor;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultAsyncProducer;
@@ -47,6 +48,7 @@ import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.CamelLogger;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.pool.ObjectPool;
 import org.apache.commons.pool.PoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericObjectPool;
@@ -102,6 +104,10 @@ public class NettyProducer extends DefaultAsyncProducer {
         } else {
             correlationManager = new DefaultNettyCamelStateCorrelationManager();
         }
+        if (correlationManager instanceof CamelContextAware) {
+            ((CamelContextAware) correlationManager).setCamelContext(getContext());
+        }
+        ServiceHelper.startService(correlationManager);
 
         if (configuration.getWorkerGroup() == null) {
             // create new pool which we should shutdown when stopping as its not shared
@@ -183,6 +189,8 @@ public class NettyProducer extends DefaultAsyncProducer {
             pool = null;
         }
 
+        ServiceHelper.stopService(correlationManager);
+
         super.doStop();
     }
 
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/TimeoutCorrelationManagerSupport.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/TimeoutCorrelationManagerSupport.java
new file mode 100644
index 0000000..eb58e51
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/TimeoutCorrelationManagerSupport.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.TimeoutMap;
+import org.apache.camel.support.DefaultTimeoutMap;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A base class for using {@link NettyCamelStateCorrelationManager} that supports timeout.
+ */
+public abstract class TimeoutCorrelationManagerSupport extends ServiceSupport implements CamelContextAware, NettyCamelStateCorrelationManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TimeoutCorrelationManagerSupport.class);
+
+    private volatile ScheduledExecutorService scheduledExecutorService;
+    private volatile boolean stopScheduledExecutorService;
+    private volatile ExecutorService workerPool;
+    private volatile boolean stopWorkerPool;
+    private volatile TimeoutMap<String, NettyCamelState> map;
+    private volatile CamelLogger timeoutLogger;
+
+    private CamelContext camelContext;
+    private long timeout = 30000;
+    private long timeoutChecker = 1000;
+    private LoggingLevel timeoutLoggingLevel = LoggingLevel.DEBUG;
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * Sets timeout value in millis seconds. The default value is 30000 (30 seconds).
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    public long getTimeoutChecker() {
+        return timeoutChecker;
+    }
+
+    /**
+     * Time in millis how frequent to check for timeouts. Set this to a lower value if you want
+     * to react faster upon timeouts. The default value is 1000.
+     */
+    public void setTimeoutChecker(long timeoutChecker) {
+        this.timeoutChecker = timeoutChecker;
+    }
+
+    public LoggingLevel getTimeoutLoggingLevel() {
+        return timeoutLoggingLevel;
+    }
+
+    /**
+     * Sets the logging level to use when a timeout was hit.
+     */
+    public void setTimeoutLoggingLevel(LoggingLevel timeoutLoggingLevel) {
+        this.timeoutLoggingLevel = timeoutLoggingLevel;
+    }
+
+    public ExecutorService getWorkerPool() {
+        return workerPool;
+    }
+
+    /**
+     * To use a shared worker pool for processing timed out requests.
+     */
+    public void setWorkerPool(ExecutorService workerPool) {
+        this.workerPool = workerPool;
+    }
+
+    /**
+     * Implement this method to extract the correaltion id from the request message body.
+     */
+    public abstract String getRequestCorrelationId(Object request);
+
+    /**
+     * Implement this method to extract the correaltion id from the response message body.
+     */
+    public abstract String getResponseCorrelationId(Object response);
+
+    /**
+     * Override this to implement a custom timeout response message.
+     *
+     * @param correlationId  the correlation id
+     * @param request        the request message
+     * @return the response message or <tt>null</tt> to use an {@link ExchangeTimedOutException} exception.
+     */
+    public String getTimeoutResponse(String correlationId, Object request) {
+        return null;
+    }
+
+    @Override
+    public void putState(Channel channel, NettyCamelState state) {
+        // grab the correlation id
+        Object body = state.getExchange().getMessage().getBody();
+        // the correlation id is the first part of the message
+        String cid = getRequestCorrelationId(body);
+        if (ObjectHelper.isEmpty(cid)) {
+            throw new IllegalArgumentException("CorrelationID is missing");
+        }
+        LOG.debug("putState({}) on channel: {}", cid, channel.id());
+        map.put(cid, state, timeout);
+    }
+
+    @Override
+    public void removeState(ChannelHandlerContext ctx, Channel channel) {
+        // noop
+    }
+
+    @Override
+    public NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Object msg) {
+        String cid = getResponseCorrelationId(msg);
+        if (ObjectHelper.isEmpty(cid)) {
+            LOG.warn("CorrelationID is missing from response message.");
+            return null;
+        }
+        LOG.debug("getState({}) on channel: {}", cid, channel.id());
+        // lets remove after use as its no longer needed
+        return map.remove(cid);
+    }
+
+    @Override
+    public NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Throwable cause) {
+        // noop
+        return null;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(camelContext, "CamelContext", this);
+
+        timeoutLogger = new CamelLogger(LOG, timeoutLoggingLevel);
+
+        if (scheduledExecutorService == null) {
+            scheduledExecutorService = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "NettyTimeoutCorrelationManager");
+        }
+        if (workerPool == null) {
+            workerPool = camelContext.getExecutorServiceManager().newDefaultThreadPool(this, "NettyTimeoutWorkerPool");
+        }
+
+        map = new NettyStateTimeoutMap(scheduledExecutorService);
+
+        ServiceHelper.startService(map);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(map);
+
+        if (scheduledExecutorService != null && stopScheduledExecutorService) {
+            camelContext.getExecutorServiceManager().shutdown(scheduledExecutorService);
+            scheduledExecutorService = null;
+        }
+        if (workerPool != null && stopWorkerPool) {
+            camelContext.getExecutorServiceManager().shutdown(workerPool);
+            workerPool = null;
+        }
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownService(map);
+
+        if (scheduledExecutorService != null && stopScheduledExecutorService) {
+            camelContext.getExecutorServiceManager().shutdown(scheduledExecutorService);
+            scheduledExecutorService = null;
+        }
+        if (workerPool != null && stopWorkerPool) {
+            camelContext.getExecutorServiceManager().shutdown(workerPool);
+            workerPool = null;
+        }
+    }
+
+    private final class NettyStateTimeoutMap extends DefaultTimeoutMap<String, NettyCamelState> {
+
+        NettyStateTimeoutMap(ScheduledExecutorService executor) {
+            super(executor, timeoutChecker);
+        }
+
+        @Override
+        public boolean onEviction(String key, NettyCamelState value) {
+            timeoutLogger.log("Timeout of correlation id: " + key);
+
+            workerPool.submit(() -> {
+                Exchange exchange = value.getExchange();
+                AsyncCallback callback = value.getCallback();
+                if (exchange != null && callback != null) {
+                    Object timeoutBody = getTimeoutResponse(key, exchange.getMessage().getBody());
+                    if (timeoutBody != null) {
+                        exchange.getMessage().setBody(timeoutBody);
+                    } else {
+                        exchange.setException(new ExchangeTimedOutException(exchange, timeout));
+                    }
+                    callback.done(false);
+                }
+            });
+
+            return true;
+        }
+    }
+}
diff --git a/platforms/spring-boot/components-starter/camel-netty4-starter/src/main/java/org/apache/camel/component/netty4/springboot/NettyComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-netty4-starter/src/main/java/org/apache/camel/component/netty4/springboot/NettyComponentConfiguration.java
index 99d0c33..ca91306 100644
--- a/platforms/spring-boot/components-starter/camel-netty4-starter/src/main/java/org/apache/camel/component/netty4/springboot/NettyComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-netty4-starter/src/main/java/org/apache/camel/component/netty4/springboot/NettyComponentConfiguration.java
@@ -334,6 +334,11 @@ public class NettyComponentConfiguration
          * reply messages so you can store the right reply on the inflight Camel
          * Exchange before its continued routed.
          * <p/>
+         * We recommend extending the {@link TimeoutCorrelationManagerSupport}
+         * when you build custom correlation managers. This provides support for
+         * timeout and other complexities you otherwise would need to implement
+         * as well.
+         * <p/>
          * See also the <tt>producerPoolEnabled</tt> option for more details.
          */
         private NettyCamelStateCorrelationManager correlationManager;

-- 
To stop receiving notification emails like this one, please contact
davsclaus@apache.org.