You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/05/13 09:22:25 UTC

[5/6] camel git commit: CAMEL-8771 Add MaxChannelMemorySize and MaxTotalMemorySize for OrderedMemoryAwareThreadPoolExecutor

CAMEL-8771 Add MaxChannelMemorySize and MaxTotalMemorySize for OrderedMemoryAwareThreadPoolExecutor

Conflicts:
	components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f21c65dd
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f21c65dd
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f21c65dd

Branch: refs/heads/camel-2.15.x
Commit: f21c65dd1c28e65fe9724282db94b759b3501037
Parents: bd701e0
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed May 13 14:52:21 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed May 13 15:15:54 2015 +0800

----------------------------------------------------------------------
 .../camel/component/netty/NettyComponent.java   |  7 +++++-
 .../component/netty/NettyConfiguration.java     | 26 +++++++++++++++++---
 2 files changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f21c65dd/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
index ac622d0..e1be1c2 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
@@ -138,10 +138,15 @@ public class NettyComponent extends UriEndpointComponent {
         // replies in the expected order. eg this is required by TCP.
         // and use a Camel thread factory so we have consistent thread namings
         // we should use a shared thread pool as recommended by Netty
+        
+        // NOTE: if we don't specify the MaxChannelMemorySize and MaxTotalMemorySize, the thread pool
+        // could eat up all the heap memory when tasks are queued faster than they can be processed
+        
         String pattern = getCamelContext().getExecutorServiceManager().getThreadNamePattern();
         ThreadFactory factory = new CamelThreadFactory(pattern, "NettyOrderedWorker", true);
         return new OrderedMemoryAwareThreadPoolExecutor(getMaximumPoolSize(),
-                0L, 0L, 30, TimeUnit.SECONDS, factory);
+                configuration.getMaxChannelMemorySize(), configuration.getMaxTotalMemorySize(),
+                30, TimeUnit.SECONDS, factory);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/f21c65dd/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
index 92de7fe..e0129fd 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
@@ -94,6 +94,10 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
     private boolean udpConnectionlessSending;
     @UriParam
     private boolean clientMode;
+    @UriParam(defaultValue = "" + 10 * 1024 * 1024L)
+    private long maxChannelMemorySize = 10 * 1024 * 1024L;
+    @UriParam(defaultValue = "" + 200 * 1024 * 1024L)
+    private long maxTotalMemorySize = 200 * 1024 * 1024L;
 
     /**
      * Returns a copy of this configuration
@@ -201,7 +205,7 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
                     decoders.add(ChannelHandlerFactories.newStringDecoder(charset));
 
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("Using textline encoders and decoders with charset: {}, delimiter: {} and decoderMaxLineLength: {}", 
+                        LOG.debug("Using textline encoders and decoders with charset: {}, delimiter: {} and decoderMaxLineLength: {}",
                                 new Object[]{charset, delimiter, decoderMaxLineLength});
                     }
                 } else {
@@ -449,7 +453,7 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
     public void setProducerPoolEnabled(boolean producerPoolEnabled) {
         this.producerPoolEnabled = producerPoolEnabled;
     }
-    
+
     public boolean isUdpConnectionlessSending() {
         return udpConnectionlessSending;
     }
@@ -457,7 +461,7 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
     public void setUdpConnectionlessSending(boolean udpConnectionlessSending) {
         this.udpConnectionlessSending = udpConnectionlessSending;
     }
-    
+
     public boolean isClientMode() {
         return clientMode;
     }
@@ -466,6 +470,22 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
         this.clientMode = clientMode;
     }
 
+    public long getMaxChannelMemorySize() {
+        return maxChannelMemorySize;
+    }
+
+    public void setMaxChannelMemorySize(long maxChannelMemorySize) {
+        this.maxChannelMemorySize = maxChannelMemorySize;
+    }
+
+    public long getMaxTotalMemorySize() {
+        return maxTotalMemorySize;
+    }
+
+    public void setMaxTotalMemorySize(long maxTotalMemorySize) {
+        this.maxTotalMemorySize = maxTotalMemorySize;
+    }
+
     private static <T> void addToHandlersList(List<T> configured, List<T> handlers, Class<T> handlerType) {
         if (handlers != null) {
             for (T handler : handlers) {