You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/09/20 07:51:56 UTC
svn commit: r577559 - in /activemq/camel/trunk/components/camel-mina/src:
main/java/org/apache/camel/component/mina/
test/java/org/apache/camel/component/mina/
Author: jstrachan
Date: Wed Sep 19 22:51:55 2007
New Revision: 577559
URL: http://svn.apache.org/viewvc?rev=577559&view=rev
Log:
applied patch from Nicky Sandhu for CAMEL-133 to support InOut for MINA - great work!
Added:
activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpTextlineProtocolTest.java (with props)
activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java (with props)
activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java (with props)
activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java (with props)
Modified:
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=577559&r1=577558&r2=577559&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Wed Sep 19 22:51:55 2007
@@ -27,8 +27,10 @@
import org.apache.camel.impl.DefaultComponent;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.support.BaseIoConnectorConfig;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
+import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.nio.DatagramAcceptor;
import org.apache.mina.transport.socket.nio.DatagramConnector;
import org.apache.mina.transport.socket.nio.DatagramConnectorConfig;
@@ -56,9 +58,9 @@
String protocol = u.getScheme();
if (protocol.equals("tcp")) {
- return createSocketEndpoint(uri, u);
+ return createSocketEndpoint(uri, u, parameters);
} else if (protocol.equals("udp") || protocol.equals("mcast") || protocol.equals("multicast")) {
- return createDatagramEndpoint(uri, u);
+ return createDatagramEndpoint(uri, u, parameters);
} else if (protocol.equals("vm")) {
return createVmEndpoint(uri, u);
} else {
@@ -73,27 +75,47 @@
return new MinaEndpoint(uri, this, address, acceptor, connector, null);
}
- protected MinaEndpoint createSocketEndpoint(String uri, URI connectUri) {
+ protected MinaEndpoint createSocketEndpoint(String uri, URI connectUri, Map parameters) {
IoAcceptor acceptor = new SocketAcceptor();
SocketAddress address = new InetSocketAddress(connectUri.getHost(), connectUri.getPort());
IoConnector connector = new SocketConnector();
// TODO customize the config via URI
SocketConnectorConfig config = new SocketConnectorConfig();
- config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
-
+ configureCodecFactory(config, parameters);
return new MinaEndpoint(uri, this, address, acceptor, connector, config);
}
-
- protected MinaEndpoint createDatagramEndpoint(String uri, URI connectUri) {
+
+ protected MinaEndpoint createDatagramEndpoint(String uri, URI connectUri, Map parameters) {
IoAcceptor acceptor = new DatagramAcceptor();
SocketAddress address = new InetSocketAddress(connectUri.getHost(), connectUri.getPort());
IoConnector connector = new DatagramConnector();
// TODO customize the config via URI
DatagramConnectorConfig config = new DatagramConnectorConfig();
- config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
+ configureCodecFactory(config, parameters);
return new MinaEndpoint(uri, this, address, acceptor, connector, config);
}
+
+ protected void configureCodecFactory(BaseIoConnectorConfig config, Map parameters){
+ boolean textline = false;
+ if (parameters != null) {
+ if (parameters.containsKey("codec")) {
+ String value = (String) parameters.get("codec");
+ if (value.equals("textline")) {
+ textline = true;
+ }
+ } else {
+ textline = false;
+ }
+ }
+
+ if (textline) {
+ config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));
+ } else {
+ config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
+ }
+ }
+
}
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java?rev=577559&r1=577558&r2=577559&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java Wed Sep 19 22:51:55 2007
@@ -29,6 +29,6 @@
public class MinaExchange extends DefaultExchange {
public MinaExchange(CamelContext camelContext, ExchangePattern pattern) {
- super(camelContext);
+ super(camelContext, pattern);
}
}
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=577559&r1=577558&r2=577559&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Wed Sep 19 22:51:55 2007
@@ -19,6 +19,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.common.ConnectFuture;
@@ -26,8 +27,11 @@
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
import java.net.SocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* A {@link Producer} implementation for MINA
@@ -36,23 +40,53 @@
*/
public class MinaProducer extends DefaultProducer {
private static final transient Log LOG = LogFactory.getLog(MinaProducer.class);
+ private static final long MAX_WAIT_RESPONSE = 10000;
private IoSession session;
private MinaEndpoint endpoint;
+ private CountDownLatch latch;
public MinaProducer(MinaEndpoint endpoint) {
super(endpoint);
this.endpoint = endpoint;
}
- public void process(Exchange exchange) {
+ public void process(Exchange exchange) throws Exception{
if (session == null) {
throw new IllegalStateException("Not started yet!");
}
+ if (!session.isConnected()){
+ doStart();
+ }
Object body = exchange.getIn().getBody();
if (body == null) {
LOG.warn("No payload for exchange: " + exchange);
} else {
- session.write(body);
+ if (ExchangeHelper.isOutCapable(exchange)){
+ if (LOG.isDebugEnabled()){
+ LOG.debug("Writing body : "+body);
+ }
+ latch = new CountDownLatch(1);
+ WriteFuture future = session.write(body);
+ future.join();
+ if (!future.isWritten()){
+ throw new RuntimeException("Timed out waiting for response: "+exchange);
+ }
+ latch.await(MAX_WAIT_RESPONSE, TimeUnit.MILLISECONDS);
+ if (latch.getCount()==1){
+ throw new RuntimeException("No response from server within "+MAX_WAIT_RESPONSE+" millisecs");
+ }
+ ResponseHandler handler = (ResponseHandler) session.getHandler();
+ if (handler.getCause() != null){
+ throw new Exception("Response Handler had an exception", handler.getCause());
+ }else{
+ if (LOG.isDebugEnabled()){
+ LOG.debug("Handler message: "+handler.getMessage());
+ }
+ exchange.getOut().setBody(handler.getMessage());
+ }
+ }else{
+ session.write(body);
+ }
}
}
@@ -63,13 +97,7 @@
if (LOG.isDebugEnabled()) {
LOG.debug("Creating connector to address: " + address + " using connector: " + connector);
}
- IoHandler ioHandler = new IoHandlerAdapter() {
- @Override
- public void messageReceived(IoSession ioSession, Object object) throws Exception {
- super.messageReceived(ioSession, object);
- /** TODO */
- }
- };
+ IoHandler ioHandler = new ResponseHandler(endpoint);
ConnectFuture future = connector.connect(address, ioHandler, endpoint.getConfig());
future.join();
session = future.getSession();
@@ -81,4 +109,51 @@
session.close().join(2000);
}
}
+ /**
+ * Handles response from session writes
+ *
+ * @author <a href="mailto:karajdaar@gmail.com">nsandhu</a>
+ *
+ */
+ private final class ResponseHandler extends IoHandlerAdapter {
+ private MinaEndpoint endpoint;
+ private Object message;
+ private Throwable cause;
+ /**
+ * @param endpoint
+ */
+ private ResponseHandler(MinaEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ public void messageReceived(IoSession ioSession, Object message) throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Message received: "+message);
+ }
+ cause = null;
+ this.message = message;
+ latch.countDown();
+ }
+
+ @Override
+ public void exceptionCaught(IoSession ioSession, Throwable cause) {
+ LOG.error("Exception on receiving message from address: "+this.endpoint.getAddress()
+ + " using connector: "+this.endpoint.getConnector(), cause);
+ this.message = null;
+ this.cause = cause;
+ ioSession.close();
+ latch.countDown();
+ }
+
+ public Throwable getCause() {
+ return this.cause;
+ }
+
+ public Object getMessage() {
+ return this.message;
+ }
+
+ }
+
}
Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpTextlineProtocolTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpTextlineProtocolTest.java?rev=577559&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpTextlineProtocolTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpTextlineProtocolTest.java Wed Sep 19 22:51:55 2007
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+/**
+ * @version $Revision: 563665 $
+ */
+public class MinaTcpTextlineProtocolTest extends MinaVmTest {
+ @Override
+ protected void setUp() throws Exception {
+ uri = "mina:tcp://localhost:6123?codec=textline";
+ super.setUp();
+ }
+}
Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpTextlineProtocolTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java?rev=577559&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java Wed Sep 19 22:51:55 2007
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+
+/**
+ * @version $Revision: 563665 $
+ */
+public class MinaTcpWithInOutTest extends TestCase {
+ protected CamelContext container = new DefaultCamelContext();
+ protected CountDownLatch latch = new CountDownLatch(1);
+ protected Exchange receivedExchange;
+ protected String uri = "mina:tcp://localhost:6321?codec=textline";
+ protected Producer<Exchange> producer;
+ private ReverserServer server;
+
+ public void testMinaRouteWithInOut() throws Exception {
+ // now let's fire in a message
+ Endpoint<Exchange> endpoint = container.getEndpoint("direct:x");
+ Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
+ Message message = exchange.getIn();
+ String hello = "Hello!";
+ message.setBody(hello);
+ message.setHeader("cheese", 123);
+
+ producer = endpoint.createProducer();
+ producer.start();
+ producer.process(exchange);
+
+ // now let's wait (up to 5 seconds) for the exchange to be received
+ boolean received = latch.await(5, TimeUnit.SECONDS);
+ assertTrue("Did not receive the message!", received);
+ assertNotNull(receivedExchange.getOut());
+ assertEquals("!olleH", receivedExchange.getOut().getBody());
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ server = new ReverserServer();
+ server.start();
+ container.addRoutes(createRouteBuilder());
+ container.start();
+ }
+
+
+ @Override
+ protected void tearDown() throws Exception {
+ if (producer != null) {
+ producer.stop();
+ }
+ container.stop();
+ server.stop();
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:x").to(uri).process(new Processor() {
+ public void process(Exchange e) {
+ System.out.println("Received exchange: " + e.getIn());
+ receivedExchange = e;
+ latch.countDown();
+ }
+ });
+ }
+ };
+ }
+}
Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java?rev=577559&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java Wed Sep 19 22:51:55 2007
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.camel.component.mina;
+
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+
+/**
+ * {@link IoHandler} implementation of reverser server protocol.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 555855 $, $Date: 2007-07-12 20:19:00 -0700 (Thu, 12 Jul 2007) $,
+ */
+public class ReverseProtocolHandler extends IoHandlerAdapter {
+ public void exceptionCaught(IoSession session, Throwable cause) {
+ cause.printStackTrace();
+ // Close connection when unexpected exception is caught.
+ session.close();
+ }
+
+ public void messageReceived(IoSession session, Object message) {
+ // Reverse received string
+ String str = message.toString();
+ StringBuffer buf = new StringBuffer(str.length());
+ for (int i = str.length() - 1; i >= 0; i--) {
+ buf.append(str.charAt(i));
+ }
+
+ // and write it back.
+ session.write(buf.toString());
+ }
+}
Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java?rev=577559&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java Wed Sep 19 22:51:55 2007
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.camel.component.mina;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.filter.LoggingFilter;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+
+/**
+ * (<b>Entry point</b>) Reverser server which reverses all text lines from
+ * clients.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 555855 $, $Date: 2007-07-12 20:19:00 -0700 (Thu, 12 Jul 2007) $,
+ */
+public class ReverserServer {
+ protected int port = 6321;
+ private IoAcceptor acceptor;
+
+ public void start() throws Exception {
+ acceptor = new SocketAcceptor();
+ // Prepare the configuration
+ SocketAcceptorConfig cfg = new SocketAcceptorConfig();
+ cfg.setReuseAddress(true);
+ cfg.getFilterChain().addLast("logger", new LoggingFilter());
+ cfg.getFilterChain().addLast(
+ "codec",
+ new ProtocolCodecFilter(new TextLineCodecFactory(Charset
+ .forName("UTF-8"))));
+
+ // Bind
+ acceptor.bind(new InetSocketAddress(port),
+ new ReverseProtocolHandler(), cfg);
+
+ System.out.println("Listening on port " + port);
+ }
+
+ public void stop() throws Exception{
+ acceptor.unbindAll();
+ }
+
+ public int getPort(){
+ return port;
+ }
+}
Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java
------------------------------------------------------------------------------
svn:eol-style = native