You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/02/09 13:49:43 UTC

svn commit: r742458 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: openwire/ transport/ transport/stomp/ transport/tcp/ transport/udp/ wireformat/

Author: dejanb
Date: Mon Feb  9 12:49:42 2009
New Revision: 742458

URL: http://svn.apache.org/viewvc?rev=742458&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2088

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Mon Feb  9 12:49:42 2009
@@ -22,6 +22,7 @@
 import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.command.CommandTypes;
 import org.apache.activemq.command.DataStructure;
@@ -61,6 +62,8 @@
     private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
     private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
     private WireFormatInfo preferedWireFormatInfo;
+    
+    private AtomicBoolean receivingMessage = new AtomicBoolean(false);
 
     public OpenWireFormat() {
         this(DEFAULT_VERSION);
@@ -350,6 +353,7 @@
 
     public Object doUnmarshal(DataInput dis) throws IOException {
         byte dataType = dis.readByte();
+        receivingMessage.set(true);
         if (dataType != NULL_TYPE) {
             DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
             if (dsm == null) {
@@ -363,8 +367,10 @@
             } else {
                 dsm.looseUnmarshal(this, data, dis);
             }
+            receivingMessage.set(false);
             return data;
         } else {
+            receivingMessage.set(false);
             return null;
         }
     }
@@ -589,6 +595,10 @@
     public WireFormatInfo getPreferedWireFormatInfo() {
         return preferedWireFormatInfo;
     }
+    
+    public boolean inReceive() {
+    	return receivingMessage.get();
+    }
 
     public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Mon Feb  9 12:49:42 2009
@@ -27,6 +27,7 @@
 import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.thread.SchedulerTimerTask;
+import org.apache.activemq.wireformat.WireFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -62,6 +63,8 @@
     private long writeCheckTime;
     private long initialDelayTime;
     
+    private WireFormat wireFormat;
+    
     private final Runnable readChecker = new Runnable() {
         long lastRunTime;
         public void run() {
@@ -104,8 +107,9 @@
         }
     };
 
-    public InactivityMonitor(Transport next) {
+    public InactivityMonitor(Transport next, WireFormat wireFormat) {
         super(next);
+        this.wireFormat = wireFormat;
     }
 
     public void stop() throws Exception {
@@ -114,7 +118,7 @@
     }
 
     final void writeCheck() {
-            if (inSend.get()) {
+        if (inSend.get()) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("A send is in progress");
             }
@@ -149,7 +153,7 @@
     }
 
     final void readCheck() {
-        if (inReceive.get()) {
+        if (inReceive.get() || wireFormat.inReceive()) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("A receive is in progress");
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Mon Feb  9 12:49:42 2009
@@ -202,4 +202,11 @@
         this.version = version;
     }
 
+	public boolean inReceive() {
+		//TODO implement the inactivity monitor
+		return false;
+	}
+    
+    
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java Mon Feb  9 12:49:42 2009
@@ -108,7 +108,7 @@
             }
         }
 
-        transport = new InactivityMonitor(transport);
+        transport = new InactivityMonitor(transport, format);
 
         // Only need the WireFormatNegotiator if using openwire
         if (format instanceof OpenWireFormat) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Mon Feb  9 12:49:42 2009
@@ -97,7 +97,7 @@
         }
 
         if (isUseInactivityMonitor(transport)) {
-            transport = new InactivityMonitor(transport);
+            transport = new InactivityMonitor(transport, format);
         }
 
         // Only need the WireFormatNegotiator if using openwire

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java Mon Feb  9 12:49:42 2009
@@ -91,7 +91,7 @@
             }
         }
 
-        transport = new InactivityMonitor(transport);
+        transport = new InactivityMonitor(transport, format);
 
         if (format instanceof OpenWireFormat) {
             transport = configureClientSideNegotiator(transport, format, udpTransport);
@@ -123,7 +123,7 @@
             transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
         }
 
-        transport = new InactivityMonitor(transport);
+        transport = new InactivityMonitor(transport, format);
 
         if (!acceptServer && format instanceof OpenWireFormat) {
             transport = configureClientSideNegotiator(transport, format, udpTransport);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java Mon Feb  9 12:49:42 2009
@@ -132,7 +132,7 @@
     }
 
     protected Transport configureTransport(Transport transport) {
-        transport = new InactivityMonitor(transport);
+        transport = new InactivityMonitor(transport, serverTransport.getWireFormat());
         getAcceptListener().onAccept(transport);
         return transport;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Mon Feb  9 12:49:42 2009
@@ -75,4 +75,11 @@
         return 0;
     }
 
+	public boolean inReceive() {
+		// TODO implement the inactivity monitor
+		return false;
+	}
+    
+    
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java Mon Feb  9 12:49:42 2009
@@ -61,4 +61,9 @@
      */
     int getVersion();
     
+    /**
+     * @return true if message is being received
+     */
+    boolean inReceive();
+    
 }