You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2008/03/05 17:00:35 UTC
svn commit: r633888 - in /activemq/camel/trunk/components/camel-mina/src:
main/java/org/apache/camel/component/mina/
test/java/org/apache/camel/component/mina/
Author: hadrian
Date: Wed Mar 5 08:00:24 2008
New Revision: 633888
URL: http://svn.apache.org/viewvc?rev=633888&view=rev
Log:
CAMEL-362.
Modified:
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=633888&r1=633887&r2=633888&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Wed Mar 5 08:00:24 2008
@@ -90,7 +90,7 @@
IoAcceptor acceptor = new VmPipeAcceptor();
SocketAddress address = new VmPipeAddress(connectUri.getPort());
IoConnector connector = new VmPipeConnector();
- return new MinaEndpoint(uri, this, address, acceptor, connector, null);
+ return new MinaEndpoint(uri, this, address, acceptor, connector, null, false);
}
protected MinaEndpoint createSocketEndpoint(String uri, URI connectUri, Map parameters) {
@@ -101,7 +101,10 @@
// TODO customize the config via URI
SocketConnectorConfig config = new SocketConnectorConfig();
configureSocketCodecFactory(config, parameters);
- MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, connector, config);
+
+ boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
+
+ MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, connector, config, lazySessionCreation);
boolean sync = ObjectConverter.toBool(parameters.get("sync"));
if (sync) {
@@ -141,7 +144,9 @@
configureDataGramCodecFactory(config, parameters);
- return new MinaEndpoint(uri, this, address, acceptor, connector, config);
+ boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
+
+ return new MinaEndpoint(uri, this, address, acceptor, connector, config, lazySessionCreation);
}
/**
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?rev=633888&r1=633887&r2=633888&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java Wed Mar 5 08:00:24 2008
@@ -36,13 +36,15 @@
private final SocketAddress address;
private final IoConnector connector;
private final IoServiceConfig config;
+ private final boolean lazySessionCreation;
- public MinaEndpoint(String endpointUri, MinaComponent component, SocketAddress address, IoAcceptor acceptor, IoConnector connector, IoServiceConfig config) {
+ public MinaEndpoint(String endpointUri, MinaComponent component, SocketAddress address, IoAcceptor acceptor, IoConnector connector, IoServiceConfig config, boolean lazySessionCreation) {
super(endpointUri, component);
this.config = config;
this.address = address;
this.acceptor = acceptor;
this.connector = connector;
+ this.lazySessionCreation = lazySessionCreation;
}
public Producer<MinaExchange> createProducer() throws Exception {
@@ -83,6 +85,10 @@
return config;
}
+ public boolean getLazySessionCreation() {
+ return lazySessionCreation;
+ }
+
public boolean isSingleton() {
return true;
}
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=633888&r1=633887&r2=633888&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Wed Mar 5 08:00:24 2008
@@ -44,24 +44,26 @@
private IoSession session;
private MinaEndpoint endpoint;
private CountDownLatch latch;
+ private boolean lazySessionCreation;
public MinaProducer(MinaEndpoint endpoint) {
super(endpoint);
this.endpoint = endpoint;
+ this.lazySessionCreation = this.endpoint.getLazySessionCreation();
}
public void process(Exchange exchange) throws Exception {
- if (session == null) {
+ if (session == null && !lazySessionCreation) {
throw new IllegalStateException("Not started yet!");
}
- if (!session.isConnected()) {
- doStart();
+ if (session == null || !session.isConnected()) {
+ openConnection();
}
+
Object body = exchange.getIn().getBody();
if (body == null) {
LOG.warn("No payload for exchange: " + exchange);
- }
- else {
+ } else {
if (ExchangeHelper.isOutCapable(exchange)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Writing body : " + body);
@@ -70,24 +72,24 @@
WriteFuture future = session.write(body);
future.join();
if (!future.isWritten()) {
- throw new RuntimeException("Timed out waiting for response: " + exchange);
+ throw new RuntimeException(
+ "Timed out waiting for response: " + exchange);
}
latch.await(MAX_WAIT_RESPONSE, TimeUnit.MILLISECONDS);
if (latch.getCount() == 1) {
- throw new RuntimeException("No response from server within " + MAX_WAIT_RESPONSE + " millisecs");
+ throw new RuntimeException("No response from server within "
+ + MAX_WAIT_RESPONSE + " millisecs");
}
- ResponseHandler handler = (ResponseHandler) session.getHandler();
+ ResponseHandler handler = (ResponseHandler)session.getHandler();
if (handler.getCause() != null) {
throw new Exception("Response Handler had an exception", handler.getCause());
- }
- else {
+ } else {
if (LOG.isDebugEnabled()) {
LOG.debug("Handler message: " + handler.getMessage());
}
exchange.getOut().setBody(handler.getMessage());
}
- }
- else {
+ } else {
session.write(body);
}
}
@@ -95,6 +97,19 @@
@Override
protected void doStart() throws Exception {
+ if (!lazySessionCreation) {
+ openConnection();
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (session != null) {
+ session.close().join(2000);
+ }
+ }
+
+ private void openConnection() {
SocketAddress address = endpoint.getAddress();
IoConnector connector = endpoint.getConnector();
if (LOG.isDebugEnabled()) {
@@ -104,13 +119,6 @@
ConnectFuture future = connector.connect(address, ioHandler, endpoint.getConfig());
future.join();
session = future.getSession();
- }
-
- @Override
- protected void doStop() throws Exception {
- if (session != null) {
- session.close().join(2000);
- }
}
/**
Modified: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java?rev=633888&r1=633887&r2=633888&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java Wed Mar 5 08:00:24 2008
@@ -35,14 +35,23 @@
* @version $Revision$
*/
public class MinaTcpWithInOutTest extends TestCase {
- protected CamelContext container = new DefaultCamelContext();
- protected CountDownLatch latch = new CountDownLatch(1);
+
+ protected String uri;
protected Exchange receivedExchange;
- protected String uri = "mina:tcp://localhost:6321?textline=true";
- protected Producer<Exchange> producer;
- private ReverserServer server;
-
+ protected CountDownLatch latch;
+ protected CamelContext container;
+
public void testMinaRouteWithInOut() throws Exception {
+ container = new DefaultCamelContext();
+ latch = new CountDownLatch(1);
+ uri = "mina:tcp://localhost:6321?textline=true";
+ Producer<Exchange> producer;
+ ReverserServer server;
+ server = new ReverserServer();
+ server.start();
+ container.addRoutes(createRouteBuilder());
+ container.start();
+
// now lets fire in a message
Endpoint<Exchange> endpoint = container.getEndpoint("direct:x");
Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
@@ -60,24 +69,56 @@
assertTrue("Did not receive the message!", received);
assertNotNull(receivedExchange.getIn());
assertEquals("!olleH", receivedExchange.getIn().getBody());
+
+ if (producer != null) {
+ producer.stop();
+ }
+ container.stop();
+ server.stop();
}
- @Override
- protected void setUp() throws Exception {
- server = new ReverserServer();
- server.start();
+ public void testMinaRouteWithInOutLazy() throws Exception {
+ container = new DefaultCamelContext();
+ latch = new CountDownLatch(1);
+ uri = "mina:tcp://localhost:6321?textline=true&lazySessionCreation=true";
+ Producer<Exchange> producer;
container.addRoutes(createRouteBuilder());
container.start();
- }
+ ReverserServer server; //The server is activated after Camel to check if the lazyness is working
+ server = new ReverserServer();
+ server.start();
+
+ // now lets fire in a message
+ Endpoint<Exchange> endpoint = container.getEndpoint("direct:x");
+ Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
+ Message message = exchange.getIn();
+ String hello = "Hello!";
+ message.setBody(hello);
+ message.setHeader("cheese", 123);
+ producer = endpoint.createProducer();
+ producer.start();
+ producer.process(exchange);
+
+ // now lets sleep for a while
+ boolean received = latch.await(5, TimeUnit.SECONDS);
+ assertTrue("Did not receive the message!", received);
+ assertNotNull(receivedExchange.getIn());
+ assertEquals("!olleH", receivedExchange.getIn().getBody());
- @Override
- protected void tearDown() throws Exception {
if (producer != null) {
producer.stop();
}
container.stop();
server.stop();
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
}
protected RouteBuilder createRouteBuilder() {