You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/11/16 12:03:32 UTC
svn commit: r595624 - in /activemq/camel/trunk/components/camel-mina: ./
src/main/java/org/apache/camel/component/mina/
src/main/resources/META-INF/services/org/apache/camel/
src/test/java/org/apache/camel/component/mina/
Author: jstrachan
Date: Fri Nov 16 03:03:30 2007
New Revision: 595624
URL: http://svn.apache.org/viewvc?rev=595624&view=rev
Log:
patches for https://issues.apache.org/activemq/browse/CAMEL-225
Added:
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java (with props)
activemq/camel/trunk/components/camel-mina/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
- copied, changed from r594497, activemq/camel/trunk/components/camel-xmlbeans/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpTest.java
- copied, changed from r595348, activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java
activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpUsingTemplateTest.java (with props)
Modified:
activemq/camel/trunk/components/camel-mina/pom.xml
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java
Modified: activemq/camel/trunk/components/camel-mina/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/pom.xml?rev=595624&r1=595623&r2=595624&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/pom.xml (original)
+++ activemq/camel/trunk/components/camel-mina/pom.xml Fri Nov 16 03:03:30 2007
@@ -89,6 +89,7 @@
</includes>
<excludes>
<exclude>**/MinaMulticastTest.*</exclude>
+ <exclude>**/MinaUdpTest.*</exclude>
</excludes>
</configuration>
</plugin>
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=595624&r1=595623&r2=595624&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Fri Nov 16 03:03:30 2007
@@ -20,15 +20,25 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetEncoder;
import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
+import org.apache.camel.converter.ObjectConverter;
import org.apache.camel.impl.DefaultComponent;
+import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoSession;
import org.apache.mina.common.support.BaseIoConnectorConfig;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.nio.DatagramAcceptor;
@@ -40,11 +50,14 @@
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
+import org.apache.mina.util.CharsetUtil;
/**
* @version $Revision$
*/
public class MinaComponent extends DefaultComponent<MinaExchange> {
+ private CharsetEncoder encoder;
+
public MinaComponent() {
}
@@ -59,11 +72,14 @@
String protocol = u.getScheme();
if (protocol.equals("tcp")) {
return createSocketEndpoint(uri, u, parameters);
- } else if (protocol.equals("udp") || protocol.equals("mcast") || protocol.equals("multicast")) {
+ }
+ else if (protocol.equals("udp") || protocol.equals("mcast") || protocol.equals("multicast")) {
return createDatagramEndpoint(uri, u, parameters);
- } else if (protocol.equals("vm")) {
+ }
+ else if (protocol.equals("vm")) {
return createVmEndpoint(uri, u);
- } else {
+ }
+ else {
throw new IOException("Unrecognised MINA protocol: " + protocol + " for uri: " + uri);
}
}
@@ -82,10 +98,28 @@
// TODO customize the config via URI
SocketConnectorConfig config = new SocketConnectorConfig();
- configureCodecFactory(config, parameters);
+ configureSocketCodecFactory(config, parameters);
return new MinaEndpoint(uri, this, address, acceptor, connector, config);
}
-
+
+ protected void configureSocketCodecFactory(BaseIoConnectorConfig config, Map parameters) {
+ ProtocolCodecFactory codecFactory = getCodecFactory(parameters);
+
+ boolean textline = false;
+ if (codecFactory == null) {
+ if (parameters != null) {
+ textline = ObjectConverter.toBool(parameters.get("textline"));
+ }
+ if (textline) {
+ codecFactory = new TextLineCodecFactory();
+ }
+ else {
+ codecFactory = new ObjectSerializationCodecFactory();
+ }
+ }
+ addCodecFactory(config, codecFactory);
+ }
+
protected MinaEndpoint createDatagramEndpoint(String uri, URI connectUri, Map parameters) {
IoAcceptor acceptor = new DatagramAcceptor();
SocketAddress address = new InetSocketAddress(connectUri.getHost(), connectUri.getPort());
@@ -93,29 +127,80 @@
// TODO customize the config via URI
DatagramConnectorConfig config = new DatagramConnectorConfig();
- configureCodecFactory(config, parameters);
+
+ configureDataGramCodecFactory(config, parameters);
return new MinaEndpoint(uri, this, address, acceptor, connector, config);
}
- protected void configureCodecFactory(BaseIoConnectorConfig config, Map parameters){
- boolean textline = false;
- if (parameters != null) {
- if (parameters.containsKey("codec")) {
- String value = (String) parameters.get("codec");
- if (value.equals("textline")) {
- textline = true;
+ /**
+ * For datagrams the entire message is available as a single ByteBuffer so lets just pass those around by default
+ * and try converting whatever they payload is into ByteBuffers unless some custom converter is specified
+ */
+ protected void configureDataGramCodecFactory(BaseIoConnectorConfig config, Map parameters) {
+ ProtocolCodecFactory codecFactory = getCodecFactory(parameters);
+ if (codecFactory == null) {
+ codecFactory = new ProtocolCodecFactory() {
+ public ProtocolEncoder getEncoder() throws Exception {
+ return new ProtocolEncoder() {
+ public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
+ ByteBuffer buf = toByteBuffer(message);
+ buf.flip();
+ out.write(buf);
+ }
+
+ public void dispose(IoSession session) throws Exception {
+ }
+ };
}
- } else {
- textline = false;
+
+ public ProtocolDecoder getDecoder() throws Exception {
+ return new ProtocolDecoder() {
+ public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
+ // lets just pass the ByteBuffer in
+ out.write(in);
+ }
+
+ public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
+ }
+
+ public void dispose(IoSession session) throws Exception {
+ }
+ };
+ }
+ };
+ }
+ addCodecFactory(config, new TextLineCodecFactory());
+ }
+
+ protected ByteBuffer toByteBuffer(Object message) throws CharacterCodingException {
+ ByteBuffer answer = convertTo(ByteBuffer.class, message);
+ if (answer == null) {
+ String value = convertTo(String.class, message);
+ answer = ByteBuffer.allocate(value.length()).setAutoExpand(true);
+
+ if (value != null) {
+ if (encoder == null) {
+ encoder = CharsetUtil.getDefaultCharset().newEncoder();
+ }
+ answer.putString(value, encoder);
}
}
+ return answer;
+ }
- if (textline) {
- config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));
- } else {
- config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
+ protected ProtocolCodecFactory getCodecFactory(Map parameters) {
+ ProtocolCodecFactory codecFactory = null;
+ if (parameters != null) {
+ String codec = (String) parameters.get("codec");
+ if (codec != null) {
+ codecFactory = getCamelContext().getRegistry().lookup(codec, ProtocolCodecFactory.class);
+ }
}
+ return codecFactory;
}
+ protected void addCodecFactory(BaseIoConnectorConfig config, ProtocolCodecFactory codecFactory) {
+ config.getFilterChain().addLast("codec", new ProtocolCodecFilter(codecFactory));
+ }
}
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=595624&r1=595623&r2=595624&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Fri Nov 16 03:03:30 2007
@@ -57,7 +57,8 @@
IoHandler handler = new IoHandlerAdapter() {
@Override
public void messageReceived(IoSession session, Object object) throws Exception {
- getProcessor().process(endpoint.createExchange(session, object));
+ MinaExchange exchange = endpoint.createExchange(session, object);
+ getProcessor().process(exchange);
}
};
Added: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java?rev=595624&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java Fri Nov 16 03:03:30 2007
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+
+import org.apache.camel.Converter;
+import org.apache.camel.converter.IOConverter;
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * A set of converter methods for working with MINA types
+ *
+ * @version $Revision: 1.1 $
+ */
+@Converter
+public class MinaConverter {
+ @Converter
+ public static byte[] toByteArray(ByteBuffer buffer) {
+ // TODO is there a neater way?
+ byte[] answer = new byte[buffer.remaining()];
+ buffer.get(answer);
+ return answer;
+ }
+
+ @Converter
+ public static String toString(ByteBuffer buffer) {
+ return IOConverter.toString(toByteArray(buffer));
+ }
+
+ @Converter
+ public static InputStream toInputStream(ByteBuffer buffer) {
+ return buffer.asInputStream();
+ }
+
+ @Converter
+ public static ObjectInput toObjectInput(ByteBuffer buffer) throws IOException {
+ return IOConverter.toObjectInput(toInputStream(buffer));
+ }
+}
Propchange: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=595624&r1=595623&r2=595624&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Fri Nov 16 03:03:30 2007
@@ -16,6 +16,10 @@
*/
package org.apache.camel.component.mina;
+import java.net.SocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultProducer;
@@ -29,13 +33,9 @@
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteFuture;
-import java.net.SocketAddress;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* A {@link Producer} implementation for MINA
- *
+ *
* @version $Revision$
*/
public class MinaProducer extends DefaultProducer {
@@ -50,41 +50,44 @@
this.endpoint = endpoint;
}
- public void process(Exchange exchange) throws Exception{
+ public void process(Exchange exchange) throws Exception {
if (session == null) {
throw new IllegalStateException("Not started yet!");
}
- if (!session.isConnected()){
+ if (!session.isConnected()) {
doStart();
}
Object body = exchange.getIn().getBody();
if (body == null) {
LOG.warn("No payload for exchange: " + exchange);
- } else {
- if (ExchangeHelper.isOutCapable(exchange)){
- if (LOG.isDebugEnabled()){
- LOG.debug("Writing body : "+body);
+ }
+ else {
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing body : " + body);
}
latch = new CountDownLatch(1);
WriteFuture future = session.write(body);
future.join();
- if (!future.isWritten()){
- throw new RuntimeException("Timed out waiting for response: "+exchange);
+ if (!future.isWritten()) {
+ throw new RuntimeException("Timed out waiting for response: " + exchange);
}
latch.await(MAX_WAIT_RESPONSE, TimeUnit.MILLISECONDS);
- if (latch.getCount()==1){
- throw new RuntimeException("No response from server within "+MAX_WAIT_RESPONSE+" millisecs");
+ if (latch.getCount() == 1) {
+ throw new RuntimeException("No response from server within " + MAX_WAIT_RESPONSE + " millisecs");
}
ResponseHandler handler = (ResponseHandler) session.getHandler();
- if (handler.getCause() != null){
+ if (handler.getCause() != null) {
throw new Exception("Response Handler had an exception", handler.getCause());
- }else{
- if (LOG.isDebugEnabled()){
- LOG.debug("Handler message: "+handler.getMessage());
+ }
+ else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Handler message: " + handler.getMessage());
}
exchange.getOut().setBody(handler.getMessage());
}
- }else{
+ }
+ else {
session.write(body);
}
}
@@ -109,16 +112,17 @@
session.close().join(2000);
}
}
+
/**
* Handles response from session writes
- *
- * @author <a href="mailto:karajdaar@gmail.com">nsandhu</a>
*
+ * @author <a href="mailto:karajdaar@gmail.com">nsandhu</a>
*/
private final class ResponseHandler extends IoHandlerAdapter {
private MinaEndpoint endpoint;
private Object message;
private Throwable cause;
+
/**
* @param endpoint
*/
@@ -129,23 +133,32 @@
@Override
public void messageReceived(IoSession ioSession, Object message) throws Exception {
if (LOG.isDebugEnabled()) {
- LOG.debug("Message received: "+message);
+ LOG.debug("Message received: " + message);
}
cause = null;
this.message = message;
- latch.countDown();
+ countDown();
+ }
+
+ protected void countDown() {
+ CountDownLatch downLatch = latch;
+ if (downLatch != null) {
+ downLatch.countDown();
+ }
}
@Override
public void exceptionCaught(IoSession ioSession, Throwable cause) {
- LOG.error("Exception on receiving message from address: "+this.endpoint.getAddress()
- + " using connector: "+this.endpoint.getConnector(), cause);
+ LOG.error("Exception on receiving message from address: " + this.endpoint.getAddress()
+ + " using connector: " + this.endpoint.getConnector(), cause);
this.message = null;
this.cause = cause;
- ioSession.close();
- latch.countDown();
+ if (ioSession != null) {
+ ioSession.close();
+ }
+ countDown();
}
-
+
public Throwable getCause() {
return this.cause;
}
@@ -153,7 +166,5 @@
public Object getMessage() {
return this.message;
}
-
}
-
}
Copied: activemq/camel/trunk/components/camel-mina/src/main/resources/META-INF/services/org/apache/camel/TypeConverter (from r594497, activemq/camel/trunk/components/camel-xmlbeans/src/main/resources/META-INF/services/org/apache/camel/TypeConverter)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/resources/META-INF/services/org/apache/camel/TypeConverter?p2=activemq/camel/trunk/components/camel-mina/src/main/resources/META-INF/services/org/apache/camel/TypeConverter&p1=activemq/camel/trunk/components/camel-xmlbeans/src/main/resources/META-INF/services/org/apache/camel/TypeConverter&r1=594497&r2=595624&rev=595624&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-xmlbeans/src/main/resources/META-INF/services/org/apache/camel/TypeConverter (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/resources/META-INF/services/org/apache/camel/TypeConverter Fri Nov 16 03:03:30 2007
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.camel.converter.xmlbeans
\ No newline at end of file
+org.apache.camel.component.mina
\ No newline at end of file
Modified: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java?rev=595624&r1=595623&r2=595624&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java Fri Nov 16 03:03:30 2007
@@ -38,7 +38,7 @@
protected CamelContext container = new DefaultCamelContext();
protected CountDownLatch latch = new CountDownLatch(1);
protected Exchange receivedExchange;
- protected String uri = "mina:tcp://localhost:6321?codec=textline";
+ protected String uri = "mina:tcp://localhost:6321?textline=true";
protected Producer<Exchange> producer;
private ReverserServer server;
Copied: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpTest.java (from r595348, activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpTest.java?p2=activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpTest.java&p1=activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java&r1=595348&r2=595624&rev=595624&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpTest.java Fri Nov 16 03:03:30 2007
@@ -16,31 +16,114 @@
*/
package org.apache.camel.component.mina;
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.util.List;
+
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* @version $Revision$
*/
-public class MinaVmTest extends ContextTestSupport {
- protected String uri = "mina:vm://localhost:8080";
+public class MinaUdpTest extends ContextTestSupport {
+ private static final transient Log LOG = LogFactory.getLog(MinaUdpTest.class);
+ protected int messageCount = 3;
+ protected Thread readerThread;
+ protected int port = 4445;
+ protected boolean consume = false;
public void testMinaRoute() throws Exception {
MockEndpoint endpoint = getMockEndpoint("mock:result");
- Object body = "Hello there!";
- endpoint.expectedBodiesReceived(body);
+ endpoint.expectedMessageCount(messageCount);
+ endpoint.expectedBodiesReceived("Hello Message: 0", "Hello Message: 1", "Hello Message: 2");
- template.sendBodyAndHeader(uri, body, "cheese", 123);
+ Thread.sleep(1000);
+ sendUdpMessages();
assertMockEndpointsSatisifed();
+ List<Exchange> list = endpoint.getReceivedExchanges();
+ Exchange exchange = list.get(0);
+ Object body = exchange.getIn().getBody();
+ LOG.debug("Type: " + body.getClass().getName() + " value: " + body);
+ LOG.debug("String value: " + exchange.getIn().getBody(String.class));
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+
+ super.setUp();
+
+ if (consume) {
+ final DatagramSocket socket = new DatagramSocket(port);
+
+ readerThread = new Thread() {
+ public void run() {
+ try {
+ byte[] buffer = new byte[1024];
+ DatagramPacket incoming = new DatagramPacket(buffer, buffer.length);
+ System.out.println("starting to receive udp packets");
+ while (true) {
+ //incoming.setLength(buffer.length);
+ socket.receive(incoming);
+ byte[] data = incoming.getData();
+ System.out.println("Got data! " + data.length);
+
+ ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
+ Object value = in.readObject();
+ System.out.println("Value: " + value);
+ }
+ }
+ catch (Throwable ex) {
+ System.err.println(ex);
+ ex.printStackTrace();
+ }
+ }
+ };
+ readerThread.start();
+ }
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ protected void sendUdpMessages() throws Exception {
+ DatagramSocket socket = new DatagramSocket();
+ InetAddress address = InetAddress.getByName("127.0.0.1");
+ for (int i = 0; i < messageCount; i++) {
+/*
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(buffer);
+ out.writeObject("Hello Message: " + i);
+ out.close();
+
+ byte[] data = buffer.toByteArray();
+*/
+
+ String text = "Hello Message: " + i;
+ byte[] data = text.getBytes();
+
+ DatagramPacket packet = new DatagramPacket(data, data.length, address, port);
+ socket.send(packet);
+ Thread.sleep(1000);
+ }
+ System.out.println("Sent " + messageCount + " messages");
}
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from(uri).to("mock:result");
+ from("mina:udp://127.0.0.1:" + port).to("mock:result");
}
};
}
-}
+}
\ No newline at end of file
Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpUsingTemplateTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpUsingTemplateTest.java?rev=595624&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpUsingTemplateTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpUsingTemplateTest.java Fri Nov 16 03:03:30 2007
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision: 595348 $
+ */
+public class MinaUdpUsingTemplateTest extends ContextTestSupport {
+ private static final transient Log LOG = LogFactory.getLog(MinaUdpUsingTemplateTest.class);
+
+ private int messageCount = 3;
+
+ public void testMinaRoute() throws Exception {
+ MockEndpoint endpoint = getMockEndpoint("mock:result");
+ endpoint.expectedMessageCount(3);
+
+ sendUdpMessages();
+
+ assertMockEndpointsSatisifed();
+ List<Exchange> list = endpoint.getReceivedExchanges();
+ LOG.debug("Received: " + list);
+ }
+
+ protected void sendUdpMessages() throws Exception {
+ for (int i = 0; i < messageCount; i++) {
+ template.sendBody("mina:udp://127.0.0.1:4445", "Hello Message: " + i);
+ }
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("mina:udp://127.0.0.1:4445").to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpUsingTemplateTest.java
------------------------------------------------------------------------------
svn:eol-style = native