You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/01/19 19:36:42 UTC

svn commit: r497898 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/ main/java/org/apache/activemq/transport/tcp/ test/java/org/apache/activemq/broker/ test/java/org/apache/activemq/transport/stomp/

Author: jstrachan
Date: Fri Jan 19 10:36:41 2007
New Revision: 497898

URL: http://svn.apache.org/viewvc?view=rev&rev=497898
Log:
fix for AMQ-1134 so that stomp connections are cleared up by the broker if a stomp client is killed without disconnecting properly

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java?view=diff&rev=497898&r1=497897&r2=497898
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java Fri Jan 19 10:36:41 2007
@@ -38,4 +38,9 @@
     	transport = new StompTransportFilter(transport, new LegacyFrameTranslator());
     	return super.compositeConfigure(transport, format, options);
     }
+
+    protected boolean isUseInactivityMonitor(Transport transport) {
+        // lets disable the inactivity monitor as stomp does not use keep alive packets
+        return false;
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?view=diff&rev=497898&r1=497897&r2=497898
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Fri Jan 19 10:36:41 2007
@@ -356,6 +356,10 @@
     }
 
     protected void doStop(ServiceStopper stopper) throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("Stopping transport " + this);
+        }
+
         // Closing the streams flush the sockets before closing.. if the socket
         // is hung.. then this hangs the close.
         // closeStreams();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?view=diff&rev=497898&r1=497897&r2=497898
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Fri Jan 19 10:36:41 2007
@@ -87,7 +87,9 @@
             transport = new TransportLogger(transport);
         }
 
-        transport = new InactivityMonitor(transport);
+        if (isUseInactivityMonitor(transport)) {
+            transport = new InactivityMonitor(transport);
+        }
 
         // Only need the WireFormatNegotiator if using openwire
         if( format instanceof OpenWireFormat ) {
@@ -95,6 +97,13 @@
         }
         
         return transport;
+    }
+
+    /**
+     * Returns true if the inactivity monitor should be used on the transport
+     */
+    protected boolean isUseInactivityMonitor(Transport transport) {
+        return true;
     }
 
     protected Transport createTransport(URI location,WireFormat wf) throws UnknownHostException,IOException{

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java?view=diff&rev=497898&r1=497897&r2=497898
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java Fri Jan 19 10:36:41 2007
@@ -20,6 +20,7 @@
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.ManagementContext;
 import org.apache.activemq.broker.util.UDPTraceBrokerPlugin;
 import org.apache.activemq.broker.view.ConnectionDotFilePlugin;
 import org.apache.activemq.broker.view.DestinationDotFilePlugin;
@@ -52,8 +53,15 @@
             // URI(brokerURI));
             BrokerService broker = new BrokerService();
             broker.setPersistent(false);
+
+            // for running on Java 5 without mx4j
+            ManagementContext managementContext = broker.getManagementContext();
+            managementContext.setFindTigerMbeanServer(true);
+            managementContext.setUseMBeanServer(true);
+            managementContext.setCreateConnector(false);
+
             broker.setUseJmx(true);
-            broker.setPlugins(new BrokerPlugin[] { new ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin() });
+            //broker.setPlugins(new BrokerPlugin[] { new ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin() });
             broker.addConnector("tcp://localhost:61616");
             broker.addConnector("stomp://localhost:61613");
             broker.start();

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?view=diff&rev=497898&r1=497897&r2=497898
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Fri Jan 19 10:36:41 2007
@@ -19,14 +19,13 @@
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.*;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.transport.stomp.Stomp;
 
 import javax.jms.*;
-
+import javax.jms.Connection;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -64,8 +63,6 @@
         session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         queue = new ActiveMQQueue(getQueueName());
         connection.start();
-
-
     }
 
     protected Socket createSocket(URI connectUri) throws IOException {
@@ -78,7 +75,9 @@
 
     protected void tearDown() throws Exception {
         connection.close();
-        stompSocket.close();
+        if (stompSocket != null) {
+            stompSocket.close();
+        }
         broker.stop();
     }
 
@@ -679,6 +678,37 @@
         TextMessage message = (TextMessage) consumer.receive(1000);
         assertNotNull(message);
         assertEquals("second message", message.getText().trim());
+    }
+
+    public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception {
+        assertClients(1);
+        String frame =
+            "CONNECT\n" +
+            "login: brianm\n" +
+            "passcode: wombats\n\n"+
+            Stomp.NULL;
+
+        sendFrame(frame);
+
+        // This test case is currently failing
+        waitForFrameToTakeEffect();
+
+        assertClients(2);
+
+        // now lets kill the socket
+        stompSocket.close();
+        stompSocket = null;
+
+        Thread.sleep(2000);
+
+        assertClients(1);
+    }
+
+    protected void assertClients(int expected) throws Exception {
+        org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
+        int actual = clients.length;
+
+        assertEquals("Number of clients", expected, actual);
     }
 
     protected void waitForFrameToTakeEffect() throws InterruptedException {