You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/07 20:49:25 UTC
svn commit: r383981 - in /incubator/activemq/trunk/activemq-core/src/main:
java/org/apache/activemq/transport/udp/
resources/META-INF/services/org/apache/activemq/transport/
Author: jstrachan
Date: Tue Mar 7 11:49:23 2006
New Revision: 383981
URL: http://svn.apache.org/viewcvs?rev=383981&view=rev
Log:
added test case demonstrating UDP transport working
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java (with props)
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/udp
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=383981&r1=383980&r2=383981&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java Tue Mar 7 11:49:23 2006
@@ -24,13 +24,14 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.Channels;
+import java.nio.channels.DatagramChannel;
/**
* A strategy for reading datagrams and de-fragmenting them together.
@@ -41,111 +42,144 @@
private static final Log log = LogFactory.getLog(CommandChannel.class);
- private ByteChannel channel;
+ private DatagramChannel channel;
private OpenWireFormat wireFormat;
private ByteBufferPool bufferPool;
private int datagramSize = 4 * 1024;
private DatagramReplayStrategy replayStrategy;
+ private SocketAddress targetAddress;
private DatagramHeaderMarshaller headerMarshaller = new DatagramHeaderMarshaller();
// reading
+ private Object readLock = new Object();
private ByteBuffer readBuffer;
- private DataInputStream dataIn;
private CommandReadBuffer readStack;
// writing
+ private Object writeLock = new Object();
private ByteBuffer writeBuffer;
private BooleanStream bs = new BooleanStream();
- private DataOutputStream dataOut;
private int largeMessageBufferSize = 128 * 1024;
private DatagramHeader header = new DatagramHeader();
- public CommandChannel(ByteChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, DatagramReplayStrategy replayStrategy) {
+ public CommandChannel(DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, DatagramReplayStrategy replayStrategy, SocketAddress targetAddress) {
this.channel = channel;
this.wireFormat = wireFormat;
this.bufferPool = bufferPool;
this.datagramSize = datagramSize;
this.replayStrategy = replayStrategy;
+ this.targetAddress = targetAddress;
}
public void start() throws Exception {
+ //wireFormat.setPrefixPacketSize(false);
+ wireFormat.setCacheEnabled(false);
+ wireFormat.setTightEncodingEnabled(true);
+
readStack = new CommandReadBuffer(wireFormat, replayStrategy);
bufferPool.setDefaultSize(datagramSize);
bufferPool.start();
readBuffer = bufferPool.borrowBuffer();
writeBuffer = bufferPool.borrowBuffer();
- dataIn = new DataInputStream(Channels.newInputStream(channel));
- dataOut = new DataOutputStream(Channels.newOutputStream(channel));
}
public void stop() throws Exception {
bufferPool.stop();
}
- public synchronized Command read() throws IOException {
- readBuffer.clear();
- int read = channel.read(readBuffer);
- DatagramHeader header = headerMarshaller.readHeader(readBuffer);
-
- int remaining = readBuffer.remaining();
- int size = header.getDataSize();
- if (size > remaining) {
- throw new IOException("Invalid command size: " + size + " when there are only: " + remaining + " byte(s) remaining");
- }
- else if (size < remaining) {
- log.warn("Extra bytes in buffer. Expecting: " + size + " but has: " + remaining);
- }
- if (header.isPartial()) {
- byte[] data = new byte[size];
- readBuffer.get(data);
- header.setPartialData(data);
- }
- else {
- Command command = (Command) wireFormat.unmarshal(dataIn);
- header.setCommand(command);
- }
+ public Command read() throws IOException {
+ synchronized (readLock) {
+ readBuffer.clear();
+ SocketAddress address = channel.receive(readBuffer);
+ readBuffer.flip();
- return readStack.read(header);
- }
+ if (log.isDebugEnabled()) {
+ log.debug("Read a datagram from: " + address);
+ }
+ DatagramHeader header = headerMarshaller.readHeader(readBuffer);
+
+ int remaining = readBuffer.remaining();
+ int size = header.getDataSize();
+ if (size > remaining) {
+ throw new IOException("Invalid command size: " + size + " when there are only: " + remaining + " byte(s) remaining");
+ }
+ else if (size < remaining) {
+ log.warn("Extra bytes in buffer. Expecting: " + size + " but has: " + remaining);
+ }
+ if (header.isPartial()) {
+ byte[] data = new byte[size];
+ readBuffer.get(data);
+ header.setPartialData(data);
+ }
+ else {
+ byte[] data = new byte[size];
+ readBuffer.get(data);
+
+ // TODO use a DataInput implementation that talks direct to the
+ // ByteBuffer
+ DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
+ Command command = (Command) wireFormat.doUnmarshal(dataIn);
+ header.setCommand(command);
+ }
- public synchronized void write(Command command) throws IOException {
- header.incrementCounter();
- int size = wireFormat.tightMarshalNestedObject1(command, bs);
- if (size < datagramSize) {
- header.setPartial(false);
- header.setDataSize(size);
- writeBuffer.rewind();
- headerMarshaller.writeHeader(header, writeBuffer);
- wireFormat.marshal(command, dataOut);
- dataOut.flush();
- channel.write(writeBuffer);
+ return readStack.read(header);
}
- else {
- header.setPartial(true);
- header.setComplete(false);
-
- // lets split the command up into chunks
- ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize);
- wireFormat.marshal(command, new DataOutputStream(largeBuffer));
-
- byte[] data = largeBuffer.toByteArray();
- int offset = 0;
- boolean lastFragment = false;
- for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
- // write the header
- writeBuffer.rewind();
- int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
- lastFragment = offset + chunkSize >= length;
- header.setDataSize(chunkSize);
- header.setComplete(lastFragment);
+ }
+
+ public void write(Command command) throws IOException {
+ synchronized (writeLock) {
+ header.incrementCounter();
+ int size = wireFormat.tightMarshal1(command, bs);
+ if (size < datagramSize) {
+ header.setPartial(false);
+ header.setComplete(true);
+ header.setDataSize(size);
+ writeBuffer.clear();
headerMarshaller.writeHeader(header, writeBuffer);
- // now the data
- writeBuffer.put(data, offset, chunkSize);
- offset += chunkSize;
- channel.write(writeBuffer);
+ // TODO use a DataOutput implementation that talks direct to the
+ // ByteBuffer
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ DataOutputStream dataOut = new DataOutputStream(buffer);
+ wireFormat.tightMarshal2(command, dataOut, bs);
+ dataOut.close();
+ byte[] data = buffer.toByteArray();
+ writeBuffer.put(data);
+
+ sendWriteBuffer();
+ }
+ else {
+ header.setPartial(true);
+ header.setComplete(false);
+
+ // lets split the command up into chunks
+ ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize);
+ wireFormat.marshal(command, new DataOutputStream(largeBuffer));
+
+ byte[] data = largeBuffer.toByteArray();
+ int offset = 0;
+ boolean lastFragment = false;
+ for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
+ // write the header
+ writeBuffer.rewind();
+ int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
+ lastFragment = offset + chunkSize >= length;
+ header.setDataSize(chunkSize);
+ header.setComplete(lastFragment);
+ headerMarshaller.writeHeader(header, writeBuffer);
+
+ // now the data
+ writeBuffer.put(data, offset, chunkSize);
+ offset += chunkSize;
+ sendWriteBuffer();
+ }
}
}
+ }
+
+ protected void sendWriteBuffer() throws IOException {
+ writeBuffer.flip();
+ channel.send(writeBuffer, targetAddress);
}
// Properties
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java?rev=383981&r1=383980&r2=383981&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java Tue Mar 7 11:49:23 2006
@@ -41,7 +41,7 @@
private OpenWireFormat wireFormat;
private DatagramReplayStrategy replayStrategy;
private SortedSet headers = new TreeSet();
- private long expectedCounter;
+ private long expectedCounter = 1;
private ByteArrayOutputStream out = new ByteArrayOutputStream();
public CommandReadBuffer(OpenWireFormat wireFormat, DatagramReplayStrategy replayStrategy) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java?rev=383981&r1=383980&r2=383981&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java Tue Mar 7 11:49:23 2006
@@ -122,8 +122,8 @@
}
public void setFlags(byte flags) {
- partial = (flags & 0x1) == 0;
- complete = (flags & 0x2) == 0;
+ partial = (flags & 0x1) != 0;
+ complete = (flags & 0x2) != 0;
}
public Command getCommand() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java?rev=383981&r1=383980&r2=383981&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java Tue Mar 7 11:49:23 2006
@@ -31,13 +31,16 @@
answer.setDataSize(readBuffer.getInt());
byte flags = readBuffer.get();
answer.setFlags(flags);
+ //System.out.println("Read header with counter: " + answer.getCounter() + "size: " + answer.getDataSize() + " with flags: " + flags);
return answer;
}
public void writeHeader(DatagramHeader header, ByteBuffer writeBuffer) {
writeBuffer.putLong(header.getCounter());
writeBuffer.putInt(header.getDataSize());
- writeBuffer.put(header.getFlags());
+ byte flags = header.getFlags();
+ //System.out.println("Writing header with counter: " + header.getCounter() + " size: " + header.getDataSize() + " with flags: " + flags);
+ writeBuffer.put(flags);
}
public int getHeaderSize(DatagramHeader header) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java?rev=383981&r1=383980&r2=383981&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java Tue Mar 7 11:49:23 2006
@@ -18,7 +18,6 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
/**
@@ -27,12 +26,19 @@
*
* @version $Revision$
*/
-public class DefaultBufferPool implements ByteBufferPool {
+public class DefaultBufferPool extends SimpleBufferPool implements ByteBufferPool {
- private int defaultSize;
private List buffers = new ArrayList();
private Object lock = new Object();
+ public DefaultBufferPool() {
+ super(true);
+ }
+
+ public DefaultBufferPool(boolean useDirect) {
+ super(useDirect);
+ }
+
public synchronized ByteBuffer borrowBuffer() {
synchronized (lock) {
int size = buffers.size();
@@ -40,29 +46,24 @@
return (ByteBuffer) buffers.remove(size - 1);
}
}
- return ByteBuffer.allocateDirect(defaultSize);
+ return createBuffer();
}
- public synchronized void returnBuffer(ByteBuffer buffer) {
+ public void returnBuffer(ByteBuffer buffer) {
synchronized (lock) {
buffers.add(buffer);
}
}
- public void setDefaultSize(int defaultSize) {
- this.defaultSize = defaultSize;
+ public void start() throws Exception {
}
- public synchronized void start() throws Exception {
- }
-
- public synchronized void stop() throws Exception {
+ public void stop() throws Exception {
synchronized (lock) {
/*
- for (Iterator iter = buffers.iterator(); iter.hasNext();) {
- ByteBuffer buffer = (ByteBuffer) iter.next();
- }
- */
+ * for (Iterator iter = buffers.iterator(); iter.hasNext();) {
+ * ByteBuffer buffer = (ByteBuffer) iter.next(); }
+ */
buffers.clear();
}
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java?rev=383981&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java Tue Mar 7 11:49:23 2006
@@ -0,0 +1,77 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A simple implementation of {@link ByteBufferPool} which does no pooling and just
+ * creates a new buffer each time one is borrowed.
+ *
+ * @version $Revision$
+ */
+public class SimpleBufferPool implements ByteBufferPool {
+
+ private int defaultSize;
+ private boolean useDirect;
+
+ public SimpleBufferPool() {
+ this(false);
+ }
+
+ public SimpleBufferPool(boolean useDirect) {
+ this.useDirect = useDirect;
+ }
+
+ public synchronized ByteBuffer borrowBuffer() {
+ return createBuffer();
+ }
+
+ public void returnBuffer(ByteBuffer buffer) {
+ }
+
+ public void setDefaultSize(int defaultSize) {
+ this.defaultSize = defaultSize;
+ }
+
+ public boolean isUseDirect() {
+ return useDirect;
+ }
+
+ /**
+ * Sets whether direct buffers are used or not
+ */
+ public void setUseDirect(boolean useDirect) {
+ this.useDirect = useDirect;
+ }
+
+ public void start() throws Exception {
+ }
+
+ public void stop() throws Exception {
+ }
+
+ protected ByteBuffer createBuffer() {
+ if (useDirect) {
+ return ByteBuffer.allocateDirect(defaultSize);
+ }
+ else {
+ return ByteBuffer.allocate(defaultSize);
+ }
+ }
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/SimpleBufferPool.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=383981&r1=383980&r2=383981&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Tue Mar 7 11:49:23 2006
@@ -29,11 +29,14 @@
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
+import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.DatagramChannel;
/**
@@ -49,30 +52,34 @@
private ByteBufferPool bufferPool;
private DatagramReplayStrategy replayStrategy = new ExceptionIfDroppedPacketStrategy();
private int datagramSize = 4 * 1024;
- private long maxInactivityDuration = 0; //30000;
- private InetSocketAddress socketAddress;
+ private long maxInactivityDuration = 0; // 30000;
+ private InetSocketAddress targetAddress;
private DatagramChannel channel;
private boolean trace = false;
private boolean useLocalHost = true;
+ private int port;
- protected UdpTransport(OpenWireFormat wireFormat) {
+ protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
this.wireFormat = wireFormat;
}
public UdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
this(wireFormat);
- this.socketAddress = createAddress(remoteLocation);
+ this.targetAddress = createAddress(remoteLocation);
}
- public UdpTransport(OpenWireFormat wireFormat, InetSocketAddress socketAddress) {
+ public UdpTransport(OpenWireFormat wireFormat, InetSocketAddress socketAddress) throws IOException {
this(wireFormat);
- this.socketAddress = socketAddress;
+ this.targetAddress = socketAddress;
}
/**
* A one way asynchronous send
*/
public void oneway(Command command) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("Sending oneway from port: " + port + " to target: " + targetAddress);
+ }
checkStarted(command);
commandChannel.write(command);
}
@@ -81,7 +88,7 @@
* @return pretty print of 'this'
*/
public String toString() {
- return "udp://" + socketAddress;
+ return "udp://" + targetAddress + "?port=" + port;
}
/**
@@ -94,18 +101,32 @@
Command command = commandChannel.read();
doConsume(command);
}
- catch (SocketTimeoutException e) {
- }
- catch (InterruptedIOException e) {
+ /*
+ * catch (SocketTimeoutException e) { } catch
+ * (InterruptedIOException e) { }
+ */
+ catch (AsynchronousCloseException e) {
+ try {
+ stop();
+ }
+ catch (Exception e2) {
+ log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
+ }
}
- catch (IOException e) {
+ catch (Exception e) {
+ e.printStackTrace();
try {
stop();
}
catch (Exception e2) {
log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
}
- onException(e);
+ if (e instanceof IOException) {
+ onException((IOException) e);
+ }
+ else {
+ log.error(e);
+ }
}
}
}
@@ -124,12 +145,21 @@
return maxInactivityDuration;
}
+ public DatagramChannel getChannel() {
+ return channel;
+ }
+
+ public void setChannel(DatagramChannel channel) {
+ this.channel = channel;
+ }
+
/**
* Sets the maximum inactivity duration
*/
public void setMaxInactivityDuration(long maxInactivityDuration) {
this.maxInactivityDuration = maxInactivityDuration;
}
+
public boolean isUseLocalHost() {
return useLocalHost;
}
@@ -143,7 +173,6 @@
this.useLocalHost = useLocalHost;
}
-
public CommandChannel getCommandChannel() {
return commandChannel;
}
@@ -154,7 +183,7 @@
public void setCommandChannel(CommandChannel commandChannel) {
this.commandChannel = commandChannel;
}
-
+
public DatagramReplayStrategy getReplayStrategy() {
return replayStrategy;
}
@@ -166,6 +195,17 @@
this.replayStrategy = replayStrategy;
}
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Sets the local port to bind to; if 0, the actual bound port is read back on start
+ */
+ public void setPort(int port) {
+ this.port = port;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
@@ -189,18 +229,26 @@
}
protected void doStart() throws Exception {
- if (socketAddress != null) {
- channel = DatagramChannel.open();
- channel.connect(socketAddress);
- }
- else if (channel == null) {
- throw new IllegalArgumentException("No channel configured");
+ SocketAddress localAddress = new InetSocketAddress(port);
+ channel = DatagramChannel.open();
+ channel.configureBlocking(true);
+
+ // TODO
+ // connect to default target address to avoid security checks each time
+ // channel = channel.connect(targetAddress);
+
+ DatagramSocket socket = channel.socket();
+ socket.bind(localAddress);
+ if (port == 0) {
+ port = socket.getLocalPort();
}
+
if (bufferPool == null) {
bufferPool = new DefaultBufferPool();
}
- commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize, replayStrategy);
+ commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress);
commandChannel.start();
+
super.doStart();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=383981&r1=383980&r2=383981&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java Tue Mar 7 11:49:23 2006
@@ -65,6 +65,8 @@
public Transport configure(Transport transport, WireFormat format, Map options) {
IntrospectionSupport.setProperties(transport, options);
UdpTransport tcpTransport = (UdpTransport) transport;
+
+ /*
if (tcpTransport.isTrace()) {
transport = new TransportLogger(transport);
}
@@ -73,8 +75,8 @@
transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
}
- transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
+ */
return transport;
}
@@ -111,7 +113,9 @@
return new UdpTransport(wf, location, localLocation);
}
*/
- return new UdpTransport((OpenWireFormat) wf, location);
+ OpenWireFormat wireFormat = (OpenWireFormat) wf;
+ wireFormat.setPrefixPacketSize(false);
+ return new UdpTransport(wireFormat, location);
}
protected ServerSocketFactory createServerSocketFactory() {
Added: incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/udp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/udp?rev=383981&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/udp (added)
+++ incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/udp Tue Mar 7 11:49:23 2006
@@ -0,0 +1 @@
+class=org.apache.activemq.transport.udp.UdpTransportFactory