You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/03/09 10:51:14 UTC

svn commit: r635206 - in /activemq/camel/trunk/components/camel-mina/src: main/java/org/apache/camel/component/mina/ test/java/org/apache/camel/component/mina/

Author: ningjiang
Date: Sun Mar  9 01:51:11 2008
New Revision: 635206

URL: http://svn.apache.org/viewvc?rev=635206&view=rev
Log:
CAMEL-257 CAMEL-371 patch applied with thanks to Claus

Added:
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaComponentTest.java   (with props)
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTimeOutTest.java   (with props)
Modified:
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=635206&r1=635205&r2=635206&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Sun Mar  9 01:51:11 2008
@@ -93,7 +93,7 @@
         IoAcceptor acceptor = new VmPipeAcceptor();
         SocketAddress address = new VmPipeAddress(connectUri.getPort());
         IoConnector connector = new VmPipeConnector();
-        return new MinaEndpoint(uri, this, address, acceptor, null, connector, null, false);
+        return new MinaEndpoint(uri, this, address, acceptor, null, connector, null, false, 0);
     }
 
     protected MinaEndpoint createSocketEndpoint(String uri, URI connectUri, Map parameters) {
@@ -114,8 +114,9 @@
         acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
 
         boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
-        
-        MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, acceptorConfig, connector, connectorConfig, lazySessionCreation);
+        long timeout = getTimeoutParameter(parameters);
+
+        MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, acceptorConfig, connector, connectorConfig, lazySessionCreation, timeout);
 
         boolean sync = ObjectConverter.toBool(parameters.get("sync"));
         if (sync) {
@@ -149,7 +150,7 @@
         IoAcceptor acceptor = new DatagramAcceptor();
         SocketAddress address = new InetSocketAddress(connectUri.getHost(), connectUri.getPort());
         IoConnector connector = new DatagramConnector();
-        
+
         DatagramConnectorConfig connectorConfig = new DatagramConnectorConfig();
         configureDataGramCodecFactory(connectorConfig, parameters);
         connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
@@ -161,8 +162,9 @@
         acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
 
         boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
-        
-        MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, acceptorConfig, connector, connectorConfig, lazySessionCreation);
+        long timeout = getTimeoutParameter(parameters);
+
+        MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, acceptorConfig, connector, connectorConfig, lazySessionCreation, timeout);
 
         boolean sync = ObjectConverter.toBool(parameters.get("sync"));
         if (sync) {
@@ -174,6 +176,20 @@
         return endpoint;
     }
 
+    private static long getTimeoutParameter(Map parameters) throws IllegalArgumentException {
+        long timeout = 0;
+        String value = (String) parameters.get("timeout");
+        if (value != null) {
+            try {
+                timeout = ObjectConverter.toLong(value);
+            } catch (NumberFormatException e) {
+                throw new IllegalArgumentException("The timeout parameter is not a number: " + value);
+            }
+        }
+
+        return timeout;
+    }
+
     /**
      * For datagrams the entire message is available as a single ByteBuffer so lets just pass those around by default
      * and try converting whatever they payload is into ByteBuffers unless some custom converter is specified
@@ -198,6 +214,9 @@
                 public ProtocolDecoder getDecoder() throws Exception {
                     return new ProtocolDecoder() {
                         public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
+                            // must acquire the bytebuffer since we just pass it below instead of creating a new one (CAMEL-257)
+                            in.acquire();
+
                             // lets just pass the ByteBuffer in
                             out.write(in);
                         }

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?rev=635206&r1=635205&r2=635206&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java Sun Mar  9 01:51:11 2008
@@ -35,6 +35,10 @@
  * @version $Revision$
  */
 public class MinaEndpoint extends DefaultEndpoint<MinaExchange> {
+
+    private static final long DEFAULT_TIMEOUT = 30000;
+    private long timeout = DEFAULT_TIMEOUT;
+
     private final IoAcceptor acceptor;
     private final SocketAddress address;
     private final IoConnector connector;
@@ -42,7 +46,7 @@
     private final IoConnectorConfig connectorConfig;
     private final boolean lazySessionCreation;
 
-    public MinaEndpoint(String endpointUri, MinaComponent component, SocketAddress address, IoAcceptor acceptor, IoAcceptorConfig acceptorConfig, IoConnector connector, IoConnectorConfig connectorConfig, boolean lazySessionCreation) {
+    public MinaEndpoint(String endpointUri, MinaComponent component, SocketAddress address, IoAcceptor acceptor, IoAcceptorConfig acceptorConfig, IoConnector connector, IoConnectorConfig connectorConfig, boolean lazySessionCreation, long timeout) {
         super(endpointUri, component);
         this.address = address;
         this.acceptor = acceptor;
@@ -50,6 +54,10 @@
         this.connectorConfig = connectorConfig;
         this.connector = connector;
         this.lazySessionCreation = lazySessionCreation;
+        if (timeout > 0) {
+            // override default timeout if provided
+            this.timeout = timeout;
+        }
     }
 
     public Producer<MinaExchange> createProducer() throws Exception {
@@ -85,7 +93,7 @@
         return connector;
     }
 
-    public boolean getLazySessionCreation() {
+    public boolean isLazySessionCreation() {
     	return lazySessionCreation;
     }
 
@@ -101,4 +109,8 @@
         return true;
     }
 
+    public long getTimeout() {
+        return timeout;
+    }
+    
 }

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=635206&r1=635205&r2=635206&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Sun Mar  9 01:51:11 2008
@@ -42,18 +42,17 @@
  */
 public class MinaProducer extends DefaultProducer {
     private static final transient Log LOG = LogFactory.getLog(MinaProducer.class);
-    // TODO: The max wait response should be configurable
-    // The URI parameter could be a option
-    private static final long MAX_WAIT_RESPONSE = 30000;
     private IoSession session;
     private MinaEndpoint endpoint;
     private CountDownLatch latch;
     private boolean lazySessionCreation;
+    private long timeout;
 
     public MinaProducer(MinaEndpoint endpoint) {
         super(endpoint);
         this.endpoint = endpoint;
-        this.lazySessionCreation = this.endpoint.getLazySessionCreation();
+        this.lazySessionCreation = this.endpoint.isLazySessionCreation();
+        this.timeout = this.endpoint.getTimeout();
     }
 
     public void process(Exchange exchange) throws Exception {
@@ -78,13 +77,13 @@
                 WriteFuture future = session.write(body);
                 future.join();
                 if (!future.isWritten()) {
-                    throw new ExchangeTimedOutException(exchange, MAX_WAIT_RESPONSE);
+                    throw new ExchangeTimedOutException(exchange, timeout);
                 }
 
                 // wait for response, consider timeout
-                latch.await(MAX_WAIT_RESPONSE, TimeUnit.MILLISECONDS);
+                latch.await(timeout, TimeUnit.MILLISECONDS);
                 if (latch.getCount() == 1) {
-                    throw new ExchangeTimedOutException(exchange, MAX_WAIT_RESPONSE);
+                    throw new ExchangeTimedOutException(exchange, timeout);
                 }
 
                 // did we get a response
@@ -121,7 +120,7 @@
         SocketAddress address = endpoint.getAddress();
         IoConnector connector = endpoint.getConnector();
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Creating connector to address: " + address + " using connector: " + connector);
+            LOG.debug("Creating connector to address: " + address + " using connector: " + connector + " timeout: " + timeout + " millis.");
         }
         IoHandler ioHandler = new ResponseHandler(endpoint);
         ConnectFuture future = connector.connect(address, ioHandler, endpoint.getConnectorConfig());

Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaComponentTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaComponentTest.java?rev=635206&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaComponentTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaComponentTest.java Sun Mar  9 01:51:11 2008
@@ -0,0 +1,24 @@
+package org.apache.camel.component.mina;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ResolveEndpointFailedException;
+
+/**
+ * For testing various minor holes that haven't been covered by other unit tests.
+ *
+ * @version $Revision$
+ */
+public class MinaComponentTest extends ContextTestSupport {
+
+    public void testUnknownProtocol() {
+        try {
+            template.setDefaultEndpointUri("mina:xxx://localhost:8080");
+            template.sendBody("mina:xxx://localhost:8080");
+            fail("Should have thrown a ResolveEndpointFailedException");
+        } catch (ResolveEndpointFailedException e) {
+            assertTrue("Should be an IAE exception", e.getCause() instanceof IllegalArgumentException);
+            assertEquals("Unrecognised MINA protocol: xxx for uri: mina:xxx://localhost:8080", e.getCause().getMessage());
+        }
+    }
+
+}

Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaComponentTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaComponentTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTimeOutTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTimeOutTest.java?rev=635206&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTimeOutTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTimeOutTest.java Sun Mar  9 01:51:11 2008
@@ -0,0 +1,77 @@
+package org.apache.camel.component.mina;
+
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * To test timeout.
+ *
+ * @version $Revision$
+ */
+public class MinaExchangeTimeOutTest extends ContextTestSupport {
+
+    private static final Log LOG = LogFactory.getLog(MinaExchangeTimeOutTest.class);
+
+    private static final int PORT = 6335;
+    protected String uri = "mina:tcp://localhost:" + PORT + "?textline=true&sync=true";
+
+    public void testTimedOut() {
+        LOG.info("Sending a message to Camel that should timeout after 30 sec, so be patient");
+
+        // default timeout is 30 sec so in the router below the response is slow and we timeout
+        try {
+            template.requestBody(uri, "Hello World");
+            fail("Should have thrown an ExchangeTimedOutException wrapped in a RuntimeCamelException");
+        } catch (RuntimeCamelException e) {
+            assertTrue("Should have thrown an ExchangeTimedOutException", e.getCause() instanceof ExchangeTimedOutException);
+        }
+    }
+
+    public void testUsingTimeoutParameter() throws Exception {
+        LOG.info("Sending a message to Camel that takes 35 sec to reply, so be patient");
+
+        // use a timeout value of 40 seconds (timeout is in millis) so we should actually get a response in this test
+        Endpoint endpoint = this.context.getEndpoint("mina:tcp://localhost:" + PORT + "?textline=true&sync=true&timeout=40000");
+        Producer producer = endpoint.createProducer();
+        producer.start();
+        Exchange exchange = producer.createExchange();
+        exchange.getIn().setBody("Hello World");
+        producer.process(exchange);
+
+        String out = exchange.getOut().getBody(String.class);
+        assertEquals("Okay I will be faster in the future", out);
+
+        producer.stop();
+    }
+
+    public void testTimeoutInvalidParameter() throws Exception {
+        // invalid timeout parameter that cannot be converted to a number
+        try {
+            this.context.getEndpoint("mina:tcp://localhost:" + PORT + "?textline=true&sync=true&timeout=XXX");
+            fail("Should have thrown a ResolveEndpointFailedException due to invalid timeout parameter");
+        } catch (ResolveEndpointFailedException e) {
+            assertTrue(e.getCause() instanceof IllegalArgumentException);
+            assertEquals("The timeout parameter is not a number: XXX", e.getCause().getMessage());
+        }
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from(uri).process(new Processor() {
+                    public void process(Exchange e) throws Exception {
+                        assertEquals("Hello World", e.getIn().getBody(String.class));
+                        // MinaProducer has a default timeout of 30 seconds so we wait 35 seconds
+                        // (template.requestBody uses a MinaProducer behind the scenes)
+                        Thread.sleep(35000);
+
+                        e.getOut().setBody("Okay I will be faster in the future");
+                    }
+                });
+            }
+        };
+    }
+
+}

Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTimeOutTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTimeOutTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date