You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/13 16:52:14 UTC

svn commit: r385577 - in /incubator/activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/transport/multicast/ src/main/java/org/apache/activemq/transport/udp/ src/test/java/org/apache/activemq/transport/multicast/ src/test/java/org/apache...

Author: jstrachan
Date: Mon Mar 13 07:52:01 2006
New Revision: 385577

URL: http://svn.apache.org/viewcvs?rev=385577&view=rev
Log:
refactor of the UDP transport so that it can work with multicast using a DatagramSocket/MulticastSocket directly in addition to using a DatagramChannel (which only seems to work with UDP)

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
      - copied, changed from r385456, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java   (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/multicast/
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/multicast/MulticastTransportTest.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/project.xml
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java

Modified: incubator/activemq/trunk/activemq-core/project.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/project.xml?rev=385577&r1=385576&r2=385577&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/project.xml (original)
+++ incubator/activemq/trunk/activemq-core/project.xml Mon Mar 13 07:52:01 2006
@@ -363,6 +363,7 @@
                 
                 <!-- TODO FIX ME -->
                 <exclude>**/UdpSendReceiveWithTwoConnectionsTest.*</exclude>
+                <exclude>**/MulticastTransportTest.*</exclude>
             </excludes>
         </unitTest>
         <resources>

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java?rev=385577&r1=385576&r2=385577&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java Mon Mar 13 07:52:01 2006
@@ -16,12 +16,39 @@
  */
 package org.apache.activemq.transport.multicast;
 
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
+import org.apache.activemq.transport.udp.DatagramEndpoint;
 import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
 
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
 /**
- *
+ * 
  * @version $Revision$
  */
 public class MulticastDatagramHeaderMarshaller extends DatagramHeaderMarshaller {
+
+    private final String localUri;
+    private final byte[] localUriAsBytes;
+
+    public MulticastDatagramHeaderMarshaller(String localUri) {
+        this.localUri = localUri;
+        this.localUriAsBytes = localUri.getBytes();
+    }
+
+    public Endpoint createEndpoint(ByteBuffer readBuffer, SocketAddress address) {
+        int size = readBuffer.getInt();
+        byte[] data = new byte[size];
+        readBuffer.get(data);
+        return new DatagramEndpoint(new String(data), address);
+    }
+
+    public void writeHeader(Command command, ByteBuffer writeBuffer) {
+        writeBuffer.putInt(localUriAsBytes.length);
+        writeBuffer.put(localUriAsBytes);
+        super.writeHeader(command, writeBuffer);
+    }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java?rev=385577&r1=385576&r2=385577&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java Mon Mar 13 07:52:01 2006
@@ -17,12 +17,26 @@
 package org.apache.activemq.transport.multicast;
 
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.udp.CommandChannel;
+import org.apache.activemq.transport.udp.CommandDatagramChannel;
+import org.apache.activemq.transport.udp.CommandDatagramSocket;
+import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
+import org.apache.activemq.transport.udp.DefaultBufferPool;
 import org.apache.activemq.transport.udp.UdpTransport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
 import java.net.SocketAddress;
+import java.net.SocketException;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.nio.channels.DatagramChannel;
 
 /**
  * A multicast based transport.
@@ -31,22 +45,21 @@
  */
 public class MulticastTransport extends UdpTransport {
 
-    public MulticastTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException, IOException {
-        super(wireFormat, port);
-    }
+    private static final Log log = LogFactory.getLog(MulticastTransport.class);
 
-    public MulticastTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException {
-        super(wireFormat, socketAddress);
-    }
+    private static final int DEFAULT_IDLE_TIME = 5000;
+
+    private MulticastSocket socket;
+    private InetAddress mcastAddress;
+    private int mcastPort;
+    private int timeToLive = 1;
+    private boolean loopBackMode = false;
+    private long keepAliveInterval = DEFAULT_IDLE_TIME;
 
     public MulticastTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
         super(wireFormat, remoteLocation);
     }
 
-    public MulticastTransport(OpenWireFormat wireFormat) throws IOException {
-        super(wireFormat);
-    }
-
     protected String getProtocolName() {
         return "Multicast";
     }
@@ -54,4 +67,43 @@
     protected String getProtocolUriScheme() {
         return "multicast://";
     }
+
+    protected void bind(DatagramSocket socket, SocketAddress localAddress) throws SocketException {
+    }
+
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        super.doStop(stopper);
+        if (socket != null) {
+            try {
+                socket.leaveGroup(mcastAddress);
+            }
+            catch (IOException e) {
+                stopper.onException(this, e);
+            }
+            socket.close();
+        }
+    }
+
+    protected CommandChannel createCommandChannel() throws IOException {
+        socket = new MulticastSocket(mcastPort);
+        socket.setLoopbackMode(loopBackMode);
+        socket.setTimeToLive(timeToLive);
+
+        log.debug("Joining multicast address: " + mcastAddress);
+        socket.joinGroup(mcastAddress);
+        socket.setSoTimeout((int) keepAliveInterval);
+
+        return new CommandDatagramSocket(toString(), socket, getWireFormat(), getDatagramSize(), mcastAddress, mcastPort, createDatagramHeaderMarshaller());
+    }
+
+    protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException {
+        this.mcastAddress = InetAddress.getByName(remoteLocation.getHost());
+        this.mcastPort = remoteLocation.getPort();
+        return new InetSocketAddress(mcastAddress, mcastPort);
+    }
+
+    protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() {
+        return new MulticastDatagramHeaderMarshaller("udp://dummyHostName:" + getPort());
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=385577&r1=385576&r2=385577&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java Mon Mar 13 07:52:01 2006
@@ -16,244 +16,33 @@
  */
 package org.apache.activemq.transport.udp;
 
-import org.activeio.ByteArrayInputStream;
-import org.activeio.ByteArrayOutputStream;
 import org.apache.activemq.Service;
 import org.apache.activemq.command.Command;
-import org.apache.activemq.command.Endpoint;
-import org.apache.activemq.command.LastPartialCommand;
-import org.apache.activemq.command.PartialCommand;
-import org.apache.activemq.openwire.BooleanStream;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
 
 /**
- * A strategy for reading datagrams and de-fragmenting them together.
- * 
+ *
  * @version $Revision$
  */
-public class CommandChannel implements Service {
+public interface CommandChannel extends Service {
 
-    private static final Log log = LogFactory.getLog(CommandChannel.class);
+    public abstract Command read() throws IOException;
 
-    private final String name;
-    private DatagramChannel channel;
-    private OpenWireFormat wireFormat;
-    private ByteBufferPool bufferPool;
-    private int datagramSize = 4 * 1024;
-    private SocketAddress targetAddress;
-    private DatagramHeaderMarshaller headerMarshaller;
-
-    // reading
-    private Object readLock = new Object();
-    private ByteBuffer readBuffer;
-
-    // writing
-    private Object writeLock = new Object();
-    private ByteBuffer writeBuffer;
-    private int defaultMarshalBufferSize = 64 * 1024;
-
-    public CommandChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
-            SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) {
-        this.name = name;
-        this.channel = channel;
-        this.wireFormat = wireFormat;
-        this.bufferPool = bufferPool;
-        this.datagramSize = datagramSize;
-        this.targetAddress = targetAddress;
-        this.headerMarshaller = headerMarshaller;
-    }
-
-    public String toString() {
-        return "CommandChannel#" + name;
-    }
-
-    public void start() throws Exception {
-        bufferPool.setDefaultSize(datagramSize);
-        bufferPool.start();
-        readBuffer = bufferPool.borrowBuffer();
-        writeBuffer = bufferPool.borrowBuffer();
-    }
-
-    public void stop() throws Exception {
-        bufferPool.stop();
-    }
-
-    public Command read() throws IOException {
-        Command answer = null;
-        synchronized (readLock) {
-            readBuffer.clear();
-            SocketAddress address = channel.receive(readBuffer);
-            readBuffer.flip();
-
-            Endpoint from = headerMarshaller.createEndpoint(readBuffer, address);
-
-            int remaining = readBuffer.remaining();
-            byte[] data = new byte[remaining];
-            readBuffer.get(data);
-
-            // TODO could use a DataInput implementation that talks direct to
-            // the ByteBuffer to avoid object allocation and unnecessary
-            // buffering?
-            DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
-            answer = (Command) wireFormat.unmarshal(dataIn);
-            answer.setFrom(from);
-        }
-        if (answer != null) {
-            if (log.isDebugEnabled()) {
-                log.debug("Channel: " + name + " about to process: " + answer);
-            }
-        }
-        return answer;
-    }
-
-    public void write(Command command) throws IOException {
-        write(command, targetAddress);
-    }
-
-    public void write(Command command, SocketAddress address) throws IOException {
-        synchronized (writeLock) {
-
-            ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
-            wireFormat.marshal(command, new DataOutputStream(largeBuffer));
-            byte[] data = largeBuffer.toByteArray();
-            int size = data.length;
-
-            if (size >= datagramSize) {
-                // lets split the command up into chunks
-                int offset = 0;
-                boolean lastFragment = false;
-                for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
-                    // write the header
-                    writeBuffer.clear();
-                    headerMarshaller.writeHeader(command, writeBuffer);
-
-                    int chunkSize = writeBuffer.remaining();
-
-                    // we need to remove the amount of overhead to write the
-                    // partial command
-
-                    // lets write the flags in there
-                    BooleanStream bs = null;
-                    if (wireFormat.isTightEncodingEnabled()) {
-                        bs = new BooleanStream();
-                        bs.writeBoolean(true); // the partial data byte[] is
-                        // never null
-                    }
-
-                    // lets remove the header of the partial command
-                    // which is the byte for the type and an int for the size of
-                    // the byte[]
-                    chunkSize -= 1 // the data type
-                    + 4 // the command ID
-                    + 4; // the size of the partial data
-
-                    // the boolean flags
-                    if (bs != null) {
-                        chunkSize -= bs.marshalledSize();
-                    }
-                    else {
-                        chunkSize -= 1;
-                    }
-
-                    if (!wireFormat.isSizePrefixDisabled()) {
-                        // lets write the size of the command buffer
-                        writeBuffer.putInt(chunkSize);
-                        chunkSize -= 4;
-                    }
-
-                    lastFragment = offset + chunkSize >= length;
-                    if (chunkSize + offset > length) {
-                        chunkSize = length - offset;
-                    }
-
-                    writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE);
-
-                    if (bs != null) {
-                        bs.marshal(writeBuffer);
-                    }
-
-                    writeBuffer.putInt(command.getCommandId());
-                    if (bs == null) {
-                        writeBuffer.put((byte) 1);
-                    }
-
-                    // size of byte array
-                    writeBuffer.putInt(chunkSize);
-
-                    // now the data
-                    writeBuffer.put(data, offset, chunkSize);
-
-                    offset += chunkSize;
-                    sendWriteBuffer(address);
-                }
-                
-                // now lets write the last partial command
-                command = new LastPartialCommand(command);
-                largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
-                wireFormat.marshal(command, new DataOutputStream(largeBuffer));
-                data = largeBuffer.toByteArray();
-            }
-            
-            writeBuffer.clear();
-            headerMarshaller.writeHeader(command, writeBuffer);
-
-            writeBuffer.put(data);
-
-            sendWriteBuffer(address);
-        }
-    }
-
-    // Properties
-    // -------------------------------------------------------------------------
-
-    public int getDatagramSize() {
-        return datagramSize;
-    }
+    public abstract void write(Command command) throws IOException;
+
+    public abstract void write(Command command, SocketAddress address) throws IOException;
+
+    public abstract int getDatagramSize();
 
     /**
      * Sets the default size of a datagram on the network.
      */
-    public void setDatagramSize(int datagramSize) {
-        this.datagramSize = datagramSize;
-    }
-
-    public ByteBufferPool getBufferPool() {
-        return bufferPool;
-    }
+    public abstract void setDatagramSize(int datagramSize);
 
-    /**
-     * Sets the implementation of the byte buffer pool to use
-     */
-    public void setBufferPool(ByteBufferPool bufferPool) {
-        this.bufferPool = bufferPool;
-    }
-
-    public DatagramHeaderMarshaller getHeaderMarshaller() {
-        return headerMarshaller;
-    }
-
-    public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
-        this.headerMarshaller = headerMarshaller;
-    }
-
-    // Implementation methods
-    // -------------------------------------------------------------------------
-    protected void sendWriteBuffer(SocketAddress address) throws IOException {
-        writeBuffer.flip();
-
-        if (log.isDebugEnabled()) {
-            log.debug("Channel: " + name + " sending datagram to: " + address);
-        }
-        channel.send(writeBuffer, address);
-    }
+    public abstract DatagramHeaderMarshaller getHeaderMarshaller();
+
+    public abstract void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller);
 
-}
+}
\ No newline at end of file

Copied: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java (from r385456, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java)
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java?p2=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java&p1=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java&r1=385456&r2=385577&rev=385577&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java Mon Mar 13 07:52:01 2006
@@ -40,9 +40,9 @@
  * 
  * @version $Revision$
  */
-public class CommandChannel implements Service {
+public class CommandDatagramChannel implements CommandChannel {
 
-    private static final Log log = LogFactory.getLog(CommandChannel.class);
+    private static final Log log = LogFactory.getLog(CommandDatagramChannel.class);
 
     private final String name;
     private DatagramChannel channel;
@@ -61,7 +61,7 @@
     private ByteBuffer writeBuffer;
     private int defaultMarshalBufferSize = 64 * 1024;
 
-    public CommandChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
+    public CommandDatagramChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
             SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) {
         this.name = name;
         this.channel = channel;
@@ -87,25 +87,49 @@
         bufferPool.stop();
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.activemq.transport.udp.CommandChannel#read()
+     */
     public Command read() throws IOException {
         Command answer = null;
         synchronized (readLock) {
-            readBuffer.clear();
-            SocketAddress address = channel.receive(readBuffer);
-            readBuffer.flip();
-
-            Endpoint from = headerMarshaller.createEndpoint(readBuffer, address);
-
-            int remaining = readBuffer.remaining();
-            byte[] data = new byte[remaining];
-            readBuffer.get(data);
-
-            // TODO could use a DataInput implementation that talks direct to
-            // the ByteBuffer to avoid object allocation and unnecessary
-            // buffering?
-            DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
-            answer = (Command) wireFormat.unmarshal(dataIn);
-            answer.setFrom(from);
+            while (true) {
+                readBuffer.clear();
+                SocketAddress address = channel.receive(readBuffer);
+
+                /*
+                if (address == null) {
+                    System.out.println("No address on packet: " + readBuffer);
+                    // continue;
+                }
+                */
+
+                readBuffer.flip();
+
+                if (readBuffer.limit() == 0) {
+                    //System.out.println("Empty packet!");
+                    continue;
+                }
+
+                //log.debug("buffer: " + readBuffer + " has remaining: " + readBuffer.remaining());
+
+                Endpoint from = headerMarshaller.createEndpoint(readBuffer, address);
+
+                int remaining = readBuffer.remaining();
+                byte[] data = new byte[remaining];
+                readBuffer.get(data);
+
+                // TODO could use a DataInput implementation that talks direct
+                // to
+                // the ByteBuffer to avoid object allocation and unnecessary
+                // buffering?
+                DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
+                answer = (Command) wireFormat.unmarshal(dataIn);
+                if (answer != null) {
+                    answer.setFrom(from);
+                }
+                break;
+            }
         }
         if (answer != null) {
             if (log.isDebugEnabled()) {
@@ -115,10 +139,16 @@
         return answer;
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.activemq.transport.udp.CommandChannel#write(org.apache.activemq.command.Command)
+     */
     public void write(Command command) throws IOException {
         write(command, targetAddress);
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.activemq.transport.udp.CommandChannel#write(org.apache.activemq.command.Command, java.net.SocketAddress)
+     */
     public void write(Command command, SocketAddress address) throws IOException {
         synchronized (writeLock) {
 
@@ -127,14 +157,19 @@
             byte[] data = largeBuffer.toByteArray();
             int size = data.length;
 
-            if (size >= datagramSize) {
+            writeBuffer.clear();
+            headerMarshaller.writeHeader(command, writeBuffer);
+
+            if (size >= writeBuffer.remaining()) {
                 // lets split the command up into chunks
                 int offset = 0;
                 boolean lastFragment = false;
                 for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
                     // write the header
-                    writeBuffer.clear();
-                    headerMarshaller.writeHeader(command, writeBuffer);
+                    if (fragment > 0) {
+                        writeBuffer.clear();
+                        headerMarshaller.writeHeader(command, writeBuffer);
+                    }
 
                     int chunkSize = writeBuffer.remaining();
 
@@ -195,16 +230,16 @@
                     offset += chunkSize;
                     sendWriteBuffer(address);
                 }
-                
+
                 // now lets write the last partial command
                 command = new LastPartialCommand(command);
                 largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
                 wireFormat.marshal(command, new DataOutputStream(largeBuffer));
                 data = largeBuffer.toByteArray();
+
+                writeBuffer.clear();
+                headerMarshaller.writeHeader(command, writeBuffer);
             }
-            
-            writeBuffer.clear();
-            headerMarshaller.writeHeader(command, writeBuffer);
 
             writeBuffer.put(data);
 
@@ -215,12 +250,15 @@
     // Properties
     // -------------------------------------------------------------------------
 
+    /* (non-Javadoc)
+     * @see org.apache.activemq.transport.udp.CommandChannel#getDatagramSize()
+     */
     public int getDatagramSize() {
         return datagramSize;
     }
 
-    /**
-     * Sets the default size of a datagram on the network.
+    /* (non-Javadoc)
+     * @see org.apache.activemq.transport.udp.CommandChannel#setDatagramSize(int)
      */
     public void setDatagramSize(int datagramSize) {
         this.datagramSize = datagramSize;
@@ -237,10 +275,16 @@
         this.bufferPool = bufferPool;
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.activemq.transport.udp.CommandChannel#getHeaderMarshaller()
+     */
     public DatagramHeaderMarshaller getHeaderMarshaller() {
         return headerMarshaller;
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.activemq.transport.udp.CommandChannel#setHeaderMarshaller(org.apache.activemq.transport.udp.DatagramHeaderMarshaller)
+     */
     public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
         this.headerMarshaller = headerMarshaller;
     }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java?rev=385577&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java Mon Mar 13 07:52:01 2006
@@ -0,0 +1,263 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.activeio.ByteArrayInputStream;
+import org.activeio.ByteArrayOutputStream;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
+import org.apache.activemq.command.LastPartialCommand;
+import org.apache.activemq.command.PartialCommand;
+import org.apache.activemq.openwire.BooleanStream;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+/**
+ * A strategy for reading datagrams and de-fragmenting them together.
+ * 
+ * @version $Revision$
+ */
+public class CommandDatagramSocket implements CommandChannel {
+
+    private static final Log log = LogFactory.getLog(CommandDatagramSocket.class);
+
+    private final String name;
+    private DatagramSocket channel;
+    private InetAddress targetAddress;
+    private int targetPort;
+    private OpenWireFormat wireFormat;
+    private int datagramSize = 4 * 1024;
+    private DatagramHeaderMarshaller headerMarshaller;
+
+    // reading
+    private Object readLock = new Object();
+
+    // writing
+    private Object writeLock = new Object();
+
+
+    public CommandDatagramSocket(String name, DatagramSocket channel, OpenWireFormat wireFormat, int datagramSize, InetAddress targetAddress, int targetPort,
+            DatagramHeaderMarshaller headerMarshaller) {
+        this.name = name;
+        this.channel = channel;
+        this.wireFormat = wireFormat;
+        this.datagramSize = datagramSize;
+        this.targetAddress = targetAddress;
+        this.targetPort = targetPort;
+        this.headerMarshaller = headerMarshaller;
+    }
+
+    public String toString() {
+        return "CommandChannel#" + name;
+    }
+
+    public void start() throws Exception {
+    }
+
+    public void stop() throws Exception {
+    }
+
+    public Command read() throws IOException {
+        Command answer = null;
+        Endpoint from = null;
+        synchronized (readLock) {
+            while (true) {
+                DatagramPacket datagram = createDatagramPacket();
+                channel.receive(datagram);
+
+                // TODO could use a DataInput implementation that talks direct
+                // to the byte[] to avoid object allocation
+                DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData()));
+
+                from = headerMarshaller.createEndpoint(datagram, dataIn);
+                answer = (Command) wireFormat.unmarshal(dataIn);
+                break;
+            }
+        }
+        if (answer != null) {
+            answer.setFrom(from);
+            
+            if (log.isDebugEnabled()) {
+                log.debug("Channel: " + name + " about to process: " + answer);
+            }
+        }
+        return answer;
+    }
+
+    public void write(Command command) throws IOException {
+        write(command, targetAddress, targetPort);
+    }
+
+    public void write(Command command, SocketAddress address) throws IOException {
+        if (address instanceof InetSocketAddress) {
+            InetSocketAddress ia = (InetSocketAddress) address;
+            write(command, ia.getAddress(), ia.getPort());
+        }
+        else {
+            write(command);
+        }
+    }
+
+    public void write(Command command, InetAddress address, int port) throws IOException {
+        synchronized (writeLock) {
+
+            ByteArrayOutputStream writeBuffer = createByteArrayOutputStream();
+            DataOutputStream dataOut = new DataOutputStream(writeBuffer);
+            headerMarshaller.writeHeader(command, dataOut);
+
+            int offset = writeBuffer.size();
+
+            wireFormat.marshal(command, dataOut);
+
+            if (remaining(writeBuffer) >= 0) {
+                sendWriteBuffer(address, port, writeBuffer);
+            }
+            else {
+                // lets split the command up into chunks
+                byte[] data = writeBuffer.toByteArray();
+                boolean lastFragment = false;
+                for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
+                    writeBuffer.reset();
+                    headerMarshaller.writeHeader(command, dataOut);
+
+                    int chunkSize = remaining(writeBuffer);
+
+                    // we need to remove the amount of overhead to write the
+                    // partial command
+
+                    // lets write the flags in there
+                    BooleanStream bs = null;
+                    if (wireFormat.isTightEncodingEnabled()) {
+                        bs = new BooleanStream();
+                        bs.writeBoolean(true); // the partial data byte[] is
+                        // never null
+                    }
+
+                    // lets remove the header of the partial command
+                    // which is the byte for the type and an int for the size of
+                    // the byte[]
+                    chunkSize -= 1 // the data type
+                    + 4 // the command ID
+                    + 4; // the size of the partial data
+
+                    // the boolean flags
+                    if (bs != null) {
+                        chunkSize -= bs.marshalledSize();
+                    }
+                    else {
+                        chunkSize -= 1;
+                    }
+
+                    if (!wireFormat.isSizePrefixDisabled()) {
+                        // lets write the size of the command buffer
+                        dataOut.writeInt(chunkSize);
+                        chunkSize -= 4;
+                    }
+
+                    lastFragment = offset + chunkSize >= length;
+                    if (chunkSize + offset > length) {
+                        chunkSize = length - offset;
+                    }
+
+                    dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE);
+
+                    if (bs != null) {
+                        bs.marshal(dataOut);
+                    }
+
+                    dataOut.writeInt(command.getCommandId());
+                    if (bs == null) {
+                        dataOut.write((byte) 1);
+                    }
+
+                    // size of byte array
+                    dataOut.writeInt(chunkSize);
+
+                    // now the data
+                    dataOut.write(data, offset, chunkSize);
+
+                    offset += chunkSize;
+                    sendWriteBuffer(address, port, writeBuffer);
+                }
+
+                // now lets write the last partial command
+                command = new LastPartialCommand(command);
+
+                writeBuffer.reset();
+                headerMarshaller.writeHeader(command, dataOut);
+                wireFormat.marshal(command, dataOut);
+
+                sendWriteBuffer(address, port, writeBuffer);
+            }
+        }
+    }
+
+    // Properties
+    // -------------------------------------------------------------------------
+
+    public int getDatagramSize() {
+        return datagramSize;
+    }
+
+    /**
+     * Sets the default size of a datagram on the network.
+     */
+    public void setDatagramSize(int datagramSize) {
+        this.datagramSize = datagramSize;
+    }
+
+    public DatagramHeaderMarshaller getHeaderMarshaller() {
+        return headerMarshaller;
+    }
+
+    public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
+        this.headerMarshaller = headerMarshaller;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+    protected void sendWriteBuffer(InetAddress address, int port, ByteArrayOutputStream writeBuffer) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Channel: " + name + " sending datagram to: " + address);
+        }
+        byte[] data = writeBuffer.toByteArray();
+        DatagramPacket packet = new DatagramPacket(data, 0, data.length, address, port);
+        channel.send(packet);
+    }
+
+    protected DatagramPacket createDatagramPacket() {
+        return new DatagramPacket(new byte[datagramSize], datagramSize);
+    }
+
+    protected int remaining(ByteArrayOutputStream buffer) {
+        return datagramSize - buffer.size();
+    }
+
+    protected ByteArrayOutputStream createByteArrayOutputStream() {
+        return new ByteArrayOutputStream(datagramSize);
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java?rev=385577&r1=385576&r2=385577&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java Mon Mar 13 07:52:01 2006
@@ -18,6 +18,7 @@
 
 import org.apache.activemq.command.BaseEndpoint;
 
+import java.net.InetAddress;
 import java.net.SocketAddress;
 
 /**
@@ -36,5 +37,5 @@
     public SocketAddress getAddress() {
         return address;
     }
-
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java?rev=385577&r1=385576&r2=385577&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java Mon Mar 13 07:52:01 2006
@@ -17,9 +17,14 @@
 package org.apache.activemq.transport.udp;
 
 
+import org.activeio.ByteArrayOutputStream;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Endpoint;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 
@@ -36,6 +41,11 @@
         return new DatagramEndpoint(address.toString(), address);
     }
 
+    public Endpoint createEndpoint(DatagramPacket datagram, DataInputStream dataIn) {
+        SocketAddress address = datagram.getSocketAddress();
+        return new DatagramEndpoint(address.toString(), address);
+    }
+
     public void writeHeader(Command command, ByteBuffer writeBuffer) {
         /*
         writeBuffer.putLong(command.getCounter());
@@ -45,4 +55,8 @@
         writeBuffer.put(flags);
         */
     }
+
+    public void writeHeader(Command command, DataOutputStream dataOut) {
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=385577&r1=385576&r2=385577&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Mon Mar 13 07:52:01 2006
@@ -33,6 +33,7 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.net.SocketException;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.channels.AsynchronousCloseException;
@@ -129,6 +130,17 @@
                 doConsume(command);
             }
             catch (AsynchronousCloseException e) {
+                // DatagramChannel closed
+                try {
+                    stop();
+                }
+                catch (Exception e2) {
+                    log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
+                }
+            }
+            catch (SocketException e) {
+                // DatagramSocket closed
+                log.debug("Socket closed: " + e, e);
                 try {
                     stop();
                 }
@@ -137,6 +149,7 @@
                 }
             }
             catch (Exception e) {
+                System.out.println("Caught exception of type: " + e.getClass());
                 e.printStackTrace();
                 try {
                     stop();
@@ -187,12 +200,12 @@
         return maxInactivityDuration;
     }
 
-    public DatagramChannel getChannel() {
-        return channel;
+    public int getDatagramSize() {
+        return datagramSize;
     }
 
-    public void setChannel(DatagramChannel channel) {
-        this.channel = channel;
+    public void setDatagramSize(int datagramSize) {
+        this.datagramSize = datagramSize;
     }
 
     /**
@@ -222,7 +235,7 @@
     /**
      * Sets the implementation of the command channel to use.
      */
-    public void setCommandChannel(CommandChannel commandChannel) {
+    public void setCommandChannel(CommandDatagramChannel commandChannel) {
         this.commandChannel = commandChannel;
     }
 
@@ -290,19 +303,20 @@
     }
 
     protected void doStart() throws Exception {
-        SocketAddress localAddress = new InetSocketAddress(port);
+        commandChannel = createCommandChannel();
+        commandChannel.start();
+
+        super.doStart();
+    }
+
+    protected CommandChannel createCommandChannel() throws IOException {
+        SocketAddress localAddress = createLocalAddress();
         channel = DatagramChannel.open();
-        channel.configureBlocking(true);
 
-        // TODO
-        // connect to default target address to avoid security checks each time
-        // channel = channel.connect(targetAddress);
+        channel = connect(channel, targetAddress);
 
         DatagramSocket socket = channel.socket();
-        if (log.isDebugEnabled()) {
-            log.debug("Binding to address: " + localAddress);
-        }
-        socket.bind(localAddress);
+        bind(socket, localAddress);
         if (port == 0) {
             port = socket.getLocalPort();
         }
@@ -310,10 +324,28 @@
         if (bufferPool == null) {
             bufferPool = new DefaultBufferPool();
         }
-        commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller());
-        commandChannel.start();
+        return new CommandDatagramChannel(toString(), channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller());
+    }
 
-        super.doStart();
+    protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException {
+        channel.configureBlocking(true);
+        
+        if (log.isDebugEnabled()) {
+            log.debug("Binding to address: " + localAddress);
+        }
+        socket.bind(localAddress);
+    }
+
+    protected DatagramChannel connect(DatagramChannel channel, SocketAddress targetAddress2) throws IOException {
+        // TODO
+        // connect to default target address to avoid security checks each time
+        // channel = channel.connect(targetAddress);
+        
+        return channel;
+    }
+
+    protected SocketAddress createLocalAddress() {
+        return new InetSocketAddress(port);
     }
 
     protected void doStop(ServiceStopper stopper) throws Exception {
@@ -332,5 +364,13 @@
 
     protected String getProtocolUriScheme() {
         return "udp://";
+    }
+
+    protected SocketAddress getTargetAddress() {
+        return targetAddress;
+    }
+
+    public void setCommandChannel(CommandChannel commandChannel) {
+        this.commandChannel = commandChannel;
     }
 }

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/multicast/MulticastTransportTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/multicast/MulticastTransportTest.java?rev=385577&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/multicast/MulticastTransportTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/multicast/MulticastTransportTest.java Mon Mar 13 07:52:01 2006
@@ -0,0 +1,55 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.multicast;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.CommandJoiner;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.udp.UdpTransport;
+import org.apache.activemq.transport.udp.UdpTransportTest;
+
+import java.net.URI;
+
+/**
+ *
+ * @version $Revision$
+ */
+public class MulticastTransportTest extends UdpTransportTest {
+    
+    private String multicastURI = "multicast://224.1.2.3:6255";
+    
+
+    protected Transport createProducer() throws Exception {
+        System.out.println("Producer using URI: " + multicastURI);
+        
+        // we are not using the TransportFactory as this assumes that
+        // transports talk to a server using a WireFormat Negotiation step
+        // rather than talking directly to each other
+        
+        OpenWireFormat wireFormat = createWireFormat();
+        MulticastTransport transport = new MulticastTransport(wireFormat, new URI(multicastURI));
+        return new CommandJoiner(transport, wireFormat);
+    }
+
+    protected Transport createConsumer() throws Exception {
+        OpenWireFormat wireFormat = createWireFormat();
+        MulticastTransport transport = new MulticastTransport(wireFormat, new URI(multicastURI));
+        return new CommandJoiner(transport, wireFormat);
+    }
+
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/multicast/MulticastTransportTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/multicast/MulticastTransportTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/multicast/MulticastTransportTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java?rev=385577&r1=385576&r2=385577&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java Mon Mar 13 07:52:01 2006
@@ -155,9 +155,11 @@
         producer = createProducer();
         producer.setTransportListener(new TransportListener() {
             public void onCommand(Command command) {
+                System.out.println("Producer received: " + command);
             }
 
             public void onException(IOException error) {
+                System.out.println("Producer exception: " + error);
             }
 
             public void transportInterupted() {