You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/03/09 06:06:42 UTC
svn commit: r635171 - in /activemq/camel/trunk/components/camel-mina/src:
main/java/org/apache/camel/component/mina/
test/java/org/apache/camel/component/mina/
Author: ningjiang
Date: Sat Mar 8 21:06:41 2008
New Revision: 635171
URL: http://svn.apache.org/viewvc?rev=635171&view=rev
Log:
CAMEL-375 Applied the patch with thanks to Claus, also changed the MinaProducer's default time out to 30 sec
Added:
activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTest.java (with props)
activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java (with props)
Modified:
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=635171&r1=635170&r2=635171&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Sat Mar 8 21:06:41 2008
@@ -42,6 +42,7 @@
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
+import org.apache.mina.filter.LoggingFilter;
import org.apache.mina.transport.socket.nio.DatagramAcceptor;
import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
import org.apache.mina.transport.socket.nio.DatagramConnector;
@@ -103,17 +104,14 @@
// connector config
SocketConnectorConfig connectorConfig = new SocketConnectorConfig();
configureSocketCodecFactory(connectorConfig, parameters);
- // TODO: verbose logging from Mina should use our logger instead of MINA INFO logger
- //connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
+ connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
// acceptor connectorConfig
SocketAcceptorConfig acceptorConfig = new SocketAcceptorConfig();
configureSocketCodecFactory(acceptorConfig, parameters);
acceptorConfig.setReuseAddress(true);
acceptorConfig.setDisconnectOnUnbind(true);
- // TODO: verbose logging from Mina should use our logger instead of MINA INFO logger
- //acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
-
+ acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
@@ -154,15 +152,13 @@
DatagramConnectorConfig connectorConfig = new DatagramConnectorConfig();
configureDataGramCodecFactory(connectorConfig, parameters);
- // TODO: verbose logging from Mina should use our logger instead of MINA INFO logger
- //connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
+ connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
DatagramAcceptorConfig acceptorConfig = new DatagramAcceptorConfig();
configureDataGramCodecFactory(acceptorConfig, parameters);
acceptorConfig.setDisconnectOnUnbind(true);
// reuse address is default true for datagram
- // TODO: verbose logging from Mina should use our logger instead of MINA INFO logger
- //acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
+ acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=635171&r1=635170&r2=635171&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Sat Mar 8 21:06:41 2008
@@ -79,18 +79,24 @@
if (ExchangeHelper.isOutCapable(exchange)) {
Object body = exchange.getOut().getBody();
+ boolean failed = exchange.isFailed();
- // TODO: if exchange.isFailed() then out could potential be in - (what should we do)
-
- if (body != null) {
+ if (failed) {
+ // can not write a response since the exchange is failed and we don't know in what state the
+ // in/out messages are in so the session is closed
+ LOG.warn("Can not write body since the exchange is failed, closing session: " + exchange);
+ session.close();
+ } else if (body == null) {
+ // must close session if no data to write otherwise client will never receive a response
+ // and wait forever (if not timing out)
+ LOG.warn("Can not write body since its null, closing session: " + exchange);
+ session.close();
+ } else {
+ // we got a response to write
if (LOG.isDebugEnabled()) {
LOG.debug("Writing body: " + body);
}
session.write(body);
- } else {
- // must close session if no data to write otherwise client will never receive a response and wait forever
- LOG.warn("Can not write body since its null, closing session");
- session.close();
}
} else {
if (LOG.isDebugEnabled()) {
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?rev=635171&r1=635170&r2=635171&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java Sat Mar 8 21:06:41 2008
@@ -62,13 +62,12 @@
@Override
public MinaExchange createExchange(ExchangePattern pattern) {
- return new MinaExchange(getContext(), pattern);
+ return new MinaExchange(getContext(), pattern, null);
}
public MinaExchange createExchange(IoSession session, Object object) {
- MinaExchange exchange = new MinaExchange(getContext(), getExchangePattern());
+ MinaExchange exchange = new MinaExchange(getContext(), getExchangePattern(), session);
exchange.getIn().setBody(object);
- // TODO store session in exchange?
return exchange;
}
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java?rev=635171&r1=635170&r2=635171&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java Sat Mar 8 21:06:41 2008
@@ -20,15 +20,29 @@
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.impl.DefaultExchange;
+import org.apache.mina.common.IoSession;
/**
- * A {@link Exchange} for MINA
+ * A {@link Exchange} for Apache MINA.
*
* @version $Revision$
*/
public class MinaExchange extends DefaultExchange {
- public MinaExchange(CamelContext camelContext, ExchangePattern pattern) {
+ private IoSession session;
+
+ public MinaExchange(CamelContext camelContext, ExchangePattern pattern, IoSession session) {
super(camelContext, pattern);
+ this.session = session;
+ }
+
+ /**
+ * The associated Mina session, is <b>only</b> available for {@link MinaConsumer}.
+ *
+ * @return the Mina session.
+ */
+ public IoSession getSession() {
+ return session;
}
+
}
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=635171&r1=635170&r2=635171&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Sat Mar 8 21:06:41 2008
@@ -23,6 +23,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.CamelException;
+import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
@@ -42,7 +43,8 @@
public class MinaProducer extends DefaultProducer {
private static final transient Log LOG = LogFactory.getLog(MinaProducer.class);
// TODO: The max wait response should be configurable
- private static final long MAX_WAIT_RESPONSE = 10000;
+ // The URI parameter could be a option
+ private static final long MAX_WAIT_RESPONSE = 30000;
private IoSession session;
private MinaEndpoint endpoint;
private CountDownLatch latch;
@@ -76,13 +78,13 @@
WriteFuture future = session.write(body);
future.join();
if (!future.isWritten()) {
- throw new CamelException("Timed out waiting for response: " + exchange);
+ throw new ExchangeTimedOutException(exchange, MAX_WAIT_RESPONSE);
}
// wait for response, consider timeout
latch.await(MAX_WAIT_RESPONSE, TimeUnit.MILLISECONDS);
if (latch.getCount() == 1) {
- throw new CamelException("No response from server within " + MAX_WAIT_RESPONSE + " millisecs");
+ throw new ExchangeTimedOutException(exchange, MAX_WAIT_RESPONSE);
}
// did we get a response
@@ -155,6 +157,19 @@
CountDownLatch downLatch = latch;
if (downLatch != null) {
downLatch.countDown();
+ }
+ }
+
+ @Override
+ public void sessionClosed(IoSession session) throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Session closed");
+ }
+
+ if (message == null) {
+ // session was closed but no message received. This is because the remote server had an internal error
+ // and could not return a proper response. We should count down to stop waiting for a response
+ countDown();
}
}
Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTest.java?rev=635171&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTest.java Sat Mar 8 21:06:41 2008
@@ -0,0 +1,42 @@
+package org.apache.camel.component.mina;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * To unit test that we have access to MinaExchange and the Mina session object.
+ *
+ * @version $Revision$
+ */
+public class MinaExchangeTest extends ContextTestSupport {
+
+ protected String uri = "mina:tcp://localhost:8080";
+
+ public void testMinaRoute() throws Exception {
+ MockEndpoint endpoint = getMockEndpoint("mock:result");
+ Object body = "Hello there!";
+ endpoint.expectedBodiesReceived(body);
+
+ template.sendBody(uri, body);
+
+ assertMockEndpointsSatisifed();
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from(uri).process(new Processor(){
+ public void process(Exchange exchange) throws Exception {
+ assertTrue("Should be MinaExchange", exchange instanceof MinaExchange);
+ MinaExchange me = (MinaExchange) exchange;
+ assertNotNull("IoSession should not be null", me.getSession());
+ }
+ }).to("mock:result");
+ }
+ };
+ }
+
+}
Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaExchangeTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java?rev=635171&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java Sat Mar 8 21:06:41 2008
@@ -0,0 +1,41 @@
+package org.apache.camel.component.mina;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * To unit test CAMEL-364.
+ */
+public class MinaTcpWithIoOutProcessorExceptionTest extends ContextTestSupport {
+
+ protected CamelContext container = new DefaultCamelContext();
+
+ private static final int PORT = 6334;
+ // use parameter sync=true to force InOut pattern of the MinaExchange
+ protected String uri = "mina:tcp://localhost:" + PORT + "?textline=true&sync=true";
+
+ public void testExceptionThrownInProcessor() {
+ String body = "Hello World";
+ String out = (String) template.requestBody(uri, body);
+ assertNull("Should not have sent data back", out);
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from(uri).process(new Processor() {
+ public void process(Exchange e) {
+ assertEquals("Hello World", e.getIn().getBody(String.class));
+ // simulate a problem processing the input to see if we can handle it properly
+ throw new IllegalArgumentException("Forced exception");
+ }
+ });
+ }
+ };
+ }
+
+}
Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java?rev=635171&r1=635170&r2=635171&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java Sat Mar 8 21:06:41 2008
@@ -3,7 +3,6 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.builder.RouteBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -12,8 +11,6 @@
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.DatagramPacket;
-import java.nio.CharBuffer;
-import java.nio.charset.CharsetDecoder;
/**
* To test InOut exchange for the UDP protocol.