You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/09/14 16:21:17 UTC

svn commit: r996911 - in /activemq/trunk: activemq-web-demo/src/main/webapp/WEB-INF/ activemq-web-demo/src/test/java/org/apache/activemq/web/ activemq-web/src/main/java/org/apache/activemq/web/

Author: dejanb
Date: Tue Sep 14 14:21:17 2010
New Revision: 996911

URL: http://svn.apache.org/viewvc?rev=996911&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-1933 - unsubscribing REST consumers

Modified:
    activemq/trunk/activemq-web-demo/src/main/webapp/WEB-INF/web.xml
    activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java

Modified: activemq/trunk/activemq-web-demo/src/main/webapp/WEB-INF/web.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/main/webapp/WEB-INF/web.xml?rev=996911&r1=996910&r2=996911&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/main/webapp/WEB-INF/web.xml (original)
+++ activemq/trunk/activemq-web-demo/src/main/webapp/WEB-INF/web.xml Tue Sep 14 14:21:17 2010
@@ -104,4 +104,8 @@
       <filter-name>session</filter-name>
       <url-pattern>/*</url-pattern>
     </filter-mapping>
+
+    <listener>
+        <listener-class>org.apache.activemq.web.SessionListener</listener-class>
+    </listener>
 </web-app>

Modified: activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java?rev=996911&r1=996910&r2=996911&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java (original)
+++ activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java Tue Sep 14 14:21:17 2010
@@ -17,13 +17,18 @@
 package org.apache.activemq.web;
 
 import javax.jms.TextMessage;
+import javax.management.ObjectName;
 
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.eclipse.jetty.client.ContentExchange;
 import org.eclipse.jetty.client.HttpClient;
 
+import java.util.Set;
+
 public class RestTest extends JettyTestSupport {
     private static final Log LOG = LogFactory.getLog(RestTest.class);
 	
@@ -106,4 +111,32 @@ public class RestTest extends JettyTestS
 	    }
 	}
 
+    public void testDisconnect() throws Exception {
+
+        producer.send(session.createTextMessage("test"));
+        HttpClient httpClient = new HttpClient();
+        httpClient.start();
+        ContentExchange contentExchange = new ContentExchange();
+        httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+        contentExchange.setURL("http://localhost:8080/message/test?readTimeout=1000&type=queue&clientId=test");
+        httpClient.send(contentExchange);
+        contentExchange.waitForDone();
+        LOG.info("Received: [" + contentExchange.getResponseStatus() + "] " + contentExchange.getResponseContent());
+
+        contentExchange = new ContentExchange();
+        contentExchange.setMethod("POST");
+        contentExchange.setURL("http://localhost:8080/message/test?clientId=test&action=unsubscribe");
+        httpClient.send(contentExchange);
+        contentExchange.waitForDone();
+
+        httpClient.stop();
+
+
+
+        ObjectName name = new ObjectName("org.apache.activemq" + ":BrokerName=localhost,Type=Queue,Destination=test");
+        ObjectName query = new ObjectName("org.apache.activemq:BrokerName=localhost,Type=Subscription,destinationType=Queue,destinationName=test,*");
+        Set subs = broker.getManagementContext().queryNames(query, null);
+        assertEquals("Consumers not closed", 0 , subs.size());
+    }
+
 }

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java?rev=996911&r1=996910&r2=996911&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java Tue Sep 14 14:21:17 2010
@@ -91,6 +91,17 @@ public class MessageServlet extends Mess
     protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
         // lets turn the HTTP post into a JMS Message
         try {
+
+            String action = request.getParameter("action");
+            String clientId = request.getParameter("clientId");
+            if (action != null && clientId != null && action.equals("unsubscribe")) {
+                LOG.info("Unsubscribing client " + clientId);
+                WebClient client = getWebClient(request);
+                client.close();
+                clients.remove(clientId);
+                return;
+            }
+
             WebClient client = getWebClient(request);
 
             String text = getPostedMessageBody(request);

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java?rev=996911&r1=996910&r2=996911&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java Tue Sep 14 14:21:17 2010
@@ -157,7 +157,9 @@ public class WebClient implements HttpSe
 
     public synchronized void close() {
         try {
-            closeConsumers();
+            if (consumers != null) {
+                closeConsumers();
+            }
             if (connection != null) {
                 connection.close();
             }
@@ -175,6 +177,7 @@ public class WebClient implements HttpSe
                 consumers.clear();
             }
             consumers = null;
+
         }
     }