You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/07 17:24:07 UTC

svn commit: r383918 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp: ./ replay/

Author: jstrachan
Date: Tue Mar  7 08:24:05 2006
New Revision: 383918

URL: http://svn.apache.org/viewcvs?rev=383918&view=rev
Log:
added a default implementation of BufferPool and a default replay strategy

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=383918&r1=383917&r2=383918&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java Tue Mar  7 08:24:05 2006
@@ -20,6 +20,7 @@
 import org.apache.activemq.command.Command;
 import org.apache.activemq.openwire.BooleanStream;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.udp.replay.DatagramReplayStrategy;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -39,35 +40,36 @@
 public class CommandChannel implements Service {
 
     private static final Log log = LogFactory.getLog(CommandChannel.class);
-    
+
     private ByteChannel channel;
     private OpenWireFormat wireFormat;
     private ByteBufferPool bufferPool;
     private int datagramSize = 4 * 1024;
+    private DatagramReplayStrategy replayStrategy;
     private DatagramHeaderMarshaller headerMarshaller = new DatagramHeaderMarshaller();
-    
+
     // reading
     private ByteBuffer readBuffer;
     private DataInputStream dataIn;
     private CommandReadBuffer readStack;
-    
+
     // writing
     private ByteBuffer writeBuffer;
-    private BooleanStream bs = new BooleanStream(); 
+    private BooleanStream bs = new BooleanStream();
     private DataOutputStream dataOut;
     private int largeMessageBufferSize = 128 * 1024;
     private DatagramHeader header = new DatagramHeader();
 
-
-    public CommandChannel(ByteChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize) {
+    public CommandChannel(ByteChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, DatagramReplayStrategy replayStrategy) {
         this.channel = channel;
         this.wireFormat = wireFormat;
         this.bufferPool = bufferPool;
         this.datagramSize = datagramSize;
+        this.replayStrategy = replayStrategy;
     }
 
     public void start() throws Exception {
-        readStack = new CommandReadBuffer(wireFormat);
+        readStack = new CommandReadBuffer(wireFormat, replayStrategy);
         bufferPool.setDefaultSize(datagramSize);
         bufferPool.start();
         readBuffer = bufferPool.borrowBuffer();
@@ -79,7 +81,7 @@
     public void stop() throws Exception {
         bufferPool.stop();
     }
-    
+
     public synchronized Command read() throws IOException {
         readBuffer.clear();
         int read = channel.read(readBuffer);
@@ -109,10 +111,11 @@
     public synchronized void write(Command command) throws IOException {
         header.incrementCounter();
         int size = wireFormat.tightMarshalNestedObject1(command, bs);
-        if (size < datagramSize ) {
+        if (size < datagramSize) {
             header.setPartial(false);
             header.setDataSize(size);
             writeBuffer.rewind();
+            headerMarshaller.writeHeader(header, writeBuffer);
             wireFormat.marshal(command, dataOut);
             dataOut.flush();
             channel.write(writeBuffer);
@@ -120,15 +123,15 @@
         else {
             header.setPartial(true);
             header.setComplete(false);
-            
+
             // lets split the command up into chunks
             ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize);
             wireFormat.marshal(command, new DataOutputStream(largeBuffer));
-            
+
             byte[] data = largeBuffer.toByteArray();
             int offset = 0;
             boolean lastFragment = false;
-            for (int fragment = 0, length = data.length; !lastFragment; fragment++ ) {
+            for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
                 // write the header
                 writeBuffer.rewind();
                 int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
@@ -144,7 +147,7 @@
             }
         }
     }
-    
+
     // Properties
     // -------------------------------------------------------------------------
 
@@ -153,7 +156,7 @@
     }
 
     /**
-     * Sets the default size of a datagram on the network. 
+     * Sets the default size of a datagram on the network.
      */
     public void setDatagramSize(int datagramSize) {
         this.datagramSize = datagramSize;
@@ -168,6 +171,14 @@
      */
     public void setBufferPool(ByteBufferPool bufferPool) {
         this.bufferPool = bufferPool;
+    }
+
+    public DatagramHeaderMarshaller getHeaderMarshaller() {
+        return headerMarshaller;
+    }
+
+    public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
+        this.headerMarshaller = headerMarshaller;
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java?rev=383918&r1=383917&r2=383918&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java Tue Mar  7 08:24:05 2006
@@ -18,6 +18,9 @@
 
 import org.apache.activemq.command.Command;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.udp.replay.DatagramReplayStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -33,20 +36,32 @@
  * @version $Revision$
  */
 public class CommandReadBuffer {
+    private static final Log log = LogFactory.getLog(CommandReadBuffer.class);
 
     private OpenWireFormat wireFormat;
+    private DatagramReplayStrategy replayStrategy;
     private SortedSet headers = new TreeSet();
-    private int expectedCounter;
+    private long expectedCounter;
     private ByteArrayOutputStream out = new ByteArrayOutputStream();
 
-    public CommandReadBuffer(OpenWireFormat wireFormat) {
+    public CommandReadBuffer(OpenWireFormat wireFormat, DatagramReplayStrategy replayStrategy) {
         this.wireFormat = wireFormat;
+        this.replayStrategy = replayStrategy;
     }
 
+
     public Command read(DatagramHeader header) throws IOException {
-        if (expectedCounter != header.getCounter()) {
-            // lets add it to the list for later on
-            headers.add(header);
+        long actualCounter = header.getCounter();
+        if (expectedCounter != actualCounter) {
+            if (actualCounter < expectedCounter) {
+                log.warn("Ignoring out of step packet: " + header);
+            }
+            else {
+                replayStrategy.onDroppedPackets(expectedCounter, actualCounter);
+                
+                // lets add it to the list for later on
+                headers.add(header);
+            }
 
             // lets see if the first item in the set is the next header
             header = (DatagramHeader) headers.first();
@@ -56,6 +71,7 @@
         }
 
         // we've got a valid header so increment counter
+        replayStrategy.onReceivedPacket(expectedCounter);
         expectedCounter++;
 
         Command answer = null;
@@ -75,7 +91,6 @@
             }
         }
         return answer;
-
     }
 
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java?rev=383918&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java Tue Mar  7 08:24:05 2006
@@ -0,0 +1,70 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A default implementation of {@link BufferPool} which keeps a pool of direct
+ * byte buffers.
+ * 
+ * @version $Revision$
+ */
+public class DefaultBufferPool implements ByteBufferPool {
+
+    private int defaultSize;
+    private List buffers = new ArrayList();
+    private Object lock = new Object();
+
+    public synchronized ByteBuffer borrowBuffer() {
+        synchronized (lock) {
+            int size = buffers.size();
+            if (size > 0) {
+                return (ByteBuffer) buffers.remove(size - 1);
+            }
+        }
+        return ByteBuffer.allocateDirect(defaultSize);
+    }
+
+    public synchronized void returnBuffer(ByteBuffer buffer) {
+        synchronized (lock) {
+            buffers.add(buffer);
+        }
+    }
+
+    public void setDefaultSize(int defaultSize) {
+        this.defaultSize = defaultSize;
+    }
+
+    public synchronized void start() throws Exception {
+    }
+
+    public synchronized void stop() throws Exception {
+        synchronized (lock) {
+            /*
+            for (Iterator iter = buffers.iterator(); iter.hasNext();) {
+                ByteBuffer buffer = (ByteBuffer) iter.next();
+            }
+            */
+            buffers.clear();
+        }
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DefaultBufferPool.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=383918&r1=383917&r2=383918&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Tue Mar  7 08:24:05 2006
@@ -21,6 +21,8 @@
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportThreadSupport;
+import org.apache.activemq.transport.udp.replay.DatagramReplayStrategy;
+import org.apache.activemq.transport.udp.replay.ExceptionIfDroppedPacketStrategy;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +47,7 @@
     private CommandChannel commandChannel;
     private OpenWireFormat wireFormat;
     private ByteBufferPool bufferPool;
+    private DatagramReplayStrategy replayStrategy = new ExceptionIfDroppedPacketStrategy();
     private int datagramSize = 4 * 1024;
     private long maxInactivityDuration = 0; //30000;
     private InetSocketAddress socketAddress;
@@ -85,7 +88,7 @@
      * reads packets from a Socket
      */
     public void run() {
-        log.trace("TCP consumer thread starting");
+        log.trace("Consumer thread starting for: " + toString());
         while (!isClosed()) {
             try {
                 Command command = commandChannel.read();
@@ -152,10 +155,21 @@
         this.commandChannel = commandChannel;
     }
     
+    public DatagramReplayStrategy getReplayStrategy() {
+        return replayStrategy;
+    }
+
+    /**
+     * Sets the strategy used to replay missed datagrams
+     */
+    public void setReplayStrategy(DatagramReplayStrategy replayStrategy) {
+        this.replayStrategy = replayStrategy;
+    }
+
+    
     // Implementation methods
     // -------------------------------------------------------------------------
 
-
     /**
      * Creates an address from the given URI
      */
@@ -182,7 +196,10 @@
         else if (channel == null) {
             throw new IllegalArgumentException("No channel configured");
         }
-        commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize);
+        if (bufferPool == null) {
+            bufferPool = new DefaultBufferPool();
+        }
+        commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize, replayStrategy);
         commandChannel.start();
         super.doStart();
     }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java?rev=383918&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java Tue Mar  7 08:24:05 2006
@@ -0,0 +1,32 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp.replay;
+
+import java.io.IOException;
+
+/**
+ * A pluggable strategy for how to deal with dropped packets.
+ * 
+ * @version $Revision$
+ */
+public interface DatagramReplayStrategy {
+
+    void onDroppedPackets(long expectedCounter, long actualCounter) throws IOException;
+
+    void onReceivedPacket(long expectedCounter);
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java?rev=383918&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java Tue Mar  7 08:24:05 2006
@@ -0,0 +1,36 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp.replay;
+
+import java.io.IOException;
+
+/**
+ * Throws an exception if packets are dropped causing the transport to be closed.
+ * 
+ * @version $Revision$
+ */
+public class ExceptionIfDroppedPacketStrategy implements DatagramReplayStrategy {
+
+    public void onDroppedPackets(long expectedCounter, long actualCounter) throws IOException {
+        long count = actualCounter - expectedCounter;
+        throw new IOException("" + count +  " packet(s) dropped. Expected: " + expectedCounter + " but was: " + actualCounter);
+    }
+
+    public void onReceivedPacket(long expectedCounter) {
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain