You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/11/16 12:03:32 UTC

svn commit: r595624 - in /activemq/camel/trunk/components/camel-mina: ./ src/main/java/org/apache/camel/component/mina/ src/main/resources/META-INF/services/org/apache/camel/ src/test/java/org/apache/camel/component/mina/

Author: jstrachan
Date: Fri Nov 16 03:03:30 2007
New Revision: 595624

URL: http://svn.apache.org/viewvc?rev=595624&view=rev
Log:
patches for https://issues.apache.org/activemq/browse/CAMEL-225

Added:
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java   (with props)
    activemq/camel/trunk/components/camel-mina/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
      - copied, changed from r594497, activemq/camel/trunk/components/camel-xmlbeans/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpTest.java
      - copied, changed from r595348, activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpUsingTemplateTest.java   (with props)
Modified:
    activemq/camel/trunk/components/camel-mina/pom.xml
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java

Modified: activemq/camel/trunk/components/camel-mina/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/pom.xml?rev=595624&r1=595623&r2=595624&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/pom.xml (original)
+++ activemq/camel/trunk/components/camel-mina/pom.xml Fri Nov 16 03:03:30 2007
@@ -89,6 +89,7 @@
           </includes>
           <excludes>
             <exclude>**/MinaMulticastTest.*</exclude>
+            <exclude>**/MinaUdpTest.*</exclude>
           </excludes>
         </configuration>
       </plugin>

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=595624&r1=595623&r2=595624&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Fri Nov 16 03:03:30 2007
@@ -20,15 +20,25 @@
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.URI;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetEncoder;
 import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
+import org.apache.camel.converter.ObjectConverter;
 import org.apache.camel.impl.DefaultComponent;
+import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoSession;
 import org.apache.mina.common.support.BaseIoConnectorConfig;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
 import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
 import org.apache.mina.transport.socket.nio.DatagramAcceptor;
@@ -40,11 +50,14 @@
 import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.mina.transport.vmpipe.VmPipeConnector;
+import org.apache.mina.util.CharsetUtil;
 
 /**
  * @version $Revision$
  */
 public class MinaComponent extends DefaultComponent<MinaExchange> {
+    private CharsetEncoder encoder;
+
     public MinaComponent() {
     }
 
@@ -59,11 +72,14 @@
         String protocol = u.getScheme();
         if (protocol.equals("tcp")) {
             return createSocketEndpoint(uri, u, parameters);
-        } else if (protocol.equals("udp") || protocol.equals("mcast") || protocol.equals("multicast")) {
+        }
+        else if (protocol.equals("udp") || protocol.equals("mcast") || protocol.equals("multicast")) {
             return createDatagramEndpoint(uri, u, parameters);
-        } else if (protocol.equals("vm")) {
+        }
+        else if (protocol.equals("vm")) {
             return createVmEndpoint(uri, u);
-        } else {
+        }
+        else {
             throw new IOException("Unrecognised MINA protocol: " + protocol + " for uri: " + uri);
         }
     }
@@ -82,10 +98,28 @@
 
         // TODO customize the config via URI
         SocketConnectorConfig config = new SocketConnectorConfig();
-        configureCodecFactory(config, parameters);
+        configureSocketCodecFactory(config, parameters);
         return new MinaEndpoint(uri, this, address, acceptor, connector, config);
     }
-    
+
+    protected void configureSocketCodecFactory(BaseIoConnectorConfig config, Map parameters) {
+        ProtocolCodecFactory codecFactory = getCodecFactory(parameters);
+
+        boolean textline = false;
+        if (codecFactory == null) {
+            if (parameters != null) {
+                textline = ObjectConverter.toBool(parameters.get("textline"));
+            }
+            if (textline) {
+                codecFactory = new TextLineCodecFactory();
+            }
+            else {
+                codecFactory = new ObjectSerializationCodecFactory();
+            }
+        }
+        addCodecFactory(config, codecFactory);
+    }
+
     protected MinaEndpoint createDatagramEndpoint(String uri, URI connectUri, Map parameters) {
         IoAcceptor acceptor = new DatagramAcceptor();
         SocketAddress address = new InetSocketAddress(connectUri.getHost(), connectUri.getPort());
@@ -93,29 +127,80 @@
 
         // TODO customize the config via URI
         DatagramConnectorConfig config = new DatagramConnectorConfig();
-        configureCodecFactory(config, parameters);
+
+        configureDataGramCodecFactory(config, parameters);
 
         return new MinaEndpoint(uri, this, address, acceptor, connector, config);
     }
 
-    protected void configureCodecFactory(BaseIoConnectorConfig config, Map parameters){
-        boolean textline = false;
-        if (parameters != null) {
-            if (parameters.containsKey("codec")) {
-                String value = (String) parameters.get("codec");
-                if (value.equals("textline")) {
-                    textline = true;
+    /**
+     * For datagrams the entire message is available as a single ByteBuffer, so let's just pass those around by default
+     * and try converting whatever the payload is into ByteBuffers unless some custom converter is specified
+     */
+    protected void configureDataGramCodecFactory(BaseIoConnectorConfig config, Map parameters) {
+        ProtocolCodecFactory codecFactory = getCodecFactory(parameters);
+        if (codecFactory == null) {
+            codecFactory = new ProtocolCodecFactory() {
+                public ProtocolEncoder getEncoder() throws Exception {
+                    return new ProtocolEncoder() {
+                        public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
+                            ByteBuffer buf = toByteBuffer(message);
+                            buf.flip();
+                            out.write(buf);
+                        }
+
+                        public void dispose(IoSession session) throws Exception {
+                        }
+                    };
                 }
-            } else {
-                textline = false;
+
+                public ProtocolDecoder getDecoder() throws Exception {
+                    return new ProtocolDecoder() {
+                        public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
+                            // let's just pass the ByteBuffer in
+                            out.write(in);
+                        }
+
+                        public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
+                        }
+
+                        public void dispose(IoSession session) throws Exception {
+                        }
+                    };
+                }
+            };
+        }
+        addCodecFactory(config, new TextLineCodecFactory());
+    }
+
+    protected ByteBuffer toByteBuffer(Object message) throws CharacterCodingException {
+        ByteBuffer answer = convertTo(ByteBuffer.class, message);
+        if (answer == null) {
+            String value = convertTo(String.class, message);
+            answer = ByteBuffer.allocate(value.length()).setAutoExpand(true);
+
+            if (value != null) {
+                if (encoder == null) {
+                    encoder = CharsetUtil.getDefaultCharset().newEncoder();
+                }
+                answer.putString(value, encoder);
             }
         }
+        return answer;
+    }
 
-        if (textline) {
-            config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));
-        } else {
-            config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
+    protected ProtocolCodecFactory getCodecFactory(Map parameters) {
+        ProtocolCodecFactory codecFactory = null;
+        if (parameters != null) {
+            String codec = (String) parameters.get("codec");
+            if (codec != null) {
+                codecFactory = getCamelContext().getRegistry().lookup(codec, ProtocolCodecFactory.class);
+            }
         }
+        return codecFactory;
     }
 
+    protected void addCodecFactory(BaseIoConnectorConfig config, ProtocolCodecFactory codecFactory) {
+        config.getFilterChain().addLast("codec", new ProtocolCodecFilter(codecFactory));
+    }
 }

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=595624&r1=595623&r2=595624&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Fri Nov 16 03:03:30 2007
@@ -57,7 +57,8 @@
         IoHandler handler = new IoHandlerAdapter() {
             @Override
             public void messageReceived(IoSession session, Object object) throws Exception {
-                getProcessor().process(endpoint.createExchange(session, object));
+                MinaExchange exchange = endpoint.createExchange(session, object);
+                getProcessor().process(exchange);
             }
         };
 

Added: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java?rev=595624&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java Fri Nov 16 03:03:30 2007
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+
+import org.apache.camel.Converter;
+import org.apache.camel.converter.IOConverter;
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * A set of converter methods for working with MINA types
+ *
+ * @version $Revision: 1.1 $
+ */
+@Converter
+public class MinaConverter {
+    @Converter
+    public static byte[] toByteArray(ByteBuffer buffer) {
+        // TODO is there a neater way?
+        byte[] answer = new byte[buffer.remaining()];
+        buffer.get(answer);
+        return answer;
+    }
+
+    @Converter
+    public static String toString(ByteBuffer buffer) {
+        return IOConverter.toString(toByteArray(buffer));
+    }
+
+    @Converter
+    public static InputStream toInputStream(ByteBuffer buffer) {
+        return buffer.asInputStream();
+    }
+
+    @Converter
+    public static ObjectInput toObjectInput(ByteBuffer buffer) throws IOException {
+        return IOConverter.toObjectInput(toInputStream(buffer));
+    }
+}

Propchange: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=595624&r1=595623&r2=595624&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Fri Nov 16 03:03:30 2007
@@ -16,6 +16,10 @@
  */
 package org.apache.camel.component.mina;
 
+import java.net.SocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultProducer;
@@ -29,13 +33,9 @@
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.WriteFuture;
 
-import java.net.SocketAddress;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 /**
  * A {@link Producer} implementation for MINA
- * 
+ *
  * @version $Revision$
  */
 public class MinaProducer extends DefaultProducer {
@@ -50,41 +50,44 @@
         this.endpoint = endpoint;
     }
 
-    public void process(Exchange exchange) throws Exception{
+    public void process(Exchange exchange) throws Exception {
         if (session == null) {
             throw new IllegalStateException("Not started yet!");
         }
-        if (!session.isConnected()){
+        if (!session.isConnected()) {
             doStart();
         }
         Object body = exchange.getIn().getBody();
         if (body == null) {
             LOG.warn("No payload for exchange: " + exchange);
-        } else {
-            if (ExchangeHelper.isOutCapable(exchange)){
-                if (LOG.isDebugEnabled()){
-                    LOG.debug("Writing body : "+body);
+        }
+        else {
+            if (ExchangeHelper.isOutCapable(exchange)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Writing body : " + body);
                 }
                 latch = new CountDownLatch(1);
                 WriteFuture future = session.write(body);
                 future.join();
-                if (!future.isWritten()){
-                    throw new RuntimeException("Timed out waiting for response: "+exchange);
+                if (!future.isWritten()) {
+                    throw new RuntimeException("Timed out waiting for response: " + exchange);
                 }
                 latch.await(MAX_WAIT_RESPONSE, TimeUnit.MILLISECONDS);
-                if (latch.getCount()==1){
-                    throw new RuntimeException("No response from server within "+MAX_WAIT_RESPONSE+" millisecs");
+                if (latch.getCount() == 1) {
+                    throw new RuntimeException("No response from server within " + MAX_WAIT_RESPONSE + " millisecs");
                 }
                 ResponseHandler handler = (ResponseHandler) session.getHandler();
-                if (handler.getCause() != null){
+                if (handler.getCause() != null) {
                     throw new Exception("Response Handler had an exception", handler.getCause());
-                }else{
-                    if (LOG.isDebugEnabled()){
-                        LOG.debug("Handler message: "+handler.getMessage());
+                }
+                else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Handler message: " + handler.getMessage());
                     }
                     exchange.getOut().setBody(handler.getMessage());
                 }
-            }else{
+            }
+            else {
                 session.write(body);
             }
         }
@@ -109,16 +112,17 @@
             session.close().join(2000);
         }
     }
+
     /**
      * Handles response from session writes
-     * 
-     * @author <a href="mailto:karajdaar@gmail.com">nsandhu</a>
      *
+     * @author <a href="mailto:karajdaar@gmail.com">nsandhu</a>
      */
     private final class ResponseHandler extends IoHandlerAdapter {
         private MinaEndpoint endpoint;
         private Object message;
         private Throwable cause;
+
         /**
          * @param endpoint
          */
@@ -129,23 +133,32 @@
         @Override
         public void messageReceived(IoSession ioSession, Object message) throws Exception {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Message received: "+message);
+                LOG.debug("Message received: " + message);
             }
             cause = null;
             this.message = message;
-            latch.countDown();
+            countDown();
+        }
+
+        protected void countDown() {
+            CountDownLatch downLatch = latch;
+            if (downLatch != null) {
+                downLatch.countDown();
+            }
         }
 
         @Override
         public void exceptionCaught(IoSession ioSession, Throwable cause) {
-            LOG.error("Exception on receiving message from address: "+this.endpoint.getAddress()
-                        + " using connector: "+this.endpoint.getConnector(), cause);
+            LOG.error("Exception on receiving message from address: " + this.endpoint.getAddress()
+                    + " using connector: " + this.endpoint.getConnector(), cause);
             this.message = null;
             this.cause = cause;
-            ioSession.close();
-            latch.countDown();
+            if (ioSession != null) {
+                ioSession.close();
+            }
+            countDown();
         }
-        
+
         public Throwable getCause() {
             return this.cause;
         }
@@ -153,7 +166,5 @@
         public Object getMessage() {
             return this.message;
         }
-
     }
-
 }

Copied: activemq/camel/trunk/components/camel-mina/src/main/resources/META-INF/services/org/apache/camel/TypeConverter (from r594497, activemq/camel/trunk/components/camel-xmlbeans/src/main/resources/META-INF/services/org/apache/camel/TypeConverter)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/resources/META-INF/services/org/apache/camel/TypeConverter?p2=activemq/camel/trunk/components/camel-mina/src/main/resources/META-INF/services/org/apache/camel/TypeConverter&p1=activemq/camel/trunk/components/camel-xmlbeans/src/main/resources/META-INF/services/org/apache/camel/TypeConverter&r1=594497&r2=595624&rev=595624&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-xmlbeans/src/main/resources/META-INF/services/org/apache/camel/TypeConverter (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/resources/META-INF/services/org/apache/camel/TypeConverter Fri Nov 16 03:03:30 2007
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.camel.converter.xmlbeans
\ No newline at end of file
+org.apache.camel.component.mina
\ No newline at end of file

Modified: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java?rev=595624&r1=595623&r2=595624&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java Fri Nov 16 03:03:30 2007
@@ -38,7 +38,7 @@
     protected CamelContext container = new DefaultCamelContext();
     protected CountDownLatch latch = new CountDownLatch(1);
     protected Exchange receivedExchange;
-    protected String uri = "mina:tcp://localhost:6321?codec=textline";
+    protected String uri = "mina:tcp://localhost:6321?textline=true";
     protected Producer<Exchange> producer;
     private ReverserServer server;
 

Copied: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpTest.java (from r595348, activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpTest.java?p2=activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpTest.java&p1=activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java&r1=595348&r2=595624&rev=595624&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpTest.java Fri Nov 16 03:03:30 2007
@@ -16,31 +16,114 @@
  */
 package org.apache.camel.component.mina;
 
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.util.List;
+
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision$
  */
-public class MinaVmTest extends ContextTestSupport {
-    protected String uri = "mina:vm://localhost:8080";
+public class MinaUdpTest extends ContextTestSupport {
+    private static final transient Log LOG = LogFactory.getLog(MinaUdpTest.class);
+    protected int messageCount = 3;
+    protected Thread readerThread;
+    protected int port = 4445;
+    protected boolean consume = false;
 
     public void testMinaRoute() throws Exception {
         MockEndpoint endpoint = getMockEndpoint("mock:result");
-        Object body = "Hello there!";
-        endpoint.expectedBodiesReceived(body);
+        endpoint.expectedMessageCount(messageCount);
+        endpoint.expectedBodiesReceived("Hello Message: 0", "Hello Message: 1", "Hello Message: 2");
 
-        template.sendBodyAndHeader(uri, body, "cheese", 123);
+        Thread.sleep(1000);
+        sendUdpMessages();
 
         assertMockEndpointsSatisifed();
+        List<Exchange> list = endpoint.getReceivedExchanges();
+        Exchange exchange = list.get(0);
+        Object body = exchange.getIn().getBody();
+        LOG.debug("Type: " + body.getClass().getName() + " value: " + body);
+        LOG.debug("String value: " + exchange.getIn().getBody(String.class));
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+
+        super.setUp();
+
+        if (consume) {
+            final DatagramSocket socket = new DatagramSocket(port);
+
+            readerThread = new Thread() {
+                public void run() {
+                    try {
+                        byte[] buffer = new byte[1024];
+                        DatagramPacket incoming = new DatagramPacket(buffer, buffer.length);
+                        System.out.println("starting to receive udp packets");
+                        while (true) {
+                            //incoming.setLength(buffer.length);
+                            socket.receive(incoming);
+                            byte[] data = incoming.getData();
+                            System.out.println("Got data! " + data.length);
+
+                            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
+                            Object value = in.readObject();
+                            System.out.println("Value: " + value);
+                        }
+                    }
+                    catch (Throwable ex) {
+                        System.err.println(ex);
+                        ex.printStackTrace();
+                    }
+                }
+            };
+            readerThread.start();
+        }
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    protected void sendUdpMessages() throws Exception {
+        DatagramSocket socket = new DatagramSocket();
+        InetAddress address = InetAddress.getByName("127.0.0.1");
+        for (int i = 0; i < messageCount; i++) {
+/*
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+            ObjectOutputStream out = new ObjectOutputStream(buffer);
+            out.writeObject("Hello Message: " + i);
+            out.close();
+
+            byte[] data = buffer.toByteArray();
+*/
+
+            String text = "Hello Message: " + i;
+            byte[] data = text.getBytes();
+
+            DatagramPacket packet = new DatagramPacket(data, data.length, address, port);
+            socket.send(packet);
+            Thread.sleep(1000);
+        }
+        System.out.println("Sent " + messageCount + " messages");
     }
 
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from(uri).to("mock:result");
+                from("mina:udp://127.0.0.1:" + port).to("mock:result");
             }
         };
     }
-}
+}
\ No newline at end of file

Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpUsingTemplateTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpUsingTemplateTest.java?rev=595624&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpUsingTemplateTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpUsingTemplateTest.java Fri Nov 16 03:03:30 2007
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision: 595348 $
+ */
+public class MinaUdpUsingTemplateTest extends ContextTestSupport {
+    private static final transient Log LOG = LogFactory.getLog(MinaUdpUsingTemplateTest.class);
+    
+    private int messageCount = 3;
+
+    public void testMinaRoute() throws Exception {
+        MockEndpoint endpoint = getMockEndpoint("mock:result");
+        endpoint.expectedMessageCount(3);
+
+        sendUdpMessages();
+
+        assertMockEndpointsSatisifed();
+        List<Exchange> list = endpoint.getReceivedExchanges();
+        LOG.debug("Received: " + list);
+    }
+
+    protected void sendUdpMessages() throws Exception {
+        for (int i = 0; i < messageCount; i++) {
+            template.sendBody("mina:udp://127.0.0.1:4445", "Hello Message: " + i);
+        }
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("mina:udp://127.0.0.1:4445").to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpUsingTemplateTest.java
------------------------------------------------------------------------------
    svn:eol-style = native