You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/09/20 07:51:56 UTC

svn commit: r577559 - in /activemq/camel/trunk/components/camel-mina/src: main/java/org/apache/camel/component/mina/ test/java/org/apache/camel/component/mina/

Author: jstrachan
Date: Wed Sep 19 22:51:55 2007
New Revision: 577559

URL: http://svn.apache.org/viewvc?rev=577559&view=rev
Log:
applied patch from Nicky Sandhu for CAMEL-133 to support InOut for MINA - great work!

Added:
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpTextlineProtocolTest.java   (with props)
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java   (with props)
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java   (with props)
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java   (with props)
Modified:
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=577559&r1=577558&r2=577559&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Wed Sep 19 22:51:55 2007
@@ -27,8 +27,10 @@
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.support.BaseIoConnectorConfig;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
+import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
 import org.apache.mina.transport.socket.nio.DatagramAcceptor;
 import org.apache.mina.transport.socket.nio.DatagramConnector;
 import org.apache.mina.transport.socket.nio.DatagramConnectorConfig;
@@ -56,9 +58,9 @@
 
         String protocol = u.getScheme();
         if (protocol.equals("tcp")) {
-            return createSocketEndpoint(uri, u);
+            return createSocketEndpoint(uri, u, parameters);
         } else if (protocol.equals("udp") || protocol.equals("mcast") || protocol.equals("multicast")) {
-            return createDatagramEndpoint(uri, u);
+            return createDatagramEndpoint(uri, u, parameters);
         } else if (protocol.equals("vm")) {
             return createVmEndpoint(uri, u);
         } else {
@@ -73,27 +75,47 @@
         return new MinaEndpoint(uri, this, address, acceptor, connector, null);
     }
 
-    protected MinaEndpoint createSocketEndpoint(String uri, URI connectUri) {
+    protected MinaEndpoint createSocketEndpoint(String uri, URI connectUri, Map parameters) {
         IoAcceptor acceptor = new SocketAcceptor();
         SocketAddress address = new InetSocketAddress(connectUri.getHost(), connectUri.getPort());
         IoConnector connector = new SocketConnector();
 
         // TODO customize the config via URI
         SocketConnectorConfig config = new SocketConnectorConfig();
-        config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
-
+        configureCodecFactory(config, parameters);
         return new MinaEndpoint(uri, this, address, acceptor, connector, config);
     }
-
-    protected MinaEndpoint createDatagramEndpoint(String uri, URI connectUri) {
+    
+    protected MinaEndpoint createDatagramEndpoint(String uri, URI connectUri, Map parameters) {
         IoAcceptor acceptor = new DatagramAcceptor();
         SocketAddress address = new InetSocketAddress(connectUri.getHost(), connectUri.getPort());
         IoConnector connector = new DatagramConnector();
 
         // TODO customize the config via URI
         DatagramConnectorConfig config = new DatagramConnectorConfig();
-        config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
+        configureCodecFactory(config, parameters);
 
         return new MinaEndpoint(uri, this, address, acceptor, connector, config);
     }
+
+    protected void configureCodecFactory(BaseIoConnectorConfig config, Map parameters){
+        boolean textline = false;
+        if (parameters != null) {
+            if (parameters.containsKey("codec")) {
+                String value = (String) parameters.get("codec");
+                if (value.equals("textline")) {
+                    textline = true;
+                }
+            } else {
+                textline = false;
+            }
+        }
+
+        if (textline) {
+            config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));
+        } else {
+            config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
+        }
+    }
+
 }

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java?rev=577559&r1=577558&r2=577559&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaExchange.java Wed Sep 19 22:51:55 2007
@@ -29,6 +29,6 @@
 public class MinaExchange extends DefaultExchange {
 
     public MinaExchange(CamelContext camelContext, ExchangePattern pattern) {
-        super(camelContext);
+        super(camelContext, pattern);
     }
 }

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=577559&r1=577558&r2=577559&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Wed Sep 19 22:51:55 2007
@@ -19,6 +19,7 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.mina.common.ConnectFuture;
@@ -26,8 +27,11 @@
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
 
 import java.net.SocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A {@link Producer} implementation for MINA
@@ -36,23 +40,53 @@
  */
 public class MinaProducer extends DefaultProducer {
     private static final transient Log LOG = LogFactory.getLog(MinaProducer.class);
+    private static final long MAX_WAIT_RESPONSE = 10000;
     private IoSession session;
     private MinaEndpoint endpoint;
+    private CountDownLatch latch;
 
     public MinaProducer(MinaEndpoint endpoint) {
         super(endpoint);
         this.endpoint = endpoint;
     }
 
-    public void process(Exchange exchange) {
+    public void process(Exchange exchange) throws Exception{
         if (session == null) {
             throw new IllegalStateException("Not started yet!");
         }
+        if (!session.isConnected()){
+            doStart();
+        }
         Object body = exchange.getIn().getBody();
         if (body == null) {
             LOG.warn("No payload for exchange: " + exchange);
         } else {
-            session.write(body);
+            if (ExchangeHelper.isOutCapable(exchange)){
+                if (LOG.isDebugEnabled()){
+                    LOG.debug("Writing body : "+body);
+                }
+                latch = new CountDownLatch(1);
+                WriteFuture future = session.write(body);
+                future.join();
+                if (!future.isWritten()){
+                    throw new RuntimeException("Timed out waiting for response: "+exchange);
+                }
+                latch.await(MAX_WAIT_RESPONSE, TimeUnit.MILLISECONDS);
+                if (latch.getCount()==1){
+                    throw new RuntimeException("No response from server within "+MAX_WAIT_RESPONSE+" millisecs");
+                }
+                ResponseHandler handler = (ResponseHandler) session.getHandler();
+                if (handler.getCause() != null){
+                    throw new Exception("Response Handler had an exception", handler.getCause());
+                }else{
+                    if (LOG.isDebugEnabled()){
+                        LOG.debug("Handler message: "+handler.getMessage());
+                    }
+                    exchange.getOut().setBody(handler.getMessage());
+                }
+            }else{
+                session.write(body);
+            }
         }
     }
 
@@ -63,13 +97,7 @@
         if (LOG.isDebugEnabled()) {
             LOG.debug("Creating connector to address: " + address + " using connector: " + connector);
         }
-        IoHandler ioHandler = new IoHandlerAdapter() {
-            @Override
-            public void messageReceived(IoSession ioSession, Object object) throws Exception {
-                super.messageReceived(ioSession, object);
-                /** TODO */
-            }
-        };
+        IoHandler ioHandler = new ResponseHandler(endpoint);
         ConnectFuture future = connector.connect(address, ioHandler, endpoint.getConfig());
         future.join();
         session = future.getSession();
@@ -81,4 +109,51 @@
             session.close().join(2000);
         }
     }
+    /**
+     * Handles response from session writes
+     * 
+     * @author <a href="mailto:karajdaar@gmail.com">nsandhu</a>
+     *
+     */
+    private final class ResponseHandler extends IoHandlerAdapter {
+        private MinaEndpoint endpoint;
+        private Object message;
+        private Throwable cause;
+        /**
+         * @param endpoint
+         */
+        private ResponseHandler(MinaEndpoint endpoint) {
+            this.endpoint = endpoint;
+        }
+
+        @Override
+        public void messageReceived(IoSession ioSession, Object message) throws Exception {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Message received: "+message);
+            }
+            cause = null;
+            this.message = message;
+            latch.countDown();
+        }
+
+        @Override
+        public void exceptionCaught(IoSession ioSession, Throwable cause) {
+            LOG.error("Exception on receiving message from address: "+this.endpoint.getAddress()
+                        + " using connector: "+this.endpoint.getConnector(), cause);
+            this.message = null;
+            this.cause = cause;
+            ioSession.close();
+            latch.countDown();
+        }
+        
+        public Throwable getCause() {
+            return this.cause;
+        }
+
+        public Object getMessage() {
+            return this.message;
+        }
+
+    }
+
 }

Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpTextlineProtocolTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpTextlineProtocolTest.java?rev=577559&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpTextlineProtocolTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpTextlineProtocolTest.java Wed Sep 19 22:51:55 2007
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+/**
+ * @version $Revision: 563665 $
+ */
+public class MinaTcpTextlineProtocolTest extends MinaVmTest {
+    @Override
+    protected void setUp() throws Exception {
+        uri = "mina:tcp://localhost:6123?codec=textline";
+        super.setUp();
+    }
+}

Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpTextlineProtocolTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java?rev=577559&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java Wed Sep 19 22:51:55 2007
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+
+/**
+ * @version $Revision: 563665 $
+ */
+public class MinaTcpWithInOutTest extends TestCase {
+    protected CamelContext container = new DefaultCamelContext();
+    protected CountDownLatch latch = new CountDownLatch(1);
+    protected Exchange receivedExchange;
+    protected String uri = "mina:tcp://localhost:6321?codec=textline";
+    protected Producer<Exchange> producer;
+    private ReverserServer server;
+
+    public void testMinaRouteWithInOut() throws Exception {
+        // now lets fire in a message
+        Endpoint<Exchange> endpoint = container.getEndpoint("direct:x");
+        Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
+        Message message = exchange.getIn();
+        String hello = "Hello!";
+        message.setBody(hello);
+        message.setHeader("cheese", 123);
+
+        producer = endpoint.createProducer();
+        producer.start();
+        producer.process(exchange);
+
+        // now lets sleep for a while
+        boolean received = latch.await(5, TimeUnit.SECONDS);
+        assertTrue("Did not receive the message!", received);
+        assertNotNull(receivedExchange.getOut());
+        assertEquals("!olleH", receivedExchange.getOut().getBody());
+    }
+    
+    @Override
+    protected void setUp() throws Exception {
+        server = new ReverserServer();
+        server.start();
+        container.addRoutes(createRouteBuilder());
+        container.start();
+    }
+
+
+    @Override
+    protected void tearDown() throws Exception {
+        if (producer != null) {
+            producer.stop();
+        }
+        container.stop();
+        server.stop();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:x").to(uri).process(new Processor() {
+                    public void process(Exchange e) {
+                        System.out.println("Received exchange: " + e.getIn());
+                        receivedExchange = e;
+                        latch.countDown();
+                    }
+                });
+            }
+        };
+    }
+}

Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java?rev=577559&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java Wed Sep 19 22:51:55 2007
@@ -0,0 +1,50 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.camel.component.mina;
+
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+
+/**
+ * {@link IoHandler} implementation of reverser server protocol.
+ * 
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 555855 $, $Date: 2007-07-12 20:19:00 -0700 (Thu, 12 Jul 2007) $,
+ */
+public class ReverseProtocolHandler extends IoHandlerAdapter {
+    public void exceptionCaught(IoSession session, Throwable cause) {
+        cause.printStackTrace();
+        // Close connection when unexpected exception is caught.
+        session.close();
+    }
+
+    public void messageReceived(IoSession session, Object message) {
+        // Reverse reveiced string
+        String str = message.toString();
+        StringBuffer buf = new StringBuffer(str.length());
+        for (int i = str.length() - 1; i >= 0; i--) {
+            buf.append(str.charAt(i));
+        }
+
+        // and write it back.
+        session.write(buf.toString());
+    }
+}

Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java?rev=577559&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java Wed Sep 19 22:51:55 2007
@@ -0,0 +1,68 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.camel.component.mina;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.filter.LoggingFilter;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+
+/**
+ * (<b>Entry point</b>) Reverser server which reverses all text lines from
+ * clients.
+ * 
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 555855 $, $Date: 2007-07-12 20:19:00 -0700 (Thu, 12 Jul 2007) $,
+ */
+public class ReverserServer {
+    protected int port = 6321;
+    private IoAcceptor acceptor;
+
+    public void start() throws Exception {
+        acceptor = new SocketAcceptor();
+        // Prepare the configuration
+        SocketAcceptorConfig cfg = new SocketAcceptorConfig();
+        cfg.setReuseAddress(true);
+        cfg.getFilterChain().addLast("logger", new LoggingFilter());
+        cfg.getFilterChain().addLast(
+                "codec",
+                new ProtocolCodecFilter(new TextLineCodecFactory(Charset
+                        .forName("UTF-8"))));
+
+        // Bind
+        acceptor.bind(new InetSocketAddress(port),
+                new ReverseProtocolHandler(), cfg);
+
+        System.out.println("Listening on port " + port);
+    }
+    
+    public void stop() throws Exception{
+        acceptor.unbindAll();
+    }
+    
+    public int getPort(){
+        return port;
+    }
+}

Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java
------------------------------------------------------------------------------
    svn:eol-style = native