You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2008/03/05 17:00:35 UTC

svn commit: r633888 - in /activemq/camel/trunk/components/camel-mina/src: main/java/org/apache/camel/component/mina/ test/java/org/apache/camel/component/mina/

Author: hadrian
Date: Wed Mar  5 08:00:24 2008
New Revision: 633888

URL: http://svn.apache.org/viewvc?rev=633888&view=rev
Log:
CAMEL-362.

Modified:
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=633888&r1=633887&r2=633888&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Wed Mar  5 08:00:24 2008
@@ -90,7 +90,7 @@
         IoAcceptor acceptor = new VmPipeAcceptor();
         SocketAddress address = new VmPipeAddress(connectUri.getPort());
         IoConnector connector = new VmPipeConnector();
-        return new MinaEndpoint(uri, this, address, acceptor, connector, null);
+        return new MinaEndpoint(uri, this, address, acceptor, connector, null, false);
     }
 
     protected MinaEndpoint createSocketEndpoint(String uri, URI connectUri, Map parameters) {
@@ -101,7 +101,10 @@
         // TODO customize the config via URI
         SocketConnectorConfig config = new SocketConnectorConfig();
         configureSocketCodecFactory(config, parameters);
-        MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, connector, config);
+        
+        boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
+        
+        MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, connector, config, lazySessionCreation);
 
         boolean sync = ObjectConverter.toBool(parameters.get("sync"));
         if (sync) {
@@ -141,7 +144,9 @@
 
         configureDataGramCodecFactory(config, parameters);
 
-        return new MinaEndpoint(uri, this, address, acceptor, connector, config);
+        boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
+        
+        return new MinaEndpoint(uri, this, address, acceptor, connector, config, lazySessionCreation);
     }
 
     /**

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?rev=633888&r1=633887&r2=633888&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java Wed Mar  5 08:00:24 2008
@@ -36,13 +36,15 @@
     private final SocketAddress address;
     private final IoConnector connector;
     private final IoServiceConfig config;
+    private final boolean lazySessionCreation;
 
-    public MinaEndpoint(String endpointUri, MinaComponent component, SocketAddress address, IoAcceptor acceptor, IoConnector connector, IoServiceConfig config) {
+    public MinaEndpoint(String endpointUri, MinaComponent component, SocketAddress address, IoAcceptor acceptor, IoConnector connector, IoServiceConfig config, boolean lazySessionCreation) {
         super(endpointUri, component);
         this.config = config;
         this.address = address;
         this.acceptor = acceptor;
         this.connector = connector;
+        this.lazySessionCreation = lazySessionCreation;
     }
 
     public Producer<MinaExchange> createProducer() throws Exception {
@@ -83,6 +85,10 @@
         return config;
     }
 
+    public boolean getLazySessionCreation() {
+    	return lazySessionCreation;
+    }
+    
     public boolean isSingleton() {
         return true;
     }

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=633888&r1=633887&r2=633888&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Wed Mar  5 08:00:24 2008
@@ -44,24 +44,26 @@
     private IoSession session;
     private MinaEndpoint endpoint;
     private CountDownLatch latch;
+    private boolean lazySessionCreation;
 
     public MinaProducer(MinaEndpoint endpoint) {
         super(endpoint);
         this.endpoint = endpoint;
+        this.lazySessionCreation = this.endpoint.getLazySessionCreation();
     }
 
     public void process(Exchange exchange) throws Exception {
-        if (session == null) {
+        if (session == null && !lazySessionCreation) {
             throw new IllegalStateException("Not started yet!");
         }
-        if (!session.isConnected()) {
-            doStart();
+        if (session == null || !session.isConnected()) {
+            openConnection();
         }
+
         Object body = exchange.getIn().getBody();
         if (body == null) {
             LOG.warn("No payload for exchange: " + exchange);
-        }
-        else {
+        } else {
             if (ExchangeHelper.isOutCapable(exchange)) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Writing body : " + body);
@@ -70,24 +72,24 @@
                 WriteFuture future = session.write(body);
                 future.join();
                 if (!future.isWritten()) {
-                    throw new RuntimeException("Timed out waiting for response: " + exchange);
+                    throw new RuntimeException(
+                        "Timed out waiting for response: " + exchange);
                 }
                 latch.await(MAX_WAIT_RESPONSE, TimeUnit.MILLISECONDS);
                 if (latch.getCount() == 1) {
-                    throw new RuntimeException("No response from server within " + MAX_WAIT_RESPONSE + " millisecs");
+                    throw new RuntimeException("No response from server within "
+                        + MAX_WAIT_RESPONSE + " millisecs");
                 }
-                ResponseHandler handler = (ResponseHandler) session.getHandler();
+                ResponseHandler handler = (ResponseHandler)session.getHandler();
                 if (handler.getCause() != null) {
                     throw new Exception("Response Handler had an exception", handler.getCause());
-                }
-                else {
+                } else {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Handler message: " + handler.getMessage());
                     }
                     exchange.getOut().setBody(handler.getMessage());
                 }
-            }
-            else {
+            } else {
                 session.write(body);
             }
         }
@@ -95,6 +97,19 @@
 
     @Override
     protected void doStart() throws Exception {
+        if (!lazySessionCreation) {
+            openConnection();
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (session != null) {
+            session.close().join(2000);
+        }
+    }
+
+    private void openConnection() {
         SocketAddress address = endpoint.getAddress();
         IoConnector connector = endpoint.getConnector();
         if (LOG.isDebugEnabled()) {
@@ -104,13 +119,6 @@
         ConnectFuture future = connector.connect(address, ioHandler, endpoint.getConfig());
         future.join();
         session = future.getSession();
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        if (session != null) {
-            session.close().join(2000);
-        }
     }
 
     /**

Modified: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java?rev=633888&r1=633887&r2=633888&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java Wed Mar  5 08:00:24 2008
@@ -35,14 +35,23 @@
  * @version $Revision$
  */
 public class MinaTcpWithInOutTest extends TestCase {
-    protected CamelContext container = new DefaultCamelContext();
-    protected CountDownLatch latch = new CountDownLatch(1);
+	
+    protected String uri;
     protected Exchange receivedExchange;
-    protected String uri = "mina:tcp://localhost:6321?textline=true";
-    protected Producer<Exchange> producer;
-    private ReverserServer server;
-
+    protected CountDownLatch latch;
+    protected CamelContext container;
+	
     public void testMinaRouteWithInOut() throws Exception {
+    	container = new DefaultCamelContext();
+    	latch = new CountDownLatch(1);
+    	uri = "mina:tcp://localhost:6321?textline=true";
+    	Producer<Exchange> producer;
+    	ReverserServer server;
+        server = new ReverserServer();
+        server.start();
+        container.addRoutes(createRouteBuilder());
+        container.start();
+    	
         // now lets fire in a message
         Endpoint<Exchange> endpoint = container.getEndpoint("direct:x");
         Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
@@ -60,24 +69,56 @@
         assertTrue("Did not receive the message!", received);
         assertNotNull(receivedExchange.getIn());
         assertEquals("!olleH", receivedExchange.getIn().getBody());
+
+        if (producer != null) {
+            producer.stop();
+        }
+        container.stop();
+        server.stop();
     }
     
-    @Override
-    protected void setUp() throws Exception {
-        server = new ReverserServer();
-        server.start();
+    public void testMinaRouteWithInOutLazy() throws Exception {
+    	container = new DefaultCamelContext();
+    	latch = new CountDownLatch(1);
+    	uri = "mina:tcp://localhost:6321?textline=true&lazySessionCreation=true";
+    	Producer<Exchange> producer;
         container.addRoutes(createRouteBuilder());
         container.start();
-    }
+    	ReverserServer server;          //The server is activated after Camel to check if the laziness is working
+        server = new ReverserServer();
+        server.start();
+        
+        // now lets fire in a message
+        Endpoint<Exchange> endpoint = container.getEndpoint("direct:x");
+        Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
+        Message message = exchange.getIn();
+        String hello = "Hello!";
+        message.setBody(hello);
+        message.setHeader("cheese", 123);
 
+        producer = endpoint.createProducer();
+        producer.start();
+        producer.process(exchange);
+
+        // now lets sleep for a while
+        boolean received = latch.await(5, TimeUnit.SECONDS);
+        assertTrue("Did not receive the message!", received);
+        assertNotNull(receivedExchange.getIn());
+        assertEquals("!olleH", receivedExchange.getIn().getBody());
 
-    @Override
-    protected void tearDown() throws Exception {
         if (producer != null) {
             producer.stop();
         }
         container.stop();
         server.stop();
+    }
+    
+    @Override
+    protected void setUp() throws Exception {
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
     }
 
     protected RouteBuilder createRouteBuilder() {