You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/10 11:21:08 UTC
svn commit: r384755 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp:
CommandChannel.java DatagramHeaderMarshaller.java UdpTransport.java
Author: jstrachan
Date: Fri Mar 10 02:21:07 2006
New Revision: 384755
URL: http://svn.apache.org/viewcvs?rev=384755&view=rev
Log:
minor refactorings to make it easier to derive from
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=384755&r1=384754&r2=384755&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java Fri Mar 10 02:21:07 2006
@@ -29,7 +29,6 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
@@ -50,7 +49,7 @@
private int datagramSize = 4 * 1024;
private DatagramReplayStrategy replayStrategy;
private SocketAddress targetAddress;
- private DatagramHeaderMarshaller headerMarshaller = new DatagramHeaderMarshaller();
+ private DatagramHeaderMarshaller headerMarshaller;
private final boolean checkSequenceNumbers;
// reading
@@ -67,7 +66,7 @@
private DatagramHeader header = new DatagramHeader();
public CommandChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
- DatagramReplayStrategy replayStrategy, SocketAddress targetAddress, boolean checkSequenceNumbers) {
+ DatagramReplayStrategy replayStrategy, SocketAddress targetAddress, boolean checkSequenceNumbers, DatagramHeaderMarshaller headerMarshaller) {
this.name = name;
this.channel = channel;
this.wireFormat = wireFormat;
@@ -76,6 +75,7 @@
this.replayStrategy = replayStrategy;
this.targetAddress = targetAddress;
this.checkSequenceNumbers = checkSequenceNumbers;
+ this.headerMarshaller = headerMarshaller;
}
public String toString() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java?rev=384755&r1=384754&r2=384755&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java Fri Mar 10 02:21:07 2006
@@ -25,8 +25,12 @@
*/
public class DatagramHeaderMarshaller {
+ public DatagramHeader createDatagramHeader() {
+ return new DatagramHeader();
+ }
+
public DatagramHeader readHeader(ByteBuffer readBuffer) {
- DatagramHeader answer = new DatagramHeader();
+ DatagramHeader answer = createDatagramHeader();
answer.setCounter(readBuffer.getLong());
answer.setDataSize(readBuffer.getInt());
byte flags = readBuffer.get();
@@ -46,5 +50,4 @@
public int getHeaderSize(DatagramHeader header) {
return 8 + 4 + 1;
}
-
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=384755&r1=384754&r2=384755&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Fri Mar 10 02:21:07 2006
@@ -82,7 +82,7 @@
public UdpTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException {
this(wireFormat);
this.targetAddress = socketAddress;
- this.description = "UdpServerConnection@";
+ this.description = getProtocolName() + "ServerConnection@";
}
/**
@@ -92,7 +92,7 @@
this(wireFormat);
this.port = port;
this.targetAddress = null;
- this.description = "UdpServer@";
+ this.description = getProtocolName() + "Server@";
}
/**
@@ -113,7 +113,6 @@
commandChannel.write(command, address);
}
-
public void receivedHeader(DatagramHeader header) {
wireFormatHeader = header;
}
@@ -126,7 +125,7 @@
return description + port;
}
else {
- return "udp://" + targetAddress + "@" + port;
+ return getProtocolUriScheme() + targetAddress + "@" + port;
}
}
@@ -169,6 +168,22 @@
}
}
+ /**
+ * We have received the WireFormatInfo from the server on the actual channel
+ * we should use for all future communication with the server, so lets set
+ * the target to be the actual channel that the server has chosen for us to
+ * talk on.
+ */
+ public void useLastInboundDatagramAsNewTarget() {
+ if (originalTargetAddress == null) {
+ originalTargetAddress = targetAddress;
+ }
+ SocketAddress lastAddress = commandChannel.getLastReadDatagramAddress();
+ if (lastAddress != null) {
+ targetAddress = lastAddress;
+ }
+ }
+
// Properties
// -------------------------------------------------------------------------
public boolean isTrace() {
@@ -255,7 +270,7 @@
public OpenWireFormat getWireFormat() {
return wireFormat;
}
-
+
public boolean isCheckSequenceNumbers() {
return checkSequenceNumbers;
}
@@ -313,7 +328,7 @@
if (bufferPool == null) {
bufferPool = new DefaultBufferPool();
}
- commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress, isCheckSequenceNumbers());
+ commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress, isCheckSequenceNumbers(), createDatagramHeaderMarshaller());
commandChannel.start();
// lets pass the header & address into the channel so it avoids a
@@ -331,21 +346,15 @@
}
}
- /**
- * We have received the WireFormatInfo from the server on the actual channel
- * we should use for all future communication with the server, so lets set
- * the target to be the actual channel that the server has chosen for us to
- * talk on.
- */
- public void useLastInboundDatagramAsNewTarget() {
- if (originalTargetAddress == null) {
- originalTargetAddress = targetAddress;
- }
- SocketAddress lastAddress = commandChannel.getLastReadDatagramAddress();
- if (lastAddress != null) {
- targetAddress = lastAddress;
- }
+ protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() {
+ return new DatagramHeaderMarshaller();
}
+ protected String getProtocolName() {
+ return "Udp";
+ }
+ protected String getProtocolUriScheme() {
+ return "udp://";
+ }
}