You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/03/06 09:55:07 UTC

svn commit: r634196 - in /activemq/camel/trunk/components/camel-mina/src: main/java/org/apache/camel/component/mina/MinaConsumer.java test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java

Author: ningjiang
Date: Thu Mar  6 00:55:05 2008
New Revision: 634196

URL: http://svn.apache.org/viewvc?rev=634196&view=rev
Log:
CAMEL-356 Applied the patch with thanks to Claus

Modified:
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=634196&r1=634195&r2=634196&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Thu Mar  6 00:55:05 2008
@@ -17,6 +17,7 @@
 package org.apache.camel.component.mina;
 
 import org.apache.camel.Processor;
+import org.apache.camel.CamelException;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
@@ -29,7 +30,7 @@
 import java.net.SocketAddress;
 
 /**
- * A @{link Consumer} implementation for MINA
+ * A {@link org.apache.camel.Consumer Consumer} implementation for Apache MINA.
  * @version $Revision$
  */
 public class MinaConsumer extends DefaultConsumer<MinaExchange> {
@@ -54,6 +55,17 @@
         }
 
         IoHandler handler = new IoHandlerAdapter() {
+
+            @Override
+            public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+                // close invalid session
+                LOG.debug("Closing session as an exception was thrown from MINA");
+                session.close();
+
+                // must wrap and rethrow since cause can be of Throwable and we must only throw Exception
+                throw new CamelException(cause);
+            }
+
             @Override
             public void messageReceived(IoSession session, Object object) throws Exception {
                 if (LOG.isDebugEnabled()) {
@@ -65,10 +77,19 @@
 
                 if (ExchangeHelper.isOutCapable(exchange)) {
                     Object body = exchange.getOut().getBody();
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Writing body: " + body);
+
+                    // TODO: if exchange.isFailed() then out could potential be in - (what should we do)
+
+                    if (body != null) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Writing body: " + body);
+                        }
+                        session.write(body);
+                    } else {
+                        // must close session if no data to write otherwise client will never receive a response and wait forever
+                        LOG.warn("Can not write body since its null, closing session");
+                        session.close();
                     }
-                    session.write(body);
                 } else {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Can not write body since this exchange is not out capable: " + exchange);
@@ -85,4 +106,5 @@
         acceptor.unbind(address);
         super.doStop();
     }
+    
 }

Modified: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java?rev=634196&r1=634195&r2=634196&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java Thu Mar  6 00:55:05 2008
@@ -30,11 +30,14 @@
 import java.io.IOException;
 
 /**
+ * To test camel-mina component using a TCP client that communicates using TCP socket communication.
+ *
  * @version $Revision$
  */
 public class MinaTcpWithInOutUsingPlainSocketTest extends TestCase {
 
     protected CamelContext container = new DefaultCamelContext();
+
     // use parameter sync=true to force InOut pattern of the MinaExchange 
     protected String uri = "mina:tcp://localhost:8888?textline=true&sync=true";
 
@@ -55,6 +58,25 @@
         assertEquals("Hello Paris", paris);
     }
 
+    public void testReceiveNoResponseSinceOutBodyIsNull() throws Exception {
+        String out = sendAndReceive("force-null-out-body");
+        assertNull("no data should be recieved", out);
+    }
+
+    public void testReceiveNoResponseSinceOutBodyIsNullTwice() throws Exception {
+        String out = sendAndReceive("force-null-out-body");
+        assertNull("no data should be recieved", out);
+
+        out = sendAndReceive("force-null-out-body");
+        assertNull("no data should be recieved", out);
+    }
+
+    public void testExchangeFailedOutShouldBeNull() throws Exception {
+        String out = sendAndReceive("force-exception");
+        assertTrue("out should not be the same as in when the exchange has failed", !"force-exception".equals(out));
+        assertNull("no data should be retrieved", out);
+    }
+
     private String sendAndReceive(String input) throws IOException {
         byte buf[] = new byte[128];
 
@@ -70,10 +92,14 @@
             os.write((input + "\n").getBytes());
 
             is = soc.getInputStream();
-            is.read(buf);
+            int len = is.read(buf);
+            if (len == -1) {
+                // no data received
+                return null;
+            }
         } finally {
-            is.close();
-            os.close();
+            if (is != null) is.close();
+            if (os != null) os.close();
             soc.close();
         }
 
@@ -81,7 +107,7 @@
         StringBuffer sb = new StringBuffer();
         for (byte b : buf) {
             char ch = (char) b;
-            if (ch == '\n' || b == 0) {
+            if (ch == '\n' || ch == 0) {
                 // newline denotes end of text (added in the end in the processor below)
                 break;
             } else {
@@ -110,8 +136,17 @@
                 from(uri).process(new Processor() {
                     public void process(Exchange e) {
                         String in = e.getIn().getBody(String.class);
-                        // append newline at end to denote end of data for textline codec
-                        e.getOut().setBody("Hello " + in + "\n");
+                        if ("force-null-out-body".equals(in)) {
+                            // forcing a null out body
+                            e.getOut().setBody(null);
+                        } else if ("force-exception".equals(in)) {
+                            // clear out before throwing exception
+                            e.getOut().setBody(null);
+                            throw new IllegalArgumentException("Forced exception");
+                        } else {
+                            // append newline as stop character for textline codec
+                            e.getOut().setBody("Hello " + in + "\n");
+                        }
                     }
                 });
             }