You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/07 20:49:25 UTC

svn commit: r383981 - in /incubator/activemq/trunk/activemq-core/src/main: java/org/apache/activemq/transport/udp/ resources/META-INF/services/org/apache/activemq/transport/

Author: jstrachan
Date: Tue Mar  7 11:49:23 2006
New Revision: 383981

URL: http://svn.apache.org/viewcvs?rev=383981&view=rev
Log:
added test case demonstrating UDP transport working

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/udp
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=383981&r1=383980&r2=383981&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java Tue Mar  7 11:49:23 2006
@@ -24,13 +24,14 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.Channels;
+import java.nio.channels.DatagramChannel;
 
 /**
  * A strategy for reading datagrams and de-fragmenting them together.
@@ -41,111 +42,144 @@
 
     private static final Log log = LogFactory.getLog(CommandChannel.class);
 
-    private ByteChannel channel;
+    private DatagramChannel channel;
     private OpenWireFormat wireFormat;
     private ByteBufferPool bufferPool;
     private int datagramSize = 4 * 1024;
     private DatagramReplayStrategy replayStrategy;
+    private SocketAddress targetAddress;
     private DatagramHeaderMarshaller headerMarshaller = new DatagramHeaderMarshaller();
 
     // reading
+    private Object readLock = new Object();
     private ByteBuffer readBuffer;
-    private DataInputStream dataIn;
     private CommandReadBuffer readStack;
 
     // writing
+    private Object writeLock = new Object();
     private ByteBuffer writeBuffer;
     private BooleanStream bs = new BooleanStream();
-    private DataOutputStream dataOut;
     private int largeMessageBufferSize = 128 * 1024;
     private DatagramHeader header = new DatagramHeader();
 
-    public CommandChannel(ByteChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, DatagramReplayStrategy replayStrategy) {
+    public CommandChannel(DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, DatagramReplayStrategy replayStrategy, SocketAddress targetAddress) {
         this.channel = channel;
         this.wireFormat = wireFormat;
         this.bufferPool = bufferPool;
         this.datagramSize = datagramSize;
         this.replayStrategy = replayStrategy;
+        this.targetAddress = targetAddress;
     }
 
     public void start() throws Exception {
+        //wireFormat.setPrefixPacketSize(false);
+        wireFormat.setCacheEnabled(false);
+        wireFormat.setTightEncodingEnabled(true);
+
         readStack = new CommandReadBuffer(wireFormat, replayStrategy);
         bufferPool.setDefaultSize(datagramSize);
         bufferPool.start();
         readBuffer = bufferPool.borrowBuffer();
         writeBuffer = bufferPool.borrowBuffer();
-        dataIn = new DataInputStream(Channels.newInputStream(channel));
-        dataOut = new DataOutputStream(Channels.newOutputStream(channel));
     }
 
     public void stop() throws Exception {
         bufferPool.stop();
     }
 
-    public synchronized Command read() throws IOException {
-        readBuffer.clear();
-        int read = channel.read(readBuffer);
-        DatagramHeader header = headerMarshaller.readHeader(readBuffer);
-
-        int remaining = readBuffer.remaining();
-        int size = header.getDataSize();
-        if (size > remaining) {
-            throw new IOException("Invalid command size: " + size + " when there are only: " + remaining + " byte(s) remaining");
-        }
-        else if (size < remaining) {
-            log.warn("Extra bytes in buffer. Expecting: " + size + " but has: " + remaining);
-        }
-        if (header.isPartial()) {
-            byte[] data = new byte[size];
-            readBuffer.get(data);
-            header.setPartialData(data);
-        }
-        else {
-            Command command = (Command) wireFormat.unmarshal(dataIn);
-            header.setCommand(command);
-        }
+    public Command read() throws IOException {
+        synchronized (readLock) {
+            readBuffer.clear();
+            SocketAddress address = channel.receive(readBuffer);
+            readBuffer.flip();
 
-        return readStack.read(header);
-    }
+            if (log.isDebugEnabled()) {
+                log.debug("Read a datagram from: " + address);
+            }
+            DatagramHeader header = headerMarshaller.readHeader(readBuffer);
+
+            int remaining = readBuffer.remaining();
+            int size = header.getDataSize();
+            if (size > remaining) {
+                throw new IOException("Invalid command size: " + size + " when there are only: " + remaining + " byte(s) remaining");
+            }
+            else if (size < remaining) {
+                log.warn("Extra bytes in buffer. Expecting: " + size + " but has: " + remaining);
+            }
+            if (header.isPartial()) {
+                byte[] data = new byte[size];
+                readBuffer.get(data);
+                header.setPartialData(data);
+            }
+            else {
+                byte[] data = new byte[size];
+                readBuffer.get(data);
+
+                // TODO use a DataInput implementation that talks direct to the
+                // ByteBuffer
+                DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
+                Command command = (Command) wireFormat.doUnmarshal(dataIn);
+                header.setCommand(command);
+            }
 
-    public synchronized void write(Command command) throws IOException {
-        header.incrementCounter();
-        int size = wireFormat.tightMarshalNestedObject1(command, bs);
-        if (size < datagramSize) {
-            header.setPartial(false);
-            header.setDataSize(size);
-            writeBuffer.rewind();
-            headerMarshaller.writeHeader(header, writeBuffer);
-            wireFormat.marshal(command, dataOut);
-            dataOut.flush();
-            channel.write(writeBuffer);
+            return readStack.read(header);
         }
-        else {
-            header.setPartial(true);
-            header.setComplete(false);
-
-            // lets split the command up into chunks
-            ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize);
-            wireFormat.marshal(command, new DataOutputStream(largeBuffer));
-
-            byte[] data = largeBuffer.toByteArray();
-            int offset = 0;
-            boolean lastFragment = false;
-            for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
-                // write the header
-                writeBuffer.rewind();
-                int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
-                lastFragment = offset + chunkSize >= length;
-                header.setDataSize(chunkSize);
-                header.setComplete(lastFragment);
+    }
+
+    public void write(Command command) throws IOException {
+        synchronized (writeLock) {
+            header.incrementCounter();
+            int size = wireFormat.tightMarshal1(command, bs);
+            if (size < datagramSize) {
+                header.setPartial(false);
+                header.setComplete(true);
+                header.setDataSize(size);
+                writeBuffer.clear();
                 headerMarshaller.writeHeader(header, writeBuffer);
 
-                // now the data
-                writeBuffer.put(data, offset, chunkSize);
-                offset += chunkSize;
-                channel.write(writeBuffer);
+                // TODO use a DataOutput implementation that talks direct to the
+                // ByteBuffer
+                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+                DataOutputStream dataOut = new DataOutputStream(buffer);
+                wireFormat.tightMarshal2(command, dataOut, bs);
+                dataOut.close();
+                byte[] data = buffer.toByteArray();
+                writeBuffer.put(data);
+
+                sendWriteBuffer();
+            }
+            else {
+                header.setPartial(true);
+                header.setComplete(false);
+
+                // lets split the command up into chunks
+                ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize);
+                wireFormat.marshal(command, new DataOutputStream(largeBuffer));
+
+                byte[] data = largeBuffer.toByteArray();
+                int offset = 0;
+                boolean lastFragment = false;
+                for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
+                    // write the header
+                    writeBuffer.rewind();
+                    int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
+                    lastFragment = offset + chunkSize >= length;
+                    header.setDataSize(chunkSize);
+                    header.setComplete(lastFragment);
+                    headerMarshaller.writeHeader(header, writeBuffer);
+
+                    // now the data
+                    writeBuffer.put(data, offset, chunkSize);
+                    offset += chunkSize;
+                    sendWriteBuffer();
+                }
             }
         }
+    }
+
+    protected void sendWriteBuffer() throws IOException {
+        writeBuffer.flip();
+        channel.send(writeBuffer, targetAddress);
     }
 
     // Properties

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java?rev=383981&r1=383980&r2=383981&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java Tue Mar  7 11:49:23 2006
@@ -41,7 +41,7 @@
     private OpenWireFormat wireFormat;
     private DatagramReplayStrategy replayStrategy;
     private SortedSet headers = new TreeSet();
-    private long expectedCounter;
+    private long expectedCounter = 1;
     private ByteArrayOutputStream out = new ByteArrayOutputStream();
 
     public CommandReadBuffer(OpenWireFormat wireFormat, DatagramReplayStrategy replayStrategy) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java?rev=383981&r1=383980&r2=383981&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java Tue Mar  7 11:49:23 2006
@@ -122,8 +122,8 @@
     }
 
     public void setFlags(byte flags) {
-        partial = (flags & 0x1) == 0;
-        complete = (flags & 0x2) == 0;
+        partial = (flags & 0x1) != 0;
+        complete = (flags & 0x2) != 0;
     }
 
     public Command getCommand() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java?rev=383981&r1=383980&r2=383981&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java Tue Mar  7 11:49:23 2006
@@ -31,13 +31,16 @@
         answer.setDataSize(readBuffer.getInt());
         byte flags = readBuffer.get();
         answer.setFlags(flags);
+        //System.out.println("Read header with counter: " + answer.getCounter() + " size: " + answer.getDataSize() + " with flags: " + flags);
         return answer;
     }
 
     public void writeHeader(DatagramHeader header, ByteBuffer writeBuffer) {
         writeBuffer.putLong(header.getCounter());
         writeBuffer.putInt(header.getDataSize());
-        writeBuffer.put(header.getFlags());
+        byte flags = header.getFlags();
+        //System.out.println("Writing header with counter: " + header.getCounter() + " size: " + header.getDataSize() + " with flags: " + flags);
+        writeBuffer.put(flags);
     }
 
     public int getHeaderSize(DatagramHeader header) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java?rev=383981&r1=383980&r2=383981&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java Tue Mar  7 11:49:23 2006
@@ -18,7 +18,6 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 /**
@@ -27,12 +26,19 @@
  * 
  * @version $Revision$
  */
-public class DefaultBufferPool implements ByteBufferPool {
+public class DefaultBufferPool extends SimpleBufferPool implements ByteBufferPool {
 
-    private int defaultSize;
     private List buffers = new ArrayList();
     private Object lock = new Object();
 
+    public DefaultBufferPool() {
+        super(true);
+    }
+
+    public DefaultBufferPool(boolean useDirect) {
+        super(useDirect);
+    }
+
     public synchronized ByteBuffer borrowBuffer() {
         synchronized (lock) {
             int size = buffers.size();
@@ -40,29 +46,24 @@
                 return (ByteBuffer) buffers.remove(size - 1);
             }
         }
-        return ByteBuffer.allocateDirect(defaultSize);
+        return createBuffer();
     }
 
-    public synchronized void returnBuffer(ByteBuffer buffer) {
+    public void returnBuffer(ByteBuffer buffer) {
         synchronized (lock) {
             buffers.add(buffer);
         }
     }
 
-    public void setDefaultSize(int defaultSize) {
-        this.defaultSize = defaultSize;
+    public void start() throws Exception {
     }
 
-    public synchronized void start() throws Exception {
-    }
-
-    public synchronized void stop() throws Exception {
+    public void stop() throws Exception {
         synchronized (lock) {
             /*
-            for (Iterator iter = buffers.iterator(); iter.hasNext();) {
-                ByteBuffer buffer = (ByteBuffer) iter.next();
-            }
-            */
+             * for (Iterator iter = buffers.iterator(); iter.hasNext();) {
+             * ByteBuffer buffer = (ByteBuffer) iter.next(); }
+             */
             buffers.clear();
         }
     }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java?rev=383981&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java Tue Mar  7 11:49:23 2006
@@ -0,0 +1,77 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A simple implementation of {@link ByteBufferPool} which does no pooling and
+ * just creates a new buffer each time one is borrowed
+ * 
+ * @version $Revision$
+ */
+public class SimpleBufferPool implements ByteBufferPool {
+
+    private int defaultSize;
+    private boolean useDirect;
+
+    public SimpleBufferPool() {
+        this(false);
+    }
+
+    public SimpleBufferPool(boolean useDirect) {
+        this.useDirect = useDirect;
+    }
+
+    public synchronized ByteBuffer borrowBuffer() {
+        return createBuffer();
+    }
+
+    public void returnBuffer(ByteBuffer buffer) {
+    }
+
+    public void setDefaultSize(int defaultSize) {
+        this.defaultSize = defaultSize;
+    }
+
+    public boolean isUseDirect() {
+        return useDirect;
+    }
+
+    /**
+     * Sets whether direct buffers are used or not
+     */
+    public void setUseDirect(boolean useDirect) {
+        this.useDirect = useDirect;
+    }
+
+    public void start() throws Exception {
+    }
+
+    public void stop() throws Exception {
+    }
+
+    protected ByteBuffer createBuffer() {
+        if (useDirect) {
+            return ByteBuffer.allocateDirect(defaultSize);
+        }
+        else {
+            return ByteBuffer.allocate(defaultSize);
+        }
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=383981&r1=383980&r2=383981&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Tue Mar  7 11:49:23 2006
@@ -29,11 +29,14 @@
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.DatagramChannel;
 
 /**
@@ -49,30 +52,34 @@
     private ByteBufferPool bufferPool;
     private DatagramReplayStrategy replayStrategy = new ExceptionIfDroppedPacketStrategy();
     private int datagramSize = 4 * 1024;
-    private long maxInactivityDuration = 0; //30000;
-    private InetSocketAddress socketAddress;
+    private long maxInactivityDuration = 0; // 30000;
+    private InetSocketAddress targetAddress;
     private DatagramChannel channel;
     private boolean trace = false;
     private boolean useLocalHost = true;
+    private int port;
 
-    protected UdpTransport(OpenWireFormat wireFormat) {
+    protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
         this.wireFormat = wireFormat;
     }
 
     public UdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
         this(wireFormat);
-        this.socketAddress = createAddress(remoteLocation);
+        this.targetAddress = createAddress(remoteLocation);
     }
 
-    public UdpTransport(OpenWireFormat wireFormat, InetSocketAddress socketAddress) {
+    public UdpTransport(OpenWireFormat wireFormat, InetSocketAddress socketAddress) throws IOException {
         this(wireFormat);
-        this.socketAddress = socketAddress;
+        this.targetAddress = socketAddress;
     }
 
     /**
      * A one way asynchronous send
      */
     public void oneway(Command command) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Sending oneway from port: " + port + " to target: " + targetAddress);
+        }
         checkStarted(command);
         commandChannel.write(command);
     }
@@ -81,7 +88,7 @@
      * @return pretty print of 'this'
      */
     public String toString() {
-        return "udp://" + socketAddress;
+        return "udp://" + targetAddress + "?port=" + port;
     }
 
     /**
@@ -94,18 +101,32 @@
                 Command command = commandChannel.read();
                 doConsume(command);
             }
-            catch (SocketTimeoutException e) {
-            }
-            catch (InterruptedIOException e) {
+            /*
+             * catch (SocketTimeoutException e) { } catch
+             * (InterruptedIOException e) { }
+             */
+            catch (AsynchronousCloseException e) {
+                try {
+                    stop();
+                }
+                catch (Exception e2) {
+                    log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
+                }
             }
-            catch (IOException e) {
+            catch (Exception e) {
+                e.printStackTrace();
                 try {
                     stop();
                 }
                 catch (Exception e2) {
                     log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
                 }
-                onException(e);
+                if (e instanceof IOException) {
+                    onException((IOException) e);
+                }
+                else {
+                    log.error(e);
+                }
             }
         }
     }
@@ -124,12 +145,21 @@
         return maxInactivityDuration;
     }
 
+    public DatagramChannel getChannel() {
+        return channel;
+    }
+
+    public void setChannel(DatagramChannel channel) {
+        this.channel = channel;
+    }
+
     /**
      * Sets the maximum inactivity duration
      */
     public void setMaxInactivityDuration(long maxInactivityDuration) {
         this.maxInactivityDuration = maxInactivityDuration;
     }
+
     public boolean isUseLocalHost() {
         return useLocalHost;
     }
@@ -143,7 +173,6 @@
         this.useLocalHost = useLocalHost;
     }
 
-
     public CommandChannel getCommandChannel() {
         return commandChannel;
     }
@@ -154,7 +183,7 @@
     public void setCommandChannel(CommandChannel commandChannel) {
         this.commandChannel = commandChannel;
     }
-    
+
     public DatagramReplayStrategy getReplayStrategy() {
         return replayStrategy;
     }
@@ -166,6 +195,17 @@
         this.replayStrategy = replayStrategy;
     }
 
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * Sets the local port to bind to; 0 lets the OS pick a free port on start
+     */
+    public void setPort(int port) {
+        this.port = port;
+    }
+
     
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -189,18 +229,26 @@
     }
 
     protected void doStart() throws Exception {
-        if (socketAddress != null) {
-            channel = DatagramChannel.open();
-            channel.connect(socketAddress);
-        }
-        else if (channel == null) {
-            throw new IllegalArgumentException("No channel configured");
+        SocketAddress localAddress = new InetSocketAddress(port);
+        channel = DatagramChannel.open();
+        channel.configureBlocking(true);
+
+        // TODO
+        // connect to default target address to avoid security checks each time
+        // channel = channel.connect(targetAddress);
+        
+        DatagramSocket socket = channel.socket();
+        socket.bind(localAddress);
+        if (port == 0) {
+            port = socket.getLocalPort();
         }
+        
         if (bufferPool == null) {
             bufferPool = new DefaultBufferPool();
         }
-        commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize, replayStrategy);
+        commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress);
         commandChannel.start();
+
         super.doStart();
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=383981&r1=383980&r2=383981&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java Tue Mar  7 11:49:23 2006
@@ -65,6 +65,8 @@
     public Transport configure(Transport transport, WireFormat format, Map options) {
         IntrospectionSupport.setProperties(transport, options);
         UdpTransport tcpTransport = (UdpTransport) transport;
+        
+        /*
         if (tcpTransport.isTrace()) {
             transport = new TransportLogger(transport);
         }
@@ -73,8 +75,8 @@
             transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
         }
 
-        transport = new MutexTransport(transport);
         transport = new ResponseCorrelator(transport);
+        */
         return transport;
     }
 
@@ -111,7 +113,9 @@
             return new UdpTransport(wf, location, localLocation);
         }
         */
-        return new UdpTransport((OpenWireFormat) wf, location);
+        OpenWireFormat wireFormat = (OpenWireFormat) wf;
+        wireFormat.setPrefixPacketSize(false);
+        return new UdpTransport(wireFormat, location);
     }
 
     protected ServerSocketFactory createServerSocketFactory() {

Added: incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/udp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/udp?rev=383981&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/udp (added)
+++ incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/udp Tue Mar  7 11:49:23 2006
@@ -0,0 +1 @@
+class=org.apache.activemq.transport.udp.UdpTransportFactory