You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/05/06 17:01:35 UTC
svn commit: r941765 - in /camel/trunk/components/camel-netty/src:
main/java/org/apache/camel/component/netty/
main/java/org/apache/camel/component/netty/handlers/
test/java/org/apache/camel/component/netty/
Author: davsclaus
Date: Thu May 6 15:01:34 2010
New Revision: 941765
URL: http://svn.apache.org/viewvc?rev=941765&view=rev
Log:
Aligned timeout options with camel-mina. Fixed copy of configuration.
Added:
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java (with props)
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java (with props)
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java?rev=941765&r1=941764&r2=941765&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java Thu May 6 15:01:34 2010
@@ -24,23 +24,36 @@ import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
public class NettyComponent extends DefaultComponent {
- private NettyConfiguration config;
+ private NettyConfiguration configuration;
public NettyComponent() {
- config = new NettyConfiguration();
}
public NettyComponent(CamelContext context) {
super(context);
- config = new NettyConfiguration();
}
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ NettyConfiguration config;
+ if (configuration != null) {
+ config = configuration.copy();
+ } else {
+ config = new NettyConfiguration();
+ }
+
config.parseURI(new URI(remaining), parameters, this);
NettyEndpoint nettyEndpoint = new NettyEndpoint(remaining, this, config);
setProperties(nettyEndpoint.getConfiguration(), parameters);
return nettyEndpoint;
}
+
+ public NettyConfiguration getConfiguration() {
+ return configuration;
+ }
+
+ public void setConfiguration(NettyConfiguration configuration) {
+ this.configuration = configuration;
+ }
}
\ No newline at end of file
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=941765&r1=941764&r2=941765&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Thu May 6 15:01:34 2010
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.util.URISupport;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelHandler;
@@ -32,16 +33,16 @@ import org.jboss.netty.handler.codec.ser
import org.jboss.netty.handler.ssl.SslHandler;
@SuppressWarnings("unchecked")
-public class NettyConfiguration {
+public class NettyConfiguration implements Cloneable {
private String protocol;
private String host;
private int port;
- private boolean keepAlive;
- private boolean tcpNoDelay;
+ private boolean keepAlive = true;
+ private boolean tcpNoDelay = true;
private boolean broadcast;
- private long connectTimeoutMillis;
- private long receiveTimeoutMillis;
- private boolean reuseAddress;
+ private long connectTimeout = 10000;
+ private long timeout = 30000;
+ private boolean reuseAddress = true;
private boolean sync = true;
private String passphrase;
private File keyStoreFile;
@@ -51,30 +52,31 @@ public class NettyConfiguration {
private List<ChannelUpstreamHandler> decoders = new ArrayList<ChannelUpstreamHandler>();
private ChannelHandler handler;
private boolean ssl;
- private long sendBufferSize;
- private long receiveBufferSize;
- private int corePoolSize;
- private int maxPoolSize;
+ private long sendBufferSize = 65536;
+ private long receiveBufferSize = 65536;
+ private int corePoolSize = 10;
+ private int maxPoolSize = 100;
private String keyStoreFormat;
private String securityProvider;
private boolean disconnect;
private boolean lazyChannelCreation = true;
private boolean transferExchange;
- public NettyConfiguration() {
- setKeepAlive(true);
- setTcpNoDelay(true);
- setBroadcast(false);
- setReuseAddress(true);
- setSync(true);
- setConnectTimeoutMillis(10000);
- setReceiveTimeoutMillis(10000);
- setSendBufferSize(65536);
- setReceiveBufferSize(65536);
- setSsl(false);
- setCorePoolSize(10);
- setMaxPoolSize(100);
- setLazyChannelCreation(true);
+ /**
+ * Returns a copy of this configuration
+ */
+ public NettyConfiguration copy() {
+ try {
+ NettyConfiguration answer = (NettyConfiguration) clone();
+ // make sure each list is copied into its own instance
+ List<ChannelDownstreamHandler> encodersCopy = new ArrayList<ChannelDownstreamHandler>(encoders);
+ answer.setEncoders(encodersCopy);
+ List<ChannelUpstreamHandler> decodersCopy = new ArrayList<ChannelUpstreamHandler>(decoders);
+ answer.setDecoders(decodersCopy);
+ return answer;
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeCamelException(e);
+ }
}
public void parseURI(URI uri, Map<String, Object> parameters, NettyComponent component) throws Exception {
@@ -120,13 +122,13 @@ public class NettyConfiguration {
setReuseAddress(Boolean.valueOf((String) settings.get("reuseAddress")));
}
if (settings.containsKey("connectTimeoutMillis")) {
- setConnectTimeoutMillis(Long.valueOf((String) settings.get("connectTimeoutMillis")));
+ setConnectTimeout(Long.valueOf((String) settings.get("connectTimeoutMillis")));
}
if (settings.containsKey("sync")) {
setTcpNoDelay(Boolean.valueOf((String) settings.get("sync")));
}
if (settings.containsKey("receiveTimeoutMillis")) {
- setReceiveTimeoutMillis(Long.valueOf((String) settings.get("receiveTimeoutMillis")));
+ setTimeout(Long.valueOf((String) settings.get("receiveTimeoutMillis")));
}
if (settings.containsKey("sendBufferSize")) {
setSendBufferSize(Long.valueOf((String) settings.get("sendBufferSize")));
@@ -202,12 +204,12 @@ public class NettyConfiguration {
this.broadcast = broadcast;
}
- public long getConnectTimeoutMillis() {
- return connectTimeoutMillis;
+ public long getConnectTimeout() {
+ return connectTimeout;
}
- public void setConnectTimeoutMillis(long connectTimeoutMillis) {
- this.connectTimeoutMillis = connectTimeoutMillis;
+ public void setConnectTimeout(long connectTimeout) {
+ this.connectTimeout = connectTimeout;
}
public boolean isReuseAddress() {
@@ -278,12 +280,12 @@ public class NettyConfiguration {
this.handler = handler;
}
- public long getReceiveTimeoutMillis() {
- return receiveTimeoutMillis;
+ public long getTimeout() {
+ return timeout;
}
- public void setReceiveTimeoutMillis(long receiveTimeoutMillis) {
- this.receiveTimeoutMillis = receiveTimeoutMillis;
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
}
public long getSendBufferSize() {
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=941765&r1=941764&r2=941765&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Thu May 6 15:01:34 2010
@@ -144,7 +144,7 @@ public class NettyConsumer extends Defau
serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
- serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+ serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
// to keep track of all channels in use
@@ -161,7 +161,7 @@ public class NettyConsumer extends Defau
connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
connectionlessServerBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
- connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+ connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
connectionlessServerBootstrap.setOption("child.broadcast", configuration.isBroadcast());
connectionlessServerBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
connectionlessServerBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=941765&r1=941764&r2=941765&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu May 6 15:01:34 2010
@@ -134,9 +134,9 @@ public class NettyProducer extends Defau
NettyHelper.writeBody(channel, null, body, exchange);
if (configuration.isSync()) {
- boolean success = countdownLatch.await(configuration.getReceiveTimeoutMillis(), TimeUnit.MILLISECONDS);
+ boolean success = countdownLatch.await(configuration.getTimeout(), TimeUnit.MILLISECONDS);
if (!success) {
- throw new ExchangeTimedOutException(exchange, configuration.getReceiveTimeoutMillis());
+ throw new ExchangeTimedOutException(exchange, configuration.getTimeout());
}
ClientChannelHandler handler = (ClientChannelHandler) clientPipeline.get("handler");
@@ -189,7 +189,7 @@ public class NettyProducer extends Defau
clientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
clientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
clientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
- clientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+ clientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
}
if (clientPipelineFactory == null) {
clientPipelineFactory = new ClientPipelineFactory(this);
@@ -209,7 +209,7 @@ public class NettyProducer extends Defau
connectionlessClientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
connectionlessClientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
connectionlessClientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
- connectionlessClientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+ connectionlessClientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
connectionlessClientBootstrap.setOption("child.broadcast", configuration.isBroadcast());
connectionlessClientBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
connectionlessClientBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=941765&r1=941764&r2=941765&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Thu May 6 15:01:34 2010
@@ -50,15 +50,16 @@ public class ServerChannelHandler extend
}
@Override
+ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ LOG.debug("Channel closed: " + e.getChannel());
+ }
+
+ @Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
- }
+ LOG.warn("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
+
// close channel in case an exception was thrown
NettyHelper.close(exceptionEvent.getChannel());
-
- // must wrap and rethrow since cause can be of Throwable and we must only throw Exception
- throw new CamelException(exceptionEvent.getCause());
}
@Override
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java?rev=941765&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java Thu May 6 15:01:34 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyComponentWithConfigurationTest extends CamelTestSupport {
+
+ @Test
+ public void testMinaComponentWithConfiguration() throws Exception {
+ NettyComponent comp = context.getComponent("netty", NettyComponent.class);
+
+ NettyConfiguration cfg = new NettyConfiguration();
+ cfg.setTimeout(15000);
+
+ comp.setConfiguration(cfg);
+ assertSame(cfg, comp.getConfiguration());
+
+ NettyEndpoint e1 = (NettyEndpoint) comp.createEndpoint("netty://tcp://localhost:4455");
+ NettyEndpoint e2 = (NettyEndpoint) comp.createEndpoint("netty://tcp://localhost:5566?sync=false");
+
+ // should not be same
+ assertNotSame(e1, e2);
+ assertNotSame(e1.getConfiguration(), e2.getConfiguration());
+
+ e2.getConfiguration().setPort(5566);
+
+ assertEquals(true, e1.getConfiguration().isSync());
+ assertEquals(false, e2.getConfiguration().isSync());
+ assertEquals(15000, e1.getConfiguration().getTimeout());
+ assertEquals(15000, e2.getConfiguration().getTimeout());
+ assertEquals(4455, e1.getConfiguration().getPort());
+ assertEquals(5566, e2.getConfiguration().getPort());
+ }
+
+}
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java?rev=941765&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java Thu May 6 15:01:34 2010
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyExchangeTimeoutTest extends CamelTestSupport {
+
+ private static final int PORT = 6336;
+ protected String uri = "netty:tcp://localhost:" + PORT;
+
+ @Test
+ public void testUsingTimeoutParameter() throws Exception {
+ // use a timeout value of 2 seconds (timeout is in millis); the route takes 5 seconds to reply, so the request should time out in this test
+ Endpoint endpoint = this.context.getEndpoint("netty:tcp://localhost:" + PORT + "?timeout=2000");
+ Producer producer = endpoint.createProducer();
+ producer.start();
+ Exchange exchange = producer.createExchange();
+ exchange.getIn().setBody("Hello World");
+ try {
+ producer.process(exchange);
+ fail("Should have thrown an ExchangeTimedOutException wrapped in a RuntimeCamelException");
+ } catch (Exception e) {
+ assertTrue("Should have thrown an ExchangeTimedOutException", e instanceof ExchangeTimedOutException);
+ }
+ producer.stop();
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from(uri).process(new Processor() {
+ public void process(Exchange e) throws Exception {
+ assertEquals("Hello World", e.getIn().getBody(String.class));
+ // NettyProducer has a default timeout of 30 seconds, but the test sets timeout=2000,
+ // so waiting 5 seconds here makes the producer time out before the reply is sent
+ Thread.sleep(5000);
+
+ e.getOut().setBody("Okay I will be faster in the future");
+ }
+ });
+ }
+ };
+ }
+
+}
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date