You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/03/06 09:55:07 UTC
svn commit: r634196 - in /activemq/camel/trunk/components/camel-mina/src:
main/java/org/apache/camel/component/mina/MinaConsumer.java
test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java
Author: ningjiang
Date: Thu Mar 6 00:55:05 2008
New Revision: 634196
URL: http://svn.apache.org/viewvc?rev=634196&view=rev
Log:
CAMEL-356 Applied the patch with thanks to Claus
Modified:
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=634196&r1=634195&r2=634196&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Thu Mar 6 00:55:05 2008
@@ -17,6 +17,7 @@
package org.apache.camel.component.mina;
import org.apache.camel.Processor;
+import org.apache.camel.CamelException;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
@@ -29,7 +30,7 @@
import java.net.SocketAddress;
/**
- * A @{link Consumer} implementation for MINA
+ * A {@link org.apache.camel.Consumer Consumer} implementation for Apache MINA.
* @version $Revision$
*/
public class MinaConsumer extends DefaultConsumer<MinaExchange> {
@@ -54,6 +55,17 @@
}
IoHandler handler = new IoHandlerAdapter() {
+
+ @Override
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+ // close invalid session
+ LOG.debug("Closing session as an exception was thrown from MINA");
+ session.close();
+
+ // must wrap and rethrow since cause can be any Throwable but this method may only throw Exception
+ throw new CamelException(cause);
+ }
+
@Override
public void messageReceived(IoSession session, Object object) throws Exception {
if (LOG.isDebugEnabled()) {
@@ -65,10 +77,19 @@
if (ExchangeHelper.isOutCapable(exchange)) {
Object body = exchange.getOut().getBody();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Writing body: " + body);
+
+ // TODO: if exchange.isFailed() then out could potentially be the same as in - (what should we do?)
+
+ if (body != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing body: " + body);
+ }
+ session.write(body);
+ } else {
+ // must close the session if there is no data to write, otherwise the client will never receive a response and will wait forever
+ LOG.warn("Can not write body since its null, closing session");
+ session.close();
}
- session.write(body);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Can not write body since this exchange is not out capable: " + exchange);
@@ -85,4 +106,5 @@
acceptor.unbind(address);
super.doStop();
}
+
}
Modified: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java?rev=634196&r1=634195&r2=634196&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java Thu Mar 6 00:55:05 2008
@@ -30,11 +30,14 @@
import java.io.IOException;
/**
+ * Tests the camel-mina component using a client that communicates over a plain TCP socket.
+ *
* @version $Revision$
*/
public class MinaTcpWithInOutUsingPlainSocketTest extends TestCase {
protected CamelContext container = new DefaultCamelContext();
+
// use parameter sync=true to force InOut pattern of the MinaExchange
protected String uri = "mina:tcp://localhost:8888?textline=true&sync=true";
@@ -55,6 +58,25 @@
assertEquals("Hello Paris", paris);
}
+ public void testReceiveNoResponseSinceOutBodyIsNull() throws Exception {
+ String out = sendAndReceive("force-null-out-body");
+ assertNull("no data should be recieved", out);
+ }
+
+ public void testReceiveNoResponseSinceOutBodyIsNullTwice() throws Exception {
+ String out = sendAndReceive("force-null-out-body");
+ assertNull("no data should be recieved", out);
+
+ out = sendAndReceive("force-null-out-body");
+ assertNull("no data should be recieved", out);
+ }
+
+ public void testExchangeFailedOutShouldBeNull() throws Exception {
+ String out = sendAndReceive("force-exception");
+ assertTrue("out should not be the same as in when the exchange has failed", !"force-exception".equals(out));
+ assertNull("no data should be retrieved", out);
+ }
+
private String sendAndReceive(String input) throws IOException {
byte buf[] = new byte[128];
@@ -70,10 +92,14 @@
os.write((input + "\n").getBytes());
is = soc.getInputStream();
- is.read(buf);
+ int len = is.read(buf);
+ if (len == -1) {
+ // no data received
+ return null;
+ }
} finally {
- is.close();
- os.close();
+ if (is != null) is.close();
+ if (os != null) os.close();
soc.close();
}
@@ -81,7 +107,7 @@
StringBuffer sb = new StringBuffer();
for (byte b : buf) {
char ch = (char) b;
- if (ch == '\n' || b == 0) {
+ if (ch == '\n' || ch == 0) {
// newline denotes end of text (added in the end in the processor below)
break;
} else {
@@ -110,8 +136,17 @@
from(uri).process(new Processor() {
public void process(Exchange e) {
String in = e.getIn().getBody(String.class);
- // append newline at end to denote end of data for textline codec
- e.getOut().setBody("Hello " + in + "\n");
+ if ("force-null-out-body".equals(in)) {
+ // forcing a null out body
+ e.getOut().setBody(null);
+ } else if ("force-exception".equals(in)) {
+ // clear out before throwing exception
+ e.getOut().setBody(null);
+ throw new IllegalArgumentException("Forced exception");
+ } else {
+ // append newline as stop character for textline codec
+ e.getOut().setBody("Hello " + in + "\n");
+ }
}
});
}