You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/12/17 19:16:30 UTC

svn commit: r891816 - in /activemq/branches/activemq-5.3: activemq-core/src/main/java/org/apache/activemq/openwire/ activemq-core/src/main/java/org/apache/activemq/transport/ activemq-core/src/main/java/org/apache/activemq/transport/failover/ activemq-...

Author: cmacnaug
Date: Thu Dec 17 18:16:29 2009
New Revision: 891816

URL: http://svn.apache.org/viewvc?rev=891816&view=rev
Log:
Backport (merged from https://svn.apache.org/repos/asf/activemq/trunk) of the fix for AMQ-2511: Inactivity monitor does not time out stale connections

Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/StubTransport.java
    activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
    activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
    activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransport.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Thu Dec 17 18:16:29 2009
@@ -22,7 +22,6 @@
 import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.command.CommandTypes;
 import org.apache.activemq.command.DataStructure;
@@ -63,8 +62,6 @@
     private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
     private WireFormatInfo preferedWireFormatInfo;
     
-    private AtomicBoolean receivingMessage = new AtomicBoolean(false);
-
     public OpenWireFormat() {
         this(DEFAULT_VERSION);
     }
@@ -353,7 +350,6 @@
 
     public Object doUnmarshal(DataInput dis) throws IOException {
         byte dataType = dis.readByte();
-        receivingMessage.set(true);
         if (dataType != NULL_TYPE) {
             DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
             if (dsm == null) {
@@ -367,10 +363,8 @@
             } else {
                 dsm.looseUnmarshal(this, data, dis);
             }
-            receivingMessage.set(false);
             return data;
         } else {
-            receivingMessage.set(false);
             return null;
         }
     }
@@ -595,10 +589,6 @@
     public WireFormatInfo getPreferedWireFormatInfo() {
         return preferedWireFormatInfo;
     }
-    
-    public boolean inReceive() {
-    	return receivingMessage.get();
-    }
 
     public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Thu Dec 17 18:16:29 2009
@@ -23,6 +23,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.WireFormatInfo;
@@ -56,6 +57,8 @@
 
     private final AtomicBoolean commandReceived = new AtomicBoolean(true);
     private final AtomicBoolean inReceive = new AtomicBoolean(false);
+    private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
+    
     private SchedulerTimerTask writeCheckerTask;
     private SchedulerTimerTask readCheckerTask;
     
@@ -153,7 +156,9 @@
     }
 
     final void readCheck() {
-        if (inReceive.get() || wireFormat.inReceive()) {
+        int currentCounter = next.getReceiveCounter();
+        int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
+        if (inReceive.get() || currentCounter!=previousCounter ) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("A receive is in progress");
             }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java Thu Dec 17 18:16:29 2009
@@ -154,4 +154,12 @@
      */
     void reconnect(URI uri) throws IOException;
 
+    /**
+     * Returns a counter which gets incremented as data is read from the transport.
+     * It should only be used to determine if there is progress being made in reading the next command from the transport.  
+     * The value may wrap into the negative numbers. 
+     * 
+     * @return a counter which gets incremented as data is read from the transport.
+     */
+    int getReceiveCounter();    
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java Thu Dec 17 18:16:29 2009
@@ -137,4 +137,8 @@
 	public void reconnect(URI uri) throws IOException {
 		next.reconnect(uri);
 	}
+
+    public int getReceiveCounter() {
+        return next.getReceiveCounter();
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Dec 17 18:16:29 2009
@@ -868,4 +868,12 @@
     public void reconnect(URI uri) throws IOException {
     	add(new URI[] {uri});
     }
+
+    public int getReceiveCounter() {
+        Transport transport = connectedTransport.get();
+        if( transport == null ) {
+            return 0;
+        }
+        return transport.getReceiveCounter();
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Thu Dec 17 18:16:29 2009
@@ -586,4 +586,16 @@
     public boolean isConnected() {
         return connected;
     }
+
+    public int getReceiveCounter() {
+        int rc = 0;
+        synchronized (reconnectMutex) {
+            for (FanoutTransportHandler th : transports) {
+                if (th.transport != null) {
+                    rc += th.transport.getReceiveCounter();
+                }
+            }
+        }
+        return rc;
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java Thu Dec 17 18:16:29 2009
@@ -151,4 +151,8 @@
 	public void reconnect(URI uri) throws IOException {
 		getNext().reconnect(uri);
 	}
+
+    public int getReceiveCounter() {
+        return getNext().getReceiveCounter();
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Thu Dec 17 18:16:29 2009
@@ -202,11 +202,4 @@
         this.version = version;
     }
 
-	public boolean inReceive() {
-		//TODO implement the inactivity monitor
-		return false;
-	}
-    
-    
-
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java Thu Dec 17 18:16:29 2009
@@ -43,7 +43,7 @@
         internalBuffer = new byte[size];
     }
 
-    private void fill() throws IOException {
+    protected void fill() throws IOException {
         byte[] buffer = internalBuffer;
         count = 0;
         position = 0;

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Thu Dec 17 18:16:29 2009
@@ -118,6 +118,7 @@
     private Boolean keepAlive;
     private Boolean tcpNoDelay;
     private Thread runnerThread;
+    private volatile int receiveCounter;
 
     /**
      * Connect to a remote Node - e.g. a Broker
@@ -504,7 +505,28 @@
     }
 
     protected void initializeStreams() throws Exception {
-        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
+        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
+            @Override
+            public int read() throws IOException {
+                receiveCounter++;
+                return super.read();
+            }
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                receiveCounter++;
+                return super.read(b, off, len);
+            }
+            @Override
+            public long skip(long n) throws IOException {
+                receiveCounter++;
+                return super.skip(n);
+            }
+            @Override
+            protected void fill() throws IOException {
+                receiveCounter++;
+                super.fill();
+            }
+        };
         this.dataIn = new DataInputStream(buffIn);
         buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
         this.dataOut = new DataOutputStream(buffOut);
@@ -551,4 +573,9 @@
             }
         });
     }
+
+
+    public int getReceiveCounter() {
+        return receiveCounter;
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java Thu Dec 17 18:16:29 2009
@@ -50,4 +50,7 @@
     void setReplayAddress(SocketAddress address);
 
     void setReplayBuffer(ReplayBuffer replayBuffer);
+    
+    public int getReceiveCounter();
+
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java Thu Dec 17 18:16:29 2009
@@ -54,6 +54,7 @@
     // writing
     private Object writeLock = new Object();
     private int defaultMarshalBufferSize = 64 * 1024;
+    private volatile int receiveCounter;
 
     public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller,
                                   DatagramChannel channel, ByteBufferPool bufferPool) {
@@ -85,6 +86,8 @@
                 if (readBuffer.limit() == 0) {
                     continue;
                 }
+                
+                receiveCounter++;
                 from = headerMarshaller.createEndpoint(readBuffer, address);
 
                 int remaining = readBuffer.remaining();
@@ -252,4 +255,8 @@
         }
     }
 
+    public int getReceiveCounter() {
+        return receiveCounter;
+    }
+
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java Thu Dec 17 18:16:29 2009
@@ -48,6 +48,8 @@
     private Object readLock = new Object();
     private Object writeLock = new Object();
 
+    private volatile int receiveCounter;
+
     public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller,
                                  DatagramSocket channel) {
         super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
@@ -70,8 +72,9 @@
 
                 // TODO could use a DataInput implementation that talks direct
                 // to the byte[] to avoid object allocation
-                DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData()));
-
+                receiveCounter++;
+                DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData(), 0, datagram.getLength()));
+                
                 from = headerMarshaller.createEndpoint(datagram, dataIn);
                 answer = (Command)wireFormat.unmarshal(dataIn);
                 break;
@@ -232,4 +235,8 @@
     protected ByteArrayOutputStream createByteArrayOutputStream() {
         return new ByteArrayOutputStream(datagramSize);
     }
+
+    public int getReceiveCounter() {
+        return receiveCounter;
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Thu Dec 17 18:16:29 2009
@@ -462,4 +462,11 @@
         }
         return null;
     }
+
+    public int getReceiveCounter() {
+        if (commandChannel == null) {
+            return 0;
+        }
+        return commandChannel.getReceiveCounter();
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Thu Dec 17 18:16:29 2009
@@ -62,6 +62,7 @@
     private final Object lazyInitMutext = new Object();
     private final Valve enqueueValve = new Valve(true);
     private final AtomicBoolean stopping = new AtomicBoolean();
+    private volatile int receiveCounter;
     
     public VMTransport(URI location) {
         this.location = location;
@@ -112,6 +113,7 @@
             if( command == DISCONNECT ) {
                 transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
             } else {
+                peer.receiveCounter++;
                 transportListener.onCommand(command);
             }
         }
@@ -126,6 +128,7 @@
             if (messageQueue != null && !async) {
                 Object command;
                 while ((command = messageQueue.poll()) != null && !stopping.get() ) {
+                    receiveCounter++;
                     transportListener.onCommand(command);
                 }
             }
@@ -343,4 +346,8 @@
 	public void reconnect(URI uri) throws IOException {
 		throw new IOException("Not supported");
 	}
+
+    public int getReceiveCounter() {
+        return receiveCounter;
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Thu Dec 17 18:16:29 2009
@@ -75,11 +75,4 @@
         return 0;
     }
 
-	public boolean inReceive() {
-		// TODO implement the inactivity monitor
-		return false;
-	}
-    
-    
-
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java Thu Dec 17 18:16:29 2009
@@ -61,9 +61,4 @@
      */
     int getVersion();
     
-    /**
-     * @return true if message is being received
-     */
-    boolean inReceive();
-    
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/StubTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/StubTransport.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/StubTransport.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/StubTransport.java Thu Dec 17 18:16:29 2009
@@ -29,6 +29,7 @@
 public class StubTransport extends TransportSupport {
 
     private Queue<Object> queue = new ConcurrentLinkedQueue<Object>();
+    private volatile int receiveCounter;
 
     protected void doStop(ServiceStopper stopper) throws Exception {
     }
@@ -37,6 +38,7 @@
     }
 
     public void oneway(Object command) throws IOException {
+        receiveCounter++;
         queue.add(command);
     }
 
@@ -48,4 +50,8 @@
         return null;
     }
 
+    public int getReceiveCounter() {
+        return receiveCounter;
+    }
+
 }

Modified: activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java (original)
+++ activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java Thu Dec 17 18:16:29 2009
@@ -64,5 +64,9 @@
     }
 
     protected void doStop(ServiceStopper stopper) throws Exception {
+    }
+
+    public int getReceiveCounter() {
+        return 0;
     }   
 }

Modified: activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java (original)
+++ activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java Thu Dec 17 18:16:29 2009
@@ -58,6 +58,7 @@
     private final String clientID = CLIENT_ID_GENERATOR.generateId();
     private boolean trace;
     private GetMethod httpMethod;
+    private volatile int receiveCounter;
     
     public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
         super(wireFormat, remoteUrl);
@@ -135,6 +136,7 @@
                         break;
                     }
                 } else {
+                    receiveCounter++;
                     DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream());
                     Object command = (Object)getTextWireFormat().unmarshal(stream);
                     if (command == null) {
@@ -221,4 +223,8 @@
         this.trace = trace;
     }
 
+    public int getReceiveCounter() {
+        return receiveCounter;
+    }
+
 }

Modified: activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransport.java?rev=891816&r1=891815&r2=891816&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransport.java (original)
+++ activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransport.java Thu Dec 17 18:16:29 2009
@@ -47,7 +47,8 @@
     private HttpURLConnection receiveConnection;
     private URL url;
     private String clientID;
-
+    private volatile int receiveCounter;
+    
     // private String sessionID;
 
     public HttpTransport(TextWireFormat wireFormat, URI remoteUrl) throws MalformedURLException {
@@ -102,6 +103,7 @@
                     // checkSession(connection);
 
                     // Create a String for the UTF content
+                    receiveCounter++;
                     InputStream is = connection.getInputStream();
                     ByteArrayOutputStream baos = new ByteArrayOutputStream(connection.getContentLength() > 0 ? connection.getContentLength() : 1024);
                     int c = 0;
@@ -228,4 +230,8 @@
         }
     }
 
+    public int getReceiveCounter() {
+        return receiveCounter;
+    }
+
 }