You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/05/06 17:01:35 UTC

svn commit: r941765 - in /camel/trunk/components/camel-netty/src: main/java/org/apache/camel/component/netty/ main/java/org/apache/camel/component/netty/handlers/ test/java/org/apache/camel/component/netty/

Author: davsclaus
Date: Thu May  6 15:01:34 2010
New Revision: 941765

URL: http://svn.apache.org/viewvc?rev=941765&view=rev
Log:
Aligned timeout options with camel-mina. Fixed copy of configuration. 

Added:
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java   (with props)
Modified:
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java?rev=941765&r1=941764&r2=941765&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java Thu May  6 15:01:34 2010
@@ -24,23 +24,36 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.impl.DefaultComponent;
 
 public class NettyComponent extends DefaultComponent {
-    private NettyConfiguration config;
+    private NettyConfiguration configuration;
 
     public NettyComponent() {
-        config = new NettyConfiguration();
     }
 
     public NettyComponent(CamelContext context) {
         super(context);
-        config = new NettyConfiguration();
     }
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        NettyConfiguration config;
+        if (configuration != null) {
+            config = configuration.copy();
+        } else {
+            config = new NettyConfiguration();
+        }
+
         config.parseURI(new URI(remaining), parameters, this);
         
         NettyEndpoint nettyEndpoint = new NettyEndpoint(remaining, this, config);
         setProperties(nettyEndpoint.getConfiguration(), parameters);
         return nettyEndpoint;
     }
+
+    public NettyConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(NettyConfiguration configuration) {
+        this.configuration = configuration;
+    }
 }
\ No newline at end of file

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=941765&r1=941764&r2=941765&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Thu May  6 15:01:34 2010
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.util.URISupport;
 import org.jboss.netty.channel.ChannelDownstreamHandler;
 import org.jboss.netty.channel.ChannelHandler;
@@ -32,16 +33,16 @@ import org.jboss.netty.handler.codec.ser
 import org.jboss.netty.handler.ssl.SslHandler;
 
 @SuppressWarnings("unchecked")
-public class NettyConfiguration {
+public class NettyConfiguration implements Cloneable {
     private String protocol;
     private String host;
     private int port;
-    private boolean keepAlive;
-    private boolean tcpNoDelay;
+    private boolean keepAlive = true;
+    private boolean tcpNoDelay = true;
     private boolean broadcast;
-    private long connectTimeoutMillis;
-    private long receiveTimeoutMillis;
-    private boolean reuseAddress;
+    private long connectTimeout = 10000;
+    private long timeout = 30000;
+    private boolean reuseAddress = true;
     private boolean sync = true;
     private String passphrase;
     private File keyStoreFile;
@@ -51,30 +52,31 @@ public class NettyConfiguration {
     private List<ChannelUpstreamHandler> decoders = new ArrayList<ChannelUpstreamHandler>();
     private ChannelHandler handler;
     private boolean ssl;
-    private long sendBufferSize;
-    private long receiveBufferSize;
-    private int corePoolSize;
-    private int maxPoolSize;
+    private long sendBufferSize = 65536;
+    private long receiveBufferSize = 65536;
+    private int corePoolSize = 10;
+    private int maxPoolSize = 100;
     private String keyStoreFormat;
     private String securityProvider;
     private boolean disconnect;
     private boolean lazyChannelCreation = true;
     private boolean transferExchange;
 
-    public NettyConfiguration() {
-        setKeepAlive(true);
-        setTcpNoDelay(true);
-        setBroadcast(false);
-        setReuseAddress(true);
-        setSync(true);
-        setConnectTimeoutMillis(10000);
-        setReceiveTimeoutMillis(10000);
-        setSendBufferSize(65536);
-        setReceiveBufferSize(65536);
-        setSsl(false);
-        setCorePoolSize(10);
-        setMaxPoolSize(100);
-        setLazyChannelCreation(true);
+    /**
+     * Returns a copy of this configuration
+     */
+    public NettyConfiguration copy() {
+        try {
+            NettyConfiguration answer = (NettyConfiguration) clone();
+            // make sure each list is copied into its own instance
+            List<ChannelDownstreamHandler> encodersCopy = new ArrayList<ChannelDownstreamHandler>(encoders);
+            answer.setEncoders(encodersCopy);
+            List<ChannelUpstreamHandler> decodersCopy = new ArrayList<ChannelUpstreamHandler>(decoders);
+            answer.setDecoders(decodersCopy);
+            return answer;
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
     }
 
     public void parseURI(URI uri, Map<String, Object> parameters, NettyComponent component) throws Exception {
@@ -120,13 +122,13 @@ public class NettyConfiguration {
             setReuseAddress(Boolean.valueOf((String) settings.get("reuseAddress")));
         }
         if (settings.containsKey("connectTimeoutMillis")) {
-            setConnectTimeoutMillis(Long.valueOf((String) settings.get("connectTimeoutMillis")));
+            setConnectTimeout(Long.valueOf((String) settings.get("connectTimeoutMillis")));
         }
         if (settings.containsKey("sync")) {
             setTcpNoDelay(Boolean.valueOf((String) settings.get("sync")));
         }
         if (settings.containsKey("receiveTimeoutMillis")) {
-            setReceiveTimeoutMillis(Long.valueOf((String) settings.get("receiveTimeoutMillis")));
+            setTimeout(Long.valueOf((String) settings.get("receiveTimeoutMillis")));
         }
         if (settings.containsKey("sendBufferSize")) {
             setSendBufferSize(Long.valueOf((String) settings.get("sendBufferSize")));
@@ -202,12 +204,12 @@ public class NettyConfiguration {
         this.broadcast = broadcast;
     }
 
-    public long getConnectTimeoutMillis() {
-        return connectTimeoutMillis;
+    public long getConnectTimeout() {
+        return connectTimeout;
     }
 
-    public void setConnectTimeoutMillis(long connectTimeoutMillis) {
-        this.connectTimeoutMillis = connectTimeoutMillis;
+    public void setConnectTimeout(long connectTimeout) {
+        this.connectTimeout = connectTimeout;
     }
 
     public boolean isReuseAddress() {
@@ -278,12 +280,12 @@ public class NettyConfiguration {
         this.handler = handler;
     }
 
-    public long getReceiveTimeoutMillis() {
-        return receiveTimeoutMillis;
+    public long getTimeout() {
+        return timeout;
     }
 
-    public void setReceiveTimeoutMillis(long receiveTimeoutMillis) {
-        this.receiveTimeoutMillis = receiveTimeoutMillis;
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
     }
 
     public long getSendBufferSize() {

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=941765&r1=941764&r2=941765&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Thu May  6 15:01:34 2010
@@ -144,7 +144,7 @@ public class NettyConsumer extends Defau
         serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
         serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
         serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
-        serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+        serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
 
         channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
         // to keep track of all channels in use
@@ -161,7 +161,7 @@ public class NettyConsumer extends Defau
         connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
         connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
         connectionlessServerBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
-        connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+        connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
         connectionlessServerBootstrap.setOption("child.broadcast", configuration.isBroadcast());
         connectionlessServerBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
         connectionlessServerBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=941765&r1=941764&r2=941765&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu May  6 15:01:34 2010
@@ -134,9 +134,9 @@ public class NettyProducer extends Defau
         NettyHelper.writeBody(channel, null, body, exchange);
 
         if (configuration.isSync()) {
-            boolean success = countdownLatch.await(configuration.getReceiveTimeoutMillis(), TimeUnit.MILLISECONDS);
+            boolean success = countdownLatch.await(configuration.getTimeout(), TimeUnit.MILLISECONDS);
             if (!success) {
-                throw new ExchangeTimedOutException(exchange, configuration.getReceiveTimeoutMillis());
+                throw new ExchangeTimedOutException(exchange, configuration.getTimeout());
             }
 
             ClientChannelHandler handler = (ClientChannelHandler) clientPipeline.get("handler");
@@ -189,7 +189,7 @@ public class NettyProducer extends Defau
             clientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
             clientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
             clientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
-            clientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+            clientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
         }
         if (clientPipelineFactory == null) {
             clientPipelineFactory = new ClientPipelineFactory(this);
@@ -209,7 +209,7 @@ public class NettyProducer extends Defau
             connectionlessClientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
             connectionlessClientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
             connectionlessClientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
-            connectionlessClientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+            connectionlessClientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
             connectionlessClientBootstrap.setOption("child.broadcast", configuration.isBroadcast());
             connectionlessClientBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
             connectionlessClientBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=941765&r1=941764&r2=941765&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Thu May  6 15:01:34 2010
@@ -50,15 +50,16 @@ public class ServerChannelHandler extend
     }
 
     @Override
+    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        LOG.debug("Channel closed: " + e.getChannel());
+    }
+
+    @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
-        }
+        LOG.warn("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
+
         // close channel in case an exception was thrown
         NettyHelper.close(exceptionEvent.getChannel());
-
-        // must wrap and rethrow since cause can be of Throwable and we must only throw Exception
-        throw new CamelException(exceptionEvent.getCause());
     }
     
     @Override

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java?rev=941765&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java Thu May  6 15:01:34 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyComponentWithConfigurationTest extends CamelTestSupport {
+
+    @Test
+    public void testMinaComponentWithConfiguration() throws Exception {
+        NettyComponent comp = context.getComponent("netty", NettyComponent.class);
+
+        NettyConfiguration cfg = new NettyConfiguration();
+        cfg.setTimeout(15000);
+
+        comp.setConfiguration(cfg);
+        assertSame(cfg, comp.getConfiguration());
+
+        NettyEndpoint e1 = (NettyEndpoint) comp.createEndpoint("netty://tcp://localhost:4455");
+        NettyEndpoint e2 = (NettyEndpoint) comp.createEndpoint("netty://tcp://localhost:5566?sync=false");
+
+        // should not be same
+        assertNotSame(e1, e2);
+        assertNotSame(e1.getConfiguration(), e2.getConfiguration());
+
+        e2.getConfiguration().setPort(5566);
+
+        assertEquals(true, e1.getConfiguration().isSync());
+        assertEquals(false, e2.getConfiguration().isSync());
+        assertEquals(15000, e1.getConfiguration().getTimeout());
+        assertEquals(15000, e2.getConfiguration().getTimeout());
+        assertEquals(4455, e1.getConfiguration().getPort());
+        assertEquals(5566, e2.getConfiguration().getPort());
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java?rev=941765&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java Thu May  6 15:01:34 2010
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyExchangeTimeoutTest extends CamelTestSupport {
+
+    private static final int PORT = 6336;
+    protected String uri = "netty:tcp://localhost:" + PORT;
+
+    @Test
+    public void testUsingTimeoutParameter() throws Exception {
+        // use a timeout value of 2 seconds (timeout is in millis); the route sleeps for 5 seconds, so the request should time out
+        Endpoint endpoint = this.context.getEndpoint("netty:tcp://localhost:" + PORT + "?timeout=2000");
+        Producer producer = endpoint.createProducer();
+        producer.start();
+        Exchange exchange = producer.createExchange();
+        exchange.getIn().setBody("Hello World");
+        try {
+            producer.process(exchange);
+            fail("Should have thrown an ExchangeTimedOutException wrapped in a RuntimeCamelException");
+        } catch (Exception e) {
+            assertTrue("Should have thrown an ExchangeTimedOutException", e instanceof ExchangeTimedOutException);
+        }
+        producer.stop();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from(uri).process(new Processor() {
+                    public void process(Exchange e) throws Exception {
+                        assertEquals("Hello World", e.getIn().getBody(String.class));
+                        // the producer in this test uses a timeout of 2 seconds, so we wait 5 seconds
+                        // to make sure the reply arrives too late and the producer times out
+                        Thread.sleep(5000);
+
+                        e.getOut().setBody("Okay I will be faster in the future");
+                    }
+                });
+            }
+        };
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date