You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/02/09 13:49:43 UTC
svn commit: r742458 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: openwire/
transport/ transport/stomp/ transport/tcp/ transport/udp/ wireformat/
Author: dejanb
Date: Mon Feb 9 12:49:42 2009
New Revision: 742458
URL: http://svn.apache.org/viewvc?rev=742458&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2088
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Mon Feb 9 12:49:42 2009
@@ -22,6 +22,7 @@
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.DataStructure;
@@ -61,6 +62,8 @@
private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
private WireFormatInfo preferedWireFormatInfo;
+
+ private AtomicBoolean receivingMessage = new AtomicBoolean(false);
public OpenWireFormat() {
this(DEFAULT_VERSION);
@@ -350,6 +353,7 @@
public Object doUnmarshal(DataInput dis) throws IOException {
byte dataType = dis.readByte();
+ receivingMessage.set(true);
if (dataType != NULL_TYPE) {
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
if (dsm == null) {
@@ -363,8 +367,10 @@
} else {
dsm.looseUnmarshal(this, data, dis);
}
+ receivingMessage.set(false);
return data;
} else {
+ receivingMessage.set(false);
return null;
}
}
@@ -589,6 +595,10 @@
public WireFormatInfo getPreferedWireFormatInfo() {
return preferedWireFormatInfo;
}
+
+ public boolean inReceive() {
+ return receivingMessage.get();
+ }
public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Mon Feb 9 12:49:42 2009
@@ -27,6 +27,7 @@
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.SchedulerTimerTask;
+import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -62,6 +63,8 @@
private long writeCheckTime;
private long initialDelayTime;
+ private WireFormat wireFormat;
+
private final Runnable readChecker = new Runnable() {
long lastRunTime;
public void run() {
@@ -104,8 +107,9 @@
}
};
- public InactivityMonitor(Transport next) {
+ public InactivityMonitor(Transport next, WireFormat wireFormat) {
super(next);
+ this.wireFormat = wireFormat;
}
public void stop() throws Exception {
@@ -114,7 +118,7 @@
}
final void writeCheck() {
- if (inSend.get()) {
+ if (inSend.get()) {
if (LOG.isTraceEnabled()) {
LOG.trace("A send is in progress");
}
@@ -149,7 +153,7 @@
}
final void readCheck() {
- if (inReceive.get()) {
+ if (inReceive.get() || wireFormat.inReceive()) {
if (LOG.isTraceEnabled()) {
LOG.trace("A receive is in progress");
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Mon Feb 9 12:49:42 2009
@@ -202,4 +202,11 @@
this.version = version;
}
+ public boolean inReceive() {
+ // TODO track in-progress reads here so the inactivity monitor can tell when a receive is under way
+ return false;
+ }
+
+
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java Mon Feb 9 12:49:42 2009
@@ -108,7 +108,7 @@
}
}
- transport = new InactivityMonitor(transport);
+ transport = new InactivityMonitor(transport, format);
// Only need the WireFormatNegotiator if using openwire
if (format instanceof OpenWireFormat) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Mon Feb 9 12:49:42 2009
@@ -97,7 +97,7 @@
}
if (isUseInactivityMonitor(transport)) {
- transport = new InactivityMonitor(transport);
+ transport = new InactivityMonitor(transport, format);
}
// Only need the WireFormatNegotiator if using openwire
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java Mon Feb 9 12:49:42 2009
@@ -91,7 +91,7 @@
}
}
- transport = new InactivityMonitor(transport);
+ transport = new InactivityMonitor(transport, format);
if (format instanceof OpenWireFormat) {
transport = configureClientSideNegotiator(transport, format, udpTransport);
@@ -123,7 +123,7 @@
transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
}
- transport = new InactivityMonitor(transport);
+ transport = new InactivityMonitor(transport, format);
if (!acceptServer && format instanceof OpenWireFormat) {
transport = configureClientSideNegotiator(transport, format, udpTransport);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java Mon Feb 9 12:49:42 2009
@@ -132,7 +132,7 @@
}
protected Transport configureTransport(Transport transport) {
- transport = new InactivityMonitor(transport);
+ transport = new InactivityMonitor(transport, serverTransport.getWireFormat());
getAcceptListener().onAccept(transport);
return transport;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Mon Feb 9 12:49:42 2009
@@ -75,4 +75,11 @@
return 0;
}
+ public boolean inReceive() {
+ // TODO track in-progress reads here so the inactivity monitor can tell when a receive is under way
+ return false;
+ }
+
+
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=742458&r1=742457&r2=742458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java Mon Feb 9 12:49:42 2009
@@ -61,4 +61,9 @@
*/
int getVersion();
+ /**
+ * @return true if a message is currently being received by this wire format
+ */
+ boolean inReceive();
+
}