You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/03/09 10:51:14 UTC
svn commit: r635206 - in /activemq/camel/trunk/components/camel-mina/src:
main/java/org/apache/camel/component/mina/
test/java/org/apache/camel/component/mina/
Author: ningjiang
Date: Sun Mar 9 01:51:11 2008
New Revision: 635206
URL: http://svn.apache.org/viewvc?rev=635206&view=rev
Log:
CAMEL-257 CAMEL-371 patch applied with thanks to Claus
Added:
activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaComponentTest.java (with props)
activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTimeOutTest.java (with props)
Modified:
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=635206&r1=635205&r2=635206&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Sun Mar 9 01:51:11 2008
@@ -93,7 +93,7 @@
IoAcceptor acceptor = new VmPipeAcceptor();
SocketAddress address = new VmPipeAddress(connectUri.getPort());
IoConnector connector = new VmPipeConnector();
- return new MinaEndpoint(uri, this, address, acceptor, null, connector, null, false);
+ return new MinaEndpoint(uri, this, address, acceptor, null, connector, null, false, 0);
}
protected MinaEndpoint createSocketEndpoint(String uri, URI connectUri, Map parameters) {
@@ -114,8 +114,9 @@
acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
-
- MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, acceptorConfig, connector, connectorConfig, lazySessionCreation);
+ long timeout = getTimeoutParameter(parameters);
+
+ MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, acceptorConfig, connector, connectorConfig, lazySessionCreation, timeout);
boolean sync = ObjectConverter.toBool(parameters.get("sync"));
if (sync) {
@@ -149,7 +150,7 @@
IoAcceptor acceptor = new DatagramAcceptor();
SocketAddress address = new InetSocketAddress(connectUri.getHost(), connectUri.getPort());
IoConnector connector = new DatagramConnector();
-
+
DatagramConnectorConfig connectorConfig = new DatagramConnectorConfig();
configureDataGramCodecFactory(connectorConfig, parameters);
connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
@@ -161,8 +162,9 @@
acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
-
- MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, acceptorConfig, connector, connectorConfig, lazySessionCreation);
+ long timeout = getTimeoutParameter(parameters);
+
+ MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, acceptorConfig, connector, connectorConfig, lazySessionCreation, timeout);
boolean sync = ObjectConverter.toBool(parameters.get("sync"));
if (sync) {
@@ -174,6 +176,20 @@
return endpoint;
}
+ private static long getTimeoutParameter(Map parameters) throws IllegalArgumentException {
+ long timeout = 0;
+ String value = (String) parameters.get("timeout");
+ if (value != null) {
+ try {
+ timeout = ObjectConverter.toLong(value);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("The timeout parameter is not a number: " + value);
+ }
+ }
+
+ return timeout;
+ }
+
/**
* For datagrams the entire message is available as a single ByteBuffer so let's just pass those around by default
* and try converting whatever the payload is into ByteBuffers unless some custom converter is specified
@@ -198,6 +214,9 @@
public ProtocolDecoder getDecoder() throws Exception {
return new ProtocolDecoder() {
public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
+ // must acquire the bytebuffer since we just pass it below instead of creating a new one (CAMEL-257)
+ in.acquire();
+
// lets just pass the ByteBuffer in
out.write(in);
}
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?rev=635206&r1=635205&r2=635206&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java Sun Mar 9 01:51:11 2008
@@ -35,6 +35,10 @@
* @version $Revision$
*/
public class MinaEndpoint extends DefaultEndpoint<MinaExchange> {
+
+ private static final long DEFAULT_TIMEOUT = 30000;
+ private long timeout = DEFAULT_TIMEOUT;
+
private final IoAcceptor acceptor;
private final SocketAddress address;
private final IoConnector connector;
@@ -42,7 +46,7 @@
private final IoConnectorConfig connectorConfig;
private final boolean lazySessionCreation;
- public MinaEndpoint(String endpointUri, MinaComponent component, SocketAddress address, IoAcceptor acceptor, IoAcceptorConfig acceptorConfig, IoConnector connector, IoConnectorConfig connectorConfig, boolean lazySessionCreation) {
+ public MinaEndpoint(String endpointUri, MinaComponent component, SocketAddress address, IoAcceptor acceptor, IoAcceptorConfig acceptorConfig, IoConnector connector, IoConnectorConfig connectorConfig, boolean lazySessionCreation, long timeout) {
super(endpointUri, component);
this.address = address;
this.acceptor = acceptor;
@@ -50,6 +54,10 @@
this.connectorConfig = connectorConfig;
this.connector = connector;
this.lazySessionCreation = lazySessionCreation;
+ if (timeout > 0) {
+ // override default timeout if provided
+ this.timeout = timeout;
+ }
}
public Producer<MinaExchange> createProducer() throws Exception {
@@ -85,7 +93,7 @@
return connector;
}
- public boolean getLazySessionCreation() {
+ public boolean isLazySessionCreation() {
return lazySessionCreation;
}
@@ -101,4 +109,8 @@
return true;
}
+ public long getTimeout() {
+ return timeout;
+ }
+
}
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=635206&r1=635205&r2=635206&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Sun Mar 9 01:51:11 2008
@@ -42,18 +42,17 @@
*/
public class MinaProducer extends DefaultProducer {
private static final transient Log LOG = LogFactory.getLog(MinaProducer.class);
- // TODO: The max wait response should be configurable
- // The URI parameter could be a option
- private static final long MAX_WAIT_RESPONSE = 30000;
private IoSession session;
private MinaEndpoint endpoint;
private CountDownLatch latch;
private boolean lazySessionCreation;
+ private long timeout;
public MinaProducer(MinaEndpoint endpoint) {
super(endpoint);
this.endpoint = endpoint;
- this.lazySessionCreation = this.endpoint.getLazySessionCreation();
+ this.lazySessionCreation = this.endpoint.isLazySessionCreation();
+ this.timeout = this.endpoint.getTimeout();
}
public void process(Exchange exchange) throws Exception {
@@ -78,13 +77,13 @@
WriteFuture future = session.write(body);
future.join();
if (!future.isWritten()) {
- throw new ExchangeTimedOutException(exchange, MAX_WAIT_RESPONSE);
+ throw new ExchangeTimedOutException(exchange, timeout);
}
// wait for response, consider timeout
- latch.await(MAX_WAIT_RESPONSE, TimeUnit.MILLISECONDS);
+ latch.await(timeout, TimeUnit.MILLISECONDS);
if (latch.getCount() == 1) {
- throw new ExchangeTimedOutException(exchange, MAX_WAIT_RESPONSE);
+ throw new ExchangeTimedOutException(exchange, timeout);
}
// did we get a response
@@ -121,7 +120,7 @@
SocketAddress address = endpoint.getAddress();
IoConnector connector = endpoint.getConnector();
if (LOG.isDebugEnabled()) {
- LOG.debug("Creating connector to address: " + address + " using connector: " + connector);
+ LOG.debug("Creating connector to address: " + address + " using connector: " + connector + " timeout: " + timeout + " millis.");
}
IoHandler ioHandler = new ResponseHandler(endpoint);
ConnectFuture future = connector.connect(address, ioHandler, endpoint.getConnectorConfig());
Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaComponentTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaComponentTest.java?rev=635206&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaComponentTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaComponentTest.java Sun Mar 9 01:51:11 2008
@@ -0,0 +1,24 @@
+package org.apache.camel.component.mina;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ResolveEndpointFailedException;
+
+/**
+ * For testing various minor holes that haven't been covered by other unit tests.
+ *
+ * @version $Revision$
+ */
+public class MinaComponentTest extends ContextTestSupport {
+
+ public void testUnknownProtocol() {
+ try {
+ template.setDefaultEndpointUri("mina:xxx://localhost:8080");
+ template.sendBody("mina:xxx://localhost:8080");
+ fail("Should have thrown a ResolveEndpointFailedException");
+ } catch (ResolveEndpointFailedException e) {
+ assertTrue("Should be an IAE exception", e.getCause() instanceof IllegalArgumentException);
+ assertEquals("Unrecognised MINA protocol: xxx for uri: mina:xxx://localhost:8080", e.getCause().getMessage());
+ }
+ }
+
+}
Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaComponentTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaComponentTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTimeOutTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTimeOutTest.java?rev=635206&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTimeOutTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTimeOutTest.java Sun Mar 9 01:51:11 2008
@@ -0,0 +1,77 @@
+package org.apache.camel.component.mina;
+
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * To test timeout.
+ *
+ * @version $Revision$
+ */
+public class MinaExchangeTimeOutTest extends ContextTestSupport {
+
+ private static final Log LOG = LogFactory.getLog(MinaExchangeTimeOutTest.class);
+
+ private static final int PORT = 6335;
+ protected String uri = "mina:tcp://localhost:" + PORT + "?textline=true&sync=true";
+
+ public void testTimedOut() {
+ LOG.info("Sending a message to Camel that should timeout after 30 sec, so be patient");
+
+ // default timeout is 30 sec so in the router below the response is slow and we timeout
+ try {
+ template.requestBody(uri, "Hello World");
+ fail("Should have thrown an ExchangeTimedOutException wrapped in a RuntimeCamelException");
+ } catch (RuntimeCamelException e) {
+ assertTrue("Should have thrown an ExchangeTimedOutException", e.getCause() instanceof ExchangeTimedOutException);
+ }
+ }
+
+ public void testUsingTimeoutParameter() throws Exception {
+ LOG.info("Sending a message to Camel that takes 35 sec to reply, so be patient");
+
+ // use a timeout value of 40 seconds (timeout is in millis) so we should actually get a response in this test
+ Endpoint endpoint = this.context.getEndpoint("mina:tcp://localhost:" + PORT + "?textline=true&sync=true&timeout=40000");
+ Producer producer = endpoint.createProducer();
+ producer.start();
+ Exchange exchange = producer.createExchange();
+ exchange.getIn().setBody("Hello World");
+ producer.process(exchange);
+
+ String out = exchange.getOut().getBody(String.class);
+ assertEquals("Okay I will be faster in the future", out);
+
+ producer.stop();
+ }
+
+ public void testTimeoutInvalidParameter() throws Exception {
+ // invalid timeout parameter that can not be converted to a number
+ try {
+ this.context.getEndpoint("mina:tcp://localhost:" + PORT + "?textline=true&sync=true&timeout=XXX");
+ fail("Should have thrown a ResolveEndpointFailedException due to invalid timeout parameter");
+ } catch (ResolveEndpointFailedException e) {
+ assertTrue(e.getCause() instanceof IllegalArgumentException);
+ assertEquals("The timeout parameter is not a number: XXX", e.getCause().getMessage());
+ }
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from(uri).process(new Processor() {
+ public void process(Exchange e) throws Exception {
+ assertEquals("Hello World", e.getIn().getBody(String.class));
+ // MinaProducer has a default timeout of 30 seconds so we wait 35 seconds
+ // (template.requestBody is a MinaProducer behind the doors)
+ Thread.sleep(35000);
+
+ e.getOut().setBody("Okay I will be faster in the future");
+ }
+ });
+ }
+ };
+ }
+
+}
Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTimeOutTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTimeOutTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date