You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/07 17:24:07 UTC
svn commit: r383918 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp:
./ replay/
Author: jstrachan
Date: Tue Mar 7 08:24:05 2006
New Revision: 383918
URL: http://svn.apache.org/viewcvs?rev=383918&view=rev
Log:
added a default implementation of BufferPool and a default replay strategy
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java (with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=383918&r1=383917&r2=383918&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java Tue Mar 7 08:24:05 2006
@@ -20,6 +20,7 @@
import org.apache.activemq.command.Command;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.udp.replay.DatagramReplayStrategy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,35 +40,36 @@
public class CommandChannel implements Service {
private static final Log log = LogFactory.getLog(CommandChannel.class);
-
+
private ByteChannel channel;
private OpenWireFormat wireFormat;
private ByteBufferPool bufferPool;
private int datagramSize = 4 * 1024;
+ private DatagramReplayStrategy replayStrategy;
private DatagramHeaderMarshaller headerMarshaller = new DatagramHeaderMarshaller();
-
+
// reading
private ByteBuffer readBuffer;
private DataInputStream dataIn;
private CommandReadBuffer readStack;
-
+
// writing
private ByteBuffer writeBuffer;
- private BooleanStream bs = new BooleanStream();
+ private BooleanStream bs = new BooleanStream();
private DataOutputStream dataOut;
private int largeMessageBufferSize = 128 * 1024;
private DatagramHeader header = new DatagramHeader();
-
- public CommandChannel(ByteChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize) {
+ public CommandChannel(ByteChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, DatagramReplayStrategy replayStrategy) {
this.channel = channel;
this.wireFormat = wireFormat;
this.bufferPool = bufferPool;
this.datagramSize = datagramSize;
+ this.replayStrategy = replayStrategy;
}
public void start() throws Exception {
- readStack = new CommandReadBuffer(wireFormat);
+ readStack = new CommandReadBuffer(wireFormat, replayStrategy);
bufferPool.setDefaultSize(datagramSize);
bufferPool.start();
readBuffer = bufferPool.borrowBuffer();
@@ -79,7 +81,7 @@
public void stop() throws Exception {
bufferPool.stop();
}
-
+
public synchronized Command read() throws IOException {
readBuffer.clear();
int read = channel.read(readBuffer);
@@ -109,10 +111,11 @@
public synchronized void write(Command command) throws IOException {
header.incrementCounter();
int size = wireFormat.tightMarshalNestedObject1(command, bs);
- if (size < datagramSize ) {
+ if (size < datagramSize) {
header.setPartial(false);
header.setDataSize(size);
writeBuffer.rewind();
+ headerMarshaller.writeHeader(header, writeBuffer);
wireFormat.marshal(command, dataOut);
dataOut.flush();
channel.write(writeBuffer);
@@ -120,15 +123,15 @@
else {
header.setPartial(true);
header.setComplete(false);
-
+
// lets split the command up into chunks
ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize);
wireFormat.marshal(command, new DataOutputStream(largeBuffer));
-
+
byte[] data = largeBuffer.toByteArray();
int offset = 0;
boolean lastFragment = false;
- for (int fragment = 0, length = data.length; !lastFragment; fragment++ ) {
+ for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
// write the header
writeBuffer.rewind();
int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
@@ -144,7 +147,7 @@
}
}
}
-
+
// Properties
// -------------------------------------------------------------------------
@@ -153,7 +156,7 @@
}
/**
- * Sets the default size of a datagram on the network.
+ * Sets the default size of a datagram on the network.
*/
public void setDatagramSize(int datagramSize) {
this.datagramSize = datagramSize;
@@ -168,6 +171,14 @@
*/
public void setBufferPool(ByteBufferPool bufferPool) {
this.bufferPool = bufferPool;
+ }
+
+ public DatagramHeaderMarshaller getHeaderMarshaller() {
+ return headerMarshaller;
+ }
+
+ public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
+ this.headerMarshaller = headerMarshaller;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java?rev=383918&r1=383917&r2=383918&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java Tue Mar 7 08:24:05 2006
@@ -18,6 +18,9 @@
import org.apache.activemq.command.Command;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.udp.replay.DatagramReplayStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -33,20 +36,32 @@
* @version $Revision$
*/
public class CommandReadBuffer {
+ private static final Log log = LogFactory.getLog(CommandReadBuffer.class);
private OpenWireFormat wireFormat;
+ private DatagramReplayStrategy replayStrategy;
private SortedSet headers = new TreeSet();
- private int expectedCounter;
+ private long expectedCounter;
private ByteArrayOutputStream out = new ByteArrayOutputStream();
- public CommandReadBuffer(OpenWireFormat wireFormat) {
+ public CommandReadBuffer(OpenWireFormat wireFormat, DatagramReplayStrategy replayStrategy) {
this.wireFormat = wireFormat;
+ this.replayStrategy = replayStrategy;
}
+
public Command read(DatagramHeader header) throws IOException {
- if (expectedCounter != header.getCounter()) {
- // lets add it to the list for later on
- headers.add(header);
+ long actualCounter = header.getCounter();
+ if (expectedCounter != actualCounter) {
+ if (actualCounter < expectedCounter) {
+ log.warn("Ignoring out of step packet: " + header);
+ }
+ else {
+ replayStrategy.onDroppedPackets(expectedCounter, actualCounter);
+
+ // lets add it to the list for later on
+ headers.add(header);
+ }
// lets see if the first item in the set is the next header
header = (DatagramHeader) headers.first();
@@ -56,6 +71,7 @@
}
// we've got a valid header so increment counter
+ replayStrategy.onReceivedPacket(expectedCounter);
expectedCounter++;
Command answer = null;
@@ -75,7 +91,6 @@
}
}
return answer;
-
}
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java?rev=383918&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java Tue Mar 7 08:24:05 2006
@@ -0,0 +1,70 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A default implementation of {@link ByteBufferPool} which keeps a pool of
+ * direct byte buffers.
+ * <p>
+ * Buffers handed back through {@link #returnBuffer(ByteBuffer)} are kept in a
+ * list and reused, most recently returned first. When the list is empty a new
+ * direct buffer of the configured default size is allocated instead.
+ * <p>
+ * All mutable state (the list of pooled buffers and the default size) is
+ * guarded by one private lock object.
+ *
+ * @version $Revision$
+ */
+public class DefaultBufferPool implements ByteBufferPool {
+
+    private int defaultSize;
+    private final List buffers = new ArrayList();
+    private final Object lock = new Object();
+
+    /**
+     * Takes the most recently returned buffer out of the pool, or allocates a
+     * new direct buffer of the default size when the pool is empty.
+     *
+     * @return a buffer whose position is zero and whose limit is its capacity
+     */
+    public ByteBuffer borrowBuffer() {
+        int sizeToAllocate;
+        synchronized (lock) {
+            int pooled = buffers.size();
+            if (pooled > 0) {
+                ByteBuffer recycled = (ByteBuffer) buffers.remove(pooled - 1);
+                // Reset position/limit/mark so a recycled buffer is
+                // indistinguishable from a freshly allocated one.
+                recycled.clear();
+                return recycled;
+            }
+            sizeToAllocate = defaultSize;
+        }
+        // Allocate outside the lock; only the size needs to be read under it.
+        return ByteBuffer.allocateDirect(sizeToAllocate);
+    }
+
+    /**
+     * Puts the given buffer back into the pool so that a later
+     * {@link #borrowBuffer()} call can reuse it.
+     *
+     * @param buffer the buffer which is no longer needed by the caller
+     */
+    public void returnBuffer(ByteBuffer buffer) {
+        synchronized (lock) {
+            buffers.add(buffer);
+        }
+    }
+
+    /**
+     * Sets the capacity used for buffers allocated from now on. Buffers
+     * which are already pooled keep the capacity they were created with.
+     *
+     * @param defaultSize the capacity in bytes of newly allocated buffers
+     */
+    public void setDefaultSize(int defaultSize) {
+        // Written under the same lock borrowBuffer() reads it under, so the
+        // new value is visible to other threads.
+        synchronized (lock) {
+            this.defaultSize = defaultSize;
+        }
+    }
+
+    /**
+     * Nothing to do on start; buffers are allocated lazily on demand.
+     */
+    public void start() throws Exception {
+    }
+
+    /**
+     * Drops every pooled buffer so that the pool no longer references them.
+     */
+    public void stop() throws Exception {
+        synchronized (lock) {
+            buffers.clear();
+        }
+    }
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=383918&r1=383917&r2=383918&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Tue Mar 7 08:24:05 2006
@@ -21,6 +21,8 @@
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportThreadSupport;
+import org.apache.activemq.transport.udp.replay.DatagramReplayStrategy;
+import org.apache.activemq.transport.udp.replay.ExceptionIfDroppedPacketStrategy;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +47,7 @@
private CommandChannel commandChannel;
private OpenWireFormat wireFormat;
private ByteBufferPool bufferPool;
+ private DatagramReplayStrategy replayStrategy = new ExceptionIfDroppedPacketStrategy();
private int datagramSize = 4 * 1024;
private long maxInactivityDuration = 0; //30000;
private InetSocketAddress socketAddress;
@@ -85,7 +88,7 @@
* reads packets from a Socket
*/
public void run() {
- log.trace("TCP consumer thread starting");
+ log.trace("Consumer thread starting for: " + toString());
while (!isClosed()) {
try {
Command command = commandChannel.read();
@@ -152,10 +155,21 @@
this.commandChannel = commandChannel;
}
+ public DatagramReplayStrategy getReplayStrategy() {
+ return replayStrategy;
+ }
+
+ /**
+ * Sets the strategy used to replay missed datagrams
+ */
+ public void setReplayStrategy(DatagramReplayStrategy replayStrategy) {
+ this.replayStrategy = replayStrategy;
+ }
+
+
// Implementation methods
// -------------------------------------------------------------------------
-
/**
* Creates an address from the given URI
*/
@@ -182,7 +196,10 @@
else if (channel == null) {
throw new IllegalArgumentException("No channel configured");
}
- commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize);
+ if (bufferPool == null) {
+ bufferPool = new DefaultBufferPool();
+ }
+ commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize, replayStrategy);
commandChannel.start();
super.doStart();
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java?rev=383918&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java Tue Mar 7 08:24:05 2006
@@ -0,0 +1,32 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp.replay;
+
+import java.io.IOException;
+
+/**
+ * A pluggable strategy for how to deal with dropped packets.
+ * <p>
+ * The callbacks are driven by datagram sequence counters: the reader keeps
+ * track of the counter it expects next and reports to this strategy both
+ * gaps in the sequence and packets it accepts.
+ *
+ * @version $Revision$
+ */
+public interface DatagramReplayStrategy {
+
+ /**
+ * Notifies the strategy that a packet arrived whose counter is ahead of
+ * the one the reader was waiting for, i.e. one or more packets in between
+ * have not been seen (yet).
+ *
+ * @param expectedCounter the counter the reader was waiting for
+ * @param actualCounter the counter carried by the packet which arrived
+ * @throws IOException if the strategy decides the gap cannot be tolerated
+ */
+ void onDroppedPackets(long expectedCounter, long actualCounter) throws IOException;
+
+ /**
+ * Notifies the strategy that the reader is accepting a packet as the next
+ * one in sequence; called before the reader advances its expected counter.
+ *
+ * @param expectedCounter the reader's expected counter at the time of the call
+ */
+ void onReceivedPacket(long expectedCounter);
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java?rev=383918&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java Tue Mar 7 08:24:05 2006
@@ -0,0 +1,36 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp.replay;
+
+import java.io.IOException;
+
+/**
+ * A {@link DatagramReplayStrategy} which makes no attempt at recovery: any
+ * reported gap in the packet sequence is turned into an {@link IOException},
+ * which is intended to cause the transport to be closed.
+ *
+ * @version $Revision$
+ */
+public class ExceptionIfDroppedPacketStrategy implements DatagramReplayStrategy {
+
+    /**
+     * Always fails, reporting how many packets are missing.
+     *
+     * @param expectedCounter the counter the reader was waiting for
+     * @param actualCounter the counter of the packet which actually arrived
+     * @throws IOException on every call, describing the size of the gap
+     */
+    public void onDroppedPackets(long expectedCounter, long actualCounter) throws IOException {
+        StringBuffer message = new StringBuffer();
+        message.append(actualCounter - expectedCounter);
+        message.append(" packet(s) dropped. Expected: ");
+        message.append(expectedCounter);
+        message.append(" but was: ");
+        message.append(actualCounter);
+        throw new IOException(message.toString());
+    }
+
+    /**
+     * Does nothing; this strategy keeps no record of received packets.
+     *
+     * @param expectedCounter ignored
+     */
+    public void onReceivedPacket(long expectedCounter) {
+        // intentionally empty
+    }
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
------------------------------------------------------------------------------
svn:mime-type = text/plain