You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/11/08 11:47:23 UTC

svn commit: r1032539 - in /activemq/trunk/activemq-optional/src: main/java/org/apache/activemq/transport/http/ test/java/org/apache/activemq/transport/http/

Author: dejanb
Date: Mon Nov  8 10:47:23 2010
New Revision: 1032539

URL: http://svn.apache.org/viewvc?rev=1032539&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-3021 - HttpTunnelServlet leak

Added:
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/BlockingQueueTransportLeakTest.java
Modified:
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java?rev=1032539&r1=1032538&r2=1032539&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java (original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java Mon Nov  8 10:47:23 2010
@@ -24,14 +24,16 @@ import java.util.concurrent.TimeUnit;
 import org.apache.activemq.transport.TransportSupport;
 import org.apache.activemq.util.ServiceStopper;
 
+import java.util.concurrent.CountDownLatch;
 /**
  * A server side HTTP based TransportChannel which processes incoming packets
  * and adds outgoing packets onto a {@link Queue} so that they can be dispatched
  * by the HTTP GET requests from the client.
- * 
+ *
  * @version $Revision$
  */
 public class BlockingQueueTransport extends TransportSupport {
+	public static CountDownLatch finalizeLatch;
     public static final long MAX_TIMEOUT = 30000L;
 
     private BlockingQueue<Object> queue;
@@ -40,6 +42,12 @@ public class BlockingQueueTransport exte
         this.queue = channel;
     }
 
+    @Override
+    public void finalize() {
+        // Test-only GC hook; the latch stays null unless a leak test installs one.
+        if (finalizeLatch != null) { finalizeLatch.countDown(); }
+    }
+
     public BlockingQueue<Object> getQueue() {
         return queue;
     }
@@ -55,7 +63,7 @@ public class BlockingQueueTransport exte
         }
     }
 
-    
+
     public String getRemoteAddress() {
         return "blockingQueue_" + queue.hashCode();
     }
@@ -68,5 +76,5 @@ public class BlockingQueueTransport exte
 
     public int getReceiveCounter() {
         return 0;
-    }   
-}
+    }
+}
\ No newline at end of file

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java?rev=1032539&r1=1032538&r2=1032539&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java (original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java Mon Nov  8 10:47:23 2010
@@ -21,7 +21,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import javax.servlet.ServletException;
@@ -29,14 +30,15 @@ import javax.servlet.ServletInputStream;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import org.apache.activemq.Service;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.WireFormatInfo;
-import org.apache.activemq.transport.InactivityMonitor;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.util.TextWireFormat;
 import org.apache.activemq.transport.xstream.XStreamWireFormat;
 import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -54,7 +56,7 @@ public class HttpTunnelServlet extends H
     private TransportAcceptListener listener;
     private HttpTransportFactory transportFactory;
     private TextWireFormat wireFormat;
-    private final Map<String, BlockingQueueTransport> clients = new HashMap<String, BlockingQueueTransport>();
+    private ConcurrentMap<String, BlockingQueueTransport> clients = new ConcurrentHashMap<String, BlockingQueueTransport>();
     private final long requestTimeout = 30000L;
     private HashMap transportOptions;
 
@@ -162,18 +164,16 @@ public class HttpTunnelServlet extends H
             LOG.warn("No clientID header specified");
             return null;
         }
-        synchronized (this) {
-            BlockingQueueTransport answer = clients.get(clientID);
-            if (answer == null) {
-                LOG.warn("The clientID header specified is invalid. Client sesion has not yet been established for it: " + clientID);
-                return null;
-            }
-            return answer;
+        BlockingQueueTransport answer = clients.get(clientID);
+        if (answer == null) {
+            LOG.warn("The clientID header specified is invalid. Client sesion has not yet been established for it: " + clientID);
+            return null;
         }
+        return answer;
     }
 
     protected BlockingQueueTransport createTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
-        String clientID = request.getHeader("clientID");
+        final String clientID = request.getHeader("clientID");
 
         if (clientID == null) {
             response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
@@ -181,33 +181,60 @@ public class HttpTunnelServlet extends H
             return null;
         }
 
-        synchronized (this) {
-            BlockingQueueTransport answer = clients.get(clientID);
-            if (answer != null) {
-                response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '" + clientID + "' has already been established");
-                LOG.warn("A session for clientID '" + clientID + "' has already been established");
-                return null;
-            }
+        // Optimistically create the client's transport; this transport may be thrown away if the client has already registered.
+        BlockingQueueTransport answer = createTransportChannel();
+
+        // Record the client's transport and ensure that it has not already registered; this is thread-safe and only allows one 
+        // thread to register the client
+        if (clients.putIfAbsent(clientID, answer) != null) {
+            response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '" + clientID + "' has already been established");
+            LOG.warn("A session for clientID '" + clientID + "' has already been established");
+            return null;
+        }
 
-            answer = createTransportChannel();
-            clients.put(clientID, answer);
-            Transport transport = answer;
+        // Ensure that the client's transport is cleaned up when no longer
+        // needed.
+        answer.addServiceListener(new ServiceListener() {
+            @Override
+            public void started(Service service) {
+                // Nothing to do.
+            }
+
+            @Override
+            public void stopped(Service service) {
+                clients.remove(clientID);
+            }
+        });
+
+        // Configure the transport with any additional properties or filters.  Although the returned transport is not explicitly
+        // persisted, if it is a filter (e.g., InactivityMonitor) it will be linked to the client's transport as a TransportListener
+        // and not GC'd until the client's transport is disposed.
+        Transport transport = answer;
+        try {
+            // Preserve the transportOptions for future use by making a copy before applying (they are removed when applied).
+            HashMap options = new HashMap(transportOptions);
+            transport = transportFactory.serverConfigure(answer, null, options);
+        } catch (Exception e) {
+            IOExceptionSupport.create(e);
+        }
+
+        // Wait for the transport to be connected or disposed.
+        listener.onAccept(transport);
+        while (!transport.isConnected() && !transport.isDisposed()) {
             try {
-                HashMap options = new HashMap(transportOptions);
-                transport = transportFactory.serverConfigure(answer, null, options);
-            } catch (Exception e) {
-                IOExceptionSupport.create(e);
-            }
-            listener.onAccept(transport);
-            //wait for the transport to connect
-            while (!answer.isConnected()) {
-            	try {
-            		Thread.sleep(100);
-            	} catch (InterruptedException ignore) {
-            	}
+                Thread.sleep(100);
+            } catch (InterruptedException ignore) {
             }
-            return answer;
         }
+
+        // Ensure that the transport was not prematurely disposed.
+        if (transport.isDisposed()) {
+            response.sendError(HttpServletResponse.SC_BAD_REQUEST, "The session for clientID '" + clientID + "' was prematurely disposed");
+            LOG.warn("The session for clientID '" + clientID + "' was prematurely disposed");
+            return null;
+        }
+
+        return answer;
     }
 
     protected BlockingQueueTransport createTransportChannel() {

Added: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/BlockingQueueTransportLeakTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/BlockingQueueTransportLeakTest.java?rev=1032539&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/BlockingQueueTransportLeakTest.java (added)
+++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/BlockingQueueTransportLeakTest.java Mon Nov  8 10:47:23 2010
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.http;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
+import org.apache.activemq.transport.http.BlockingQueueTransport;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import java.util.logging.Logger;
+
+/**
+ * This test demonstrates that HttpTunnelServlet leaks BlockingQueueTransport
+ * objects whenever a network bridge gets created and closed over HTTP.
+ * <p>
+ * <b>NOTE:</b> This test requires a modified version of
+ * BlockingQueueTransport; the modification is for the purpose of detecting when
+ * the object is removed from memory.
+ */
+public class BlockingQueueTransportLeakTest {
+	private static final long INACTIVITY_TIMEOUT = 5000;
+	private static final Logger LOG = Logger
+			.getLogger(BlockingQueueTransportLeakTest.class.getName());
+
+	// Change this URL to be an unused port. The inactivity timeout is required
+	// per AMQ-3016.
+	private static final String REMOTE_BROKER_HTTP_URL = "http://localhost:50000?transport.useInactivityMonitor=true&transport.initialDelayTime=0&transport.readCheckTime="
+			+ INACTIVITY_TIMEOUT;
+	private BrokerService localBroker = new BrokerService();
+	private BrokerService remoteBroker = new BrokerService();
+
+	@Before
+	public void init() throws Exception {
+		localBroker.setBrokerName("localBroker");
+		localBroker.setPersistent(false);
+		localBroker.setUseJmx(false);
+		localBroker.setSchedulerSupport(false);
+
+		remoteBroker.setBrokerName("remoteBroker");
+		remoteBroker.setPersistent(false);
+		remoteBroker.setUseJmx(false);
+		remoteBroker.addConnector(REMOTE_BROKER_HTTP_URL);
+		remoteBroker.setSchedulerSupport(false);
+	}
+
+	@After
+	public void cleanup() throws Exception {
+		try {
+			localBroker.stop();
+		} finally {
+			remoteBroker.stop();
+		}
+	}
+
+	/**
+	 * This test involves a local broker which establishes a network bridge to a
+	 * remote broker using the HTTP protocol. The local broker stops and the
+	 * remote broker cleans up the bridge connection.
+	 * <p>
+	 * This test demonstrates how the BlockingQueueTransport, which is created
+	 * by HttpTunnelServlet for each bridge, is held in memory indefinitely.
+	 */
+	@Test
+	public void httpTest() throws Exception {
+		final long BRIDGE_TIMEOUT = 10000;
+		final long GC_TIMEOUT = 30000;
+
+		// Add a network connector to the local broker that will create a bridge
+		// to the remote broker.
+		DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector();
+		SimpleDiscoveryAgent da = new SimpleDiscoveryAgent();
+		da.setServices(REMOTE_BROKER_HTTP_URL);
+		dnc.setDiscoveryAgent(da);
+		localBroker.addNetworkConnector(dnc);
+
+		// Add an interceptor to the remote broker that signals when the bridge
+		// connection has been added and removed.
+		BrokerPlugin[] plugins = new BrokerPlugin[1];
+		plugins[0] = new BrokerPlugin() {
+			public Broker installPlugin(Broker broker) throws Exception {
+				return new BrokerFilter(broker) {
+					@Override
+					public void addConnection(ConnectionContext context,
+							ConnectionInfo info) throws Exception {
+						super.addConnection(context, info);
+						synchronized (remoteBroker) {
+							remoteBroker.notifyAll();
+						}
+					}
+
+					@Override
+					public void removeConnection(ConnectionContext context,
+							ConnectionInfo info, Throwable error)
+							throws Exception {
+						super.removeConnection(context, info, error);
+						synchronized (remoteBroker) {
+							remoteBroker.notifyAll();
+						}
+					}
+				};
+			}
+		};
+		remoteBroker.setPlugins(plugins);
+
+		// Start the remote broker so that it is available for the local broker
+		// to connect to.
+		remoteBroker.start();
+
+		// Start and stop the local broker. Synchronization is used to ensure
+		// that the bridge is created before the local broker stops,
+		// and that the test waits for the remote broker to remove the bridge.
+		synchronized (remoteBroker) {
+			localBroker.start();
+			remoteBroker.wait(BRIDGE_TIMEOUT);
+
+			// Verify that the remote bridge connection has been created by the
+			// remote broker.
+			Assert.assertEquals(1,
+					remoteBroker.getRegionBroker().getClients().length);
+
+			localBroker.stop();
+			remoteBroker.wait(BRIDGE_TIMEOUT);
+
+			// Verify that the remote bridge connection has been closed by the
+			// remote broker.
+			Assert.assertEquals(0,
+					remoteBroker.getRegionBroker().getClients().length);
+		}
+
+		// Initialize the countdown latch with the expected number of remote
+		// bridge connections that should be garbage collected.
+		BlockingQueueTransport.finalizeLatch = new CountDownLatch(1);
+
+		// Run the GC and verify that the remote bridge connections are no
+		// longer in memory. Some GCs are slow to respond, so give a second
+		// prod if necessary.
+		// This assertion fails with finalizeLatch.getCount() returning 1.
+		LOG.info("Triggering first GC...");
+		System.gc();
+		BlockingQueueTransport.finalizeLatch.await(GC_TIMEOUT,
+				TimeUnit.MILLISECONDS);
+
+		if (BlockingQueueTransport.finalizeLatch.getCount() != 0) {
+			LOG.info("Triggering second GC...");
+			System.gc();
+			BlockingQueueTransport.finalizeLatch.await(GC_TIMEOUT,
+					TimeUnit.MILLISECONDS);
+		}
+		Assert.assertEquals(0, BlockingQueueTransport.finalizeLatch.getCount());
+	}
+}
\ No newline at end of file