You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/03/09 06:06:42 UTC

svn commit: r635171 - in /activemq/camel/trunk/components/camel-mina/src: main/java/org/apache/camel/component/mina/ test/java/org/apache/camel/component/mina/

Author: ningjiang
Date: Sat Mar  8 21:06:41 2008
New Revision: 635171

URL: http://svn.apache.org/viewvc?rev=635171&view=rev
Log:
CAMEL-375 Applied the patch with thanks to Claus, also changed the MinaProducer's default time out to 30 sec

Added:
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTest.java   (with props)
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java   (with props)
Modified:
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=635171&r1=635170&r2=635171&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Sat Mar  8 21:06:41 2008
@@ -42,6 +42,7 @@
 import org.apache.mina.filter.codec.ProtocolEncoderOutput;
 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
 import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
+import org.apache.mina.filter.LoggingFilter;
 import org.apache.mina.transport.socket.nio.DatagramAcceptor;
 import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
 import org.apache.mina.transport.socket.nio.DatagramConnector;
@@ -103,17 +104,14 @@
         // connector config
         SocketConnectorConfig connectorConfig = new SocketConnectorConfig();
         configureSocketCodecFactory(connectorConfig, parameters);
-        // TODO: verbose logging from Mina should use our logger instead of MINA INFO logger
-        //connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
+        connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
 
         // acceptor connectorConfig
         SocketAcceptorConfig acceptorConfig = new SocketAcceptorConfig();
         configureSocketCodecFactory(acceptorConfig, parameters);
         acceptorConfig.setReuseAddress(true);
         acceptorConfig.setDisconnectOnUnbind(true);
-        // TODO: verbose logging from Mina should use our logger instead of MINA INFO logger
-        //acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
-
+        acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
 
         boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
         
@@ -154,15 +152,13 @@
         
         DatagramConnectorConfig connectorConfig = new DatagramConnectorConfig();
         configureDataGramCodecFactory(connectorConfig, parameters);
-        // TODO: verbose logging from Mina should use our logger instead of MINA INFO logger
-        //connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
+        connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
 
         DatagramAcceptorConfig acceptorConfig = new DatagramAcceptorConfig();
         configureDataGramCodecFactory(acceptorConfig, parameters);
         acceptorConfig.setDisconnectOnUnbind(true);
         // reuse address is default true for datagram
-        // TODO: verbose logging from Mina should use our logger instead of MINA INFO logger
-        //acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
+        acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
 
         boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
         

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=635171&r1=635170&r2=635171&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Sat Mar  8 21:06:41 2008
@@ -79,18 +79,24 @@
 
                 if (ExchangeHelper.isOutCapable(exchange)) {
                     Object body = exchange.getOut().getBody();
+                    boolean failed = exchange.isFailed();
 
-                    // TODO: if exchange.isFailed() then out could potential be in - (what should we do)
-
-                    if (body != null) {
+                    if (failed) {
+                        // cannot write a response since the exchange has failed and we don't know what state the
+                        // in/out messages are in, so the session is closed
+                        LOG.warn("Can not write body since the exchange is failed, closing session: " + exchange);
+                        session.close();
+                    } else if (body == null) {
+                        // must close the session if there is no data to write; otherwise the client will never
+                        // receive a response and will wait forever (unless it times out)
+                        LOG.warn("Can not write body since its null, closing session: " + exchange);
+                        session.close();
+                    } else {
+                        // we got a response to write
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Writing body: " + body);
                         }
                         session.write(body);
-                    } else {
-                        // must close session if no data to write otherwise client will never receive a response and wait forever
-                        LOG.warn("Can not write body since its null, closing session");
-                        session.close();
                     }
                 } else {
                     if (LOG.isDebugEnabled()) {

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?rev=635171&r1=635170&r2=635171&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java Sat Mar  8 21:06:41 2008
@@ -62,13 +62,12 @@
 
     @Override
     public MinaExchange createExchange(ExchangePattern pattern) {
-        return new MinaExchange(getContext(), pattern);
+        return new MinaExchange(getContext(), pattern, null);
     }
 
     public MinaExchange createExchange(IoSession session, Object object) {
-        MinaExchange exchange = new MinaExchange(getContext(), getExchangePattern());
+        MinaExchange exchange = new MinaExchange(getContext(), getExchangePattern(), session);
         exchange.getIn().setBody(object);
-        // TODO store session in exchange?
         return exchange;
     }
 

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java?rev=635171&r1=635170&r2=635171&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java Sat Mar  8 21:06:41 2008
@@ -20,15 +20,29 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.mina.common.IoSession;
 
 /**
- * A {@link Exchange} for MINA
+ * A {@link Exchange} for Apache MINA.
  * 
  * @version $Revision$
  */
 public class MinaExchange extends DefaultExchange {
 
-    public MinaExchange(CamelContext camelContext, ExchangePattern pattern) {
+    private IoSession session;
+
+    public MinaExchange(CamelContext camelContext, ExchangePattern pattern, IoSession session) {
         super(camelContext, pattern);
+        this.session = session;
+    }
+
+    /**
+     * The associated Mina session; it is <b>only</b> available for {@link MinaConsumer}.
+     * 
+     * @return the Mina session.
+     */
+    public IoSession getSession() {
+        return session;
     }
+
 }

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=635171&r1=635170&r2=635171&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Sat Mar  8 21:06:41 2008
@@ -23,6 +23,7 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Producer;
 import org.apache.camel.CamelException;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
@@ -42,7 +43,8 @@
 public class MinaProducer extends DefaultProducer {
     private static final transient Log LOG = LogFactory.getLog(MinaProducer.class);
     // TODO: The max wait response should be configurable
-    private static final long MAX_WAIT_RESPONSE = 10000;
+    // The URI parameter could be an option
+    private static final long MAX_WAIT_RESPONSE = 30000;
     private IoSession session;
     private MinaEndpoint endpoint;
     private CountDownLatch latch;
@@ -76,13 +78,13 @@
                 WriteFuture future = session.write(body);
                 future.join();
                 if (!future.isWritten()) {
-                    throw new CamelException("Timed out waiting for response: " + exchange);
+                    throw new ExchangeTimedOutException(exchange, MAX_WAIT_RESPONSE);
                 }
 
                 // wait for response, consider timeout
                 latch.await(MAX_WAIT_RESPONSE, TimeUnit.MILLISECONDS);
                 if (latch.getCount() == 1) {
-                    throw new CamelException("No response from server within " + MAX_WAIT_RESPONSE + " millisecs");
+                    throw new ExchangeTimedOutException(exchange, MAX_WAIT_RESPONSE);
                 }
 
                 // did we get a response
@@ -155,6 +157,19 @@
             CountDownLatch downLatch = latch;
             if (downLatch != null) {
                 downLatch.countDown();
+            }
+        }
+
+        @Override
+        public void sessionClosed(IoSession session) throws Exception {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Session closed");
+            }
+
+            if (message == null) {
+                // session was closed but no message received. This is because the remote server had an internal error
+                // and could not return a proper response. We should count down to stop waiting for a response
+                countDown();
             }
         }
 

Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTest.java?rev=635171&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTest.java Sat Mar  8 21:06:41 2008
@@ -0,0 +1,42 @@
+package org.apache.camel.component.mina;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * To unit test that we have access to MinaExchange and the Mina session object.
+ *
+ * @version $Revision$
+ */
+public class MinaExchangeTest extends ContextTestSupport {
+
+    protected String uri = "mina:tcp://localhost:8080";
+
+    public void testMinaRoute() throws Exception {
+        MockEndpoint endpoint = getMockEndpoint("mock:result");
+        Object body = "Hello there!";
+        endpoint.expectedBodiesReceived(body);
+
+        template.sendBody(uri, body);
+
+        assertMockEndpointsSatisifed();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from(uri).process(new Processor(){
+                    public void process(Exchange exchange) throws Exception {
+                        assertTrue("Should be MinaExchange", exchange instanceof MinaExchange);
+                        MinaExchange me = (MinaExchange) exchange;
+                        assertNotNull("IoSession should not be null", me.getSession());
+                    }
+                }).to("mock:result");
+            }
+        };
+    }
+
+}

Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java?rev=635171&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java Sat Mar  8 21:06:41 2008
@@ -0,0 +1,41 @@
+package org.apache.camel.component.mina;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * To unit test CAMEL-364.
+ */
+public class MinaTcpWithIoOutProcessorExceptionTest extends ContextTestSupport {
+
+    protected CamelContext container = new DefaultCamelContext();
+
+    private static final int PORT = 6334;
+    // use parameter sync=true to force InOut pattern of the MinaExchange
+    protected String uri = "mina:tcp://localhost:" + PORT + "?textline=true&sync=true";
+
+    public void testExceptionThrownInProcessor() {
+        String body = "Hello World";
+        String out = (String) template.requestBody(uri, body);
+        assertNull("Should not have sent data back", out);
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from(uri).process(new Processor() {
+                    public void process(Exchange e) {
+                        assertEquals("Hello World", e.getIn().getBody(String.class));
+                        // simulate a problem processing the input to see if we can handle it properly
+                        throw new IllegalArgumentException("Forced exception");
+                    }
+                });
+            }
+        };
+    }
+
+}

Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java?rev=635171&r1=635170&r2=635171&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java Sat Mar  8 21:06:41 2008
@@ -3,7 +3,6 @@
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -12,8 +11,6 @@
 import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.DatagramPacket;
-import java.nio.CharBuffer;
-import java.nio.charset.CharsetDecoder;
 
 /**
  * To test InOut exchange for the UDP protocol.