You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/01/19 19:36:42 UTC
svn commit: r497898 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/stomp/
main/java/org/apache/activemq/transport/tcp/
test/java/org/apache/activemq/broker/
test/java/org/apache/activemq/transport/stomp/
Author: jstrachan
Date: Fri Jan 19 10:36:41 2007
New Revision: 497898
URL: http://svn.apache.org/viewvc?view=rev&rev=497898
Log:
fix for AMQ-1134 so that stomp connections are cleared up by the broker if a stomp client is killed without disconnecting properly
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java?view=diff&rev=497898&r1=497897&r2=497898
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java Fri Jan 19 10:36:41 2007
@@ -38,4 +38,9 @@
transport = new StompTransportFilter(transport, new LegacyFrameTranslator());
return super.compositeConfigure(transport, format, options);
}
+
+ protected boolean isUseInactivityMonitor(Transport transport) {
+ // lets disable the inactivity monitor as stomp does not use keep alive packets
+ return false;
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?view=diff&rev=497898&r1=497897&r2=497898
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Fri Jan 19 10:36:41 2007
@@ -356,6 +356,10 @@
}
protected void doStop(ServiceStopper stopper) throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("Stopping transport " + this);
+ }
+
// Closing the streams flush the sockets before closing.. if the socket
// is hung.. then this hangs the close.
// closeStreams();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?view=diff&rev=497898&r1=497897&r2=497898
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Fri Jan 19 10:36:41 2007
@@ -87,7 +87,9 @@
transport = new TransportLogger(transport);
}
- transport = new InactivityMonitor(transport);
+ if (isUseInactivityMonitor(transport)) {
+ transport = new InactivityMonitor(transport);
+ }
// Only need the WireFormatNegotiator if using openwire
if( format instanceof OpenWireFormat ) {
@@ -95,6 +97,13 @@
}
return transport;
+ }
+
+ /**
+ * Returns true if the inactivity monitor should be used on the transport
+ */
+ protected boolean isUseInactivityMonitor(Transport transport) {
+ return true;
}
protected Transport createTransport(URI location,WireFormat wf) throws UnknownHostException,IOException{
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java?view=diff&rev=497898&r1=497897&r2=497898
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java Fri Jan 19 10:36:41 2007
@@ -20,6 +20,7 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.util.UDPTraceBrokerPlugin;
import org.apache.activemq.broker.view.ConnectionDotFilePlugin;
import org.apache.activemq.broker.view.DestinationDotFilePlugin;
@@ -52,8 +53,15 @@
// URI(brokerURI));
BrokerService broker = new BrokerService();
broker.setPersistent(false);
+
+ // for running on Java 5 without mx4j
+ ManagementContext managementContext = broker.getManagementContext();
+ managementContext.setFindTigerMbeanServer(true);
+ managementContext.setUseMBeanServer(true);
+ managementContext.setCreateConnector(false);
+
broker.setUseJmx(true);
- broker.setPlugins(new BrokerPlugin[] { new ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin() });
+ //broker.setPlugins(new BrokerPlugin[] { new ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin() });
broker.addConnector("tcp://localhost:61616");
broker.addConnector("stomp://localhost:61613");
broker.start();
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?view=diff&rev=497898&r1=497897&r2=497898
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Fri Jan 19 10:36:41 2007
@@ -19,14 +19,13 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.*;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.transport.stomp.Stomp;
import javax.jms.*;
-
+import javax.jms.Connection;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -64,8 +63,6 @@
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
queue = new ActiveMQQueue(getQueueName());
connection.start();
-
-
}
protected Socket createSocket(URI connectUri) throws IOException {
@@ -78,7 +75,9 @@
protected void tearDown() throws Exception {
connection.close();
- stompSocket.close();
+ if (stompSocket != null) {
+ stompSocket.close();
+ }
broker.stop();
}
@@ -679,6 +678,37 @@
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
assertEquals("second message", message.getText().trim());
+ }
+
+ public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception {
+ assertClients(1);
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n"+
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ // This test case is currently failing
+ waitForFrameToTakeEffect();
+
+ assertClients(2);
+
+ // now lets kill the socket
+ stompSocket.close();
+ stompSocket = null;
+
+ Thread.sleep(2000);
+
+ assertClients(1);
+ }
+
+ protected void assertClients(int expected) throws Exception {
+ org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
+ int actual = clients.length;
+
+ assertEquals("Number of clients", expected, actual);
}
protected void waitForFrameToTakeEffect() throws InterruptedException {