You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/11/08 11:47:23 UTC
svn commit: r1032539 - in /activemq/trunk/activemq-optional/src:
main/java/org/apache/activemq/transport/http/
test/java/org/apache/activemq/transport/http/
Author: dejanb
Date: Mon Nov 8 10:47:23 2010
New Revision: 1032539
URL: http://svn.apache.org/viewvc?rev=1032539&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-3021 - HttpTunnelServlet leak
Added:
activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/BlockingQueueTransportLeakTest.java
Modified:
activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java?rev=1032539&r1=1032538&r2=1032539&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java (original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java Mon Nov 8 10:47:23 2010
@@ -24,14 +24,16 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.util.ServiceStopper;
+import java.util.concurrent.CountDownLatch;
/**
* A server side HTTP based TransportChannel which processes incoming packets
* and adds outgoing packets onto a {@link Queue} so that they can be dispatched
* by the HTTP GET requests from the client.
- *
+ *
* @version $Revision$
*/
public class BlockingQueueTransport extends TransportSupport {
+ public static CountDownLatch finalizeLatch;
public static final long MAX_TIMEOUT = 30000L;
private BlockingQueue<Object> queue;
@@ -40,6 +42,12 @@ public class BlockingQueueTransport exte
this.queue = channel;
}
+ @Override
+ public void finalize()
+ {
+ finalizeLatch.countDown();
+ }
+
public BlockingQueue<Object> getQueue() {
return queue;
}
@@ -55,7 +63,7 @@ public class BlockingQueueTransport exte
}
}
-
+
public String getRemoteAddress() {
return "blockingQueue_" + queue.hashCode();
}
@@ -68,5 +76,5 @@ public class BlockingQueueTransport exte
public int getReceiveCounter() {
return 0;
- }
-}
+ }
+}
\ No newline at end of file
Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java?rev=1032539&r1=1032538&r2=1032539&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java (original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java Mon Nov 8 10:47:23 2010
@@ -21,7 +21,8 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
@@ -29,14 +30,15 @@ import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.WireFormatInfo;
-import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.transport.xstream.XStreamWireFormat;
import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,7 +56,7 @@ public class HttpTunnelServlet extends H
private TransportAcceptListener listener;
private HttpTransportFactory transportFactory;
private TextWireFormat wireFormat;
- private final Map<String, BlockingQueueTransport> clients = new HashMap<String, BlockingQueueTransport>();
+ private ConcurrentMap<String, BlockingQueueTransport> clients = new ConcurrentHashMap<String, BlockingQueueTransport>();
private final long requestTimeout = 30000L;
private HashMap transportOptions;
@@ -162,18 +164,16 @@ public class HttpTunnelServlet extends H
LOG.warn("No clientID header specified");
return null;
}
- synchronized (this) {
- BlockingQueueTransport answer = clients.get(clientID);
- if (answer == null) {
- LOG.warn("The clientID header specified is invalid. Client sesion has not yet been established for it: " + clientID);
- return null;
- }
- return answer;
+ BlockingQueueTransport answer = clients.get(clientID);
+ if (answer == null) {
+ LOG.warn("The clientID header specified is invalid. Client sesion has not yet been established for it: " + clientID);
+ return null;
}
+ return answer;
}
protected BlockingQueueTransport createTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
- String clientID = request.getHeader("clientID");
+ final String clientID = request.getHeader("clientID");
if (clientID == null) {
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
@@ -181,33 +181,60 @@ public class HttpTunnelServlet extends H
return null;
}
- synchronized (this) {
- BlockingQueueTransport answer = clients.get(clientID);
- if (answer != null) {
- response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '" + clientID + "' has already been established");
- LOG.warn("A session for clientID '" + clientID + "' has already been established");
- return null;
- }
+ // Optimistically create the client's transport; this transport may be thrown away if the client has already registered.
+ BlockingQueueTransport answer = createTransportChannel();
+
+ // Record the client's transport and ensure that it has not already registered; this is thread-safe and only allows one
+ // thread to register the client
+ if (clients.putIfAbsent(clientID, answer) != null) {
+ response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '" + clientID + "' has already been established");
+ LOG.warn("A session for clientID '" + clientID + "' has already been established");
+ return null;
+ }
- answer = createTransportChannel();
- clients.put(clientID, answer);
- Transport transport = answer;
+ // Ensure that the client's transport is cleaned up when no longer
+ // needed.
+ answer.addServiceListener(new ServiceListener() {
+ @Override
+ public void started(Service service) {
+ // Nothing to do.
+ }
+
+ @Override
+ public void stopped(Service service) {
+ clients.remove(clientID);
+ }
+ });
+
+ // Configure the transport with any additional properties or filters. Although the returned transport is not explicitly
+ // persisted, if it is a filter (e.g., InactivityMonitor) it will be linked to the client's transport as a TransportListener
+ // and not GC'd until the client's transport is disposed.
+ Transport transport = answer;
+ try {
+ // Preserve the transportOptions for future use by making a copy before applying (they are removed when applied).
+ HashMap options = new HashMap(transportOptions);
+ transport = transportFactory.serverConfigure(answer, null, options);
+ } catch (Exception e) {
+ IOExceptionSupport.create(e);
+ }
+
+ // Wait for the transport to be connected or disposed.
+ listener.onAccept(transport);
+ while (!transport.isConnected() && !transport.isDisposed()) {
try {
- HashMap options = new HashMap(transportOptions);
- transport = transportFactory.serverConfigure(answer, null, options);
- } catch (Exception e) {
- IOExceptionSupport.create(e);
- }
- listener.onAccept(transport);
- //wait for the transport to connect
- while (!answer.isConnected()) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException ignore) {
- }
+ Thread.sleep(100);
+ } catch (InterruptedException ignore) {
}
- return answer;
}
+
+ // Ensure that the transport was not prematurely disposed.
+ if (transport.isDisposed()) {
+ response.sendError(HttpServletResponse.SC_BAD_REQUEST, "The session for clientID '" + clientID + "' was prematurely disposed");
+ LOG.warn("The session for clientID '" + clientID + "' was prematurely disposed");
+ return null;
+ }
+
+ return answer;
}
protected BlockingQueueTransport createTransportChannel() {
Added: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/BlockingQueueTransportLeakTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/BlockingQueueTransportLeakTest.java?rev=1032539&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/BlockingQueueTransportLeakTest.java (added)
+++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/BlockingQueueTransportLeakTest.java Mon Nov 8 10:47:23 2010
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.http;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
+import org.apache.activemq.transport.http.BlockingQueueTransport;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import java.util.logging.Logger;
+
+/**
+ * This test demonstrates that HttpTunnelServlet leaks BlockingQueueTransport
+ * objects whenever a network bridge gets created and closed over HTTP.
+ * <p>
+ * <b>NOTE:</b> This test requires a modified version of
+ * BlockingQueueTransport; the modification is for the purpose of detecting when
+ * the object is removed from memory.
+ */
+public class BlockingQueueTransportLeakTest {
+ private static final long INACTIVITY_TIMEOUT = 5000;
+ private static final Logger LOG = Logger
+ .getLogger(BlockingQueueTransportLeakTest.class.getName());
+
+ // Change this URL to be an unused port. The inactivity timeout is required
+ // per AMQ-3016.
+ private static final String REMOTE_BROKER_HTTP_URL = "http://localhost:50000?transport.useInactivityMonitor=true&transport.initialDelayTime=0&transport.readCheckTime="
+ + INACTIVITY_TIMEOUT;
+ private BrokerService localBroker = new BrokerService();
+ private BrokerService remoteBroker = new BrokerService();
+
+ @Before
+ public void init() throws Exception {
+ localBroker.setBrokerName("localBroker");
+ localBroker.setPersistent(false);
+ localBroker.setUseJmx(false);
+ localBroker.setSchedulerSupport(false);
+
+ remoteBroker.setBrokerName("remoteBroker");
+ remoteBroker.setPersistent(false);
+ remoteBroker.setUseJmx(false);
+ remoteBroker.addConnector(REMOTE_BROKER_HTTP_URL);
+ remoteBroker.setSchedulerSupport(false);
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ try {
+ localBroker.stop();
+ } finally {
+ remoteBroker.stop();
+ }
+ }
+
+ /**
+ * This test involves a local broker which establishes a network bridge to a
+ * remote broker using the HTTP protocol. The local broker stops and the
+ * remote broker cleans up the bridge connection.
+ * <p>
+ * This test demonstrates how the BlockingQueueTransport, which is created
+ * by HttpTunnelServlet for each bridge, is held in memory indefinitely.
+ */
+ @Test
+ public void httpTest() throws Exception {
+ final long BRIDGE_TIMEOUT = 10000;
+ final long GC_TIMEOUT = 30000;
+
+ // Add a network connector to the local broker that will create a bridge
+ // to the remote broker.
+ DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector();
+ SimpleDiscoveryAgent da = new SimpleDiscoveryAgent();
+ da.setServices(REMOTE_BROKER_HTTP_URL);
+ dnc.setDiscoveryAgent(da);
+ localBroker.addNetworkConnector(dnc);
+
+ // Add an interceptor to the remote broker that signals when the bridge
+ // connection has been added and removed.
+ BrokerPlugin[] plugins = new BrokerPlugin[1];
+ plugins[0] = new BrokerPlugin() {
+ public Broker installPlugin(Broker broker) throws Exception {
+ return new BrokerFilter(broker) {
+ @Override
+ public void addConnection(ConnectionContext context,
+ ConnectionInfo info) throws Exception {
+ super.addConnection(context, info);
+ synchronized (remoteBroker) {
+ remoteBroker.notifyAll();
+ }
+ }
+
+ @Override
+ public void removeConnection(ConnectionContext context,
+ ConnectionInfo info, Throwable error)
+ throws Exception {
+ super.removeConnection(context, info, error);
+ synchronized (remoteBroker) {
+ remoteBroker.notifyAll();
+ }
+ }
+ };
+ }
+ };
+ remoteBroker.setPlugins(plugins);
+
+ // Start the remote broker so that it available for the local broker to
+ // connect to.
+ remoteBroker.start();
+
+ // Start and stop the local broker. Synchronization is used to ensure
+ // that the bridge is created before the local broker stops,
+ // and that the test waits for the remote broker to remove the bridge.
+ synchronized (remoteBroker) {
+ localBroker.start();
+ remoteBroker.wait(BRIDGE_TIMEOUT);
+
+ // Verify that the remote bridge connection has been created by the
+ // remote broker.
+ Assert.assertEquals(1,
+ remoteBroker.getRegionBroker().getClients().length);
+
+ localBroker.stop();
+ remoteBroker.wait(BRIDGE_TIMEOUT);
+
+ // Verify that the remote bridge connection has been closed by the
+ // remote broker.
+ Assert.assertEquals(0,
+ remoteBroker.getRegionBroker().getClients().length);
+ }
+
+ // Initialize the countdown latch with the expected number of remote
+ // bridge connections that should be garbage collected.
+ BlockingQueueTransport.finalizeLatch = new CountDownLatch(1);
+
+ // Run the GC and verify that the remote bridge connections are no
+ // longer in memory. Some GC's are slow to respond, so give a second
+ // prod if necessary.
+ // This assertion fails with finalizeLatch.getCount() returning 1.
+ LOG.info("Triggering first GC...");
+ System.gc();
+ BlockingQueueTransport.finalizeLatch.await(GC_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+
+ if (BlockingQueueTransport.finalizeLatch.getCount() != 0) {
+ LOG.info("Triggering second GC...");
+ System.gc();
+ BlockingQueueTransport.finalizeLatch.await(GC_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+ }
+ Assert.assertEquals(0, BlockingQueueTransport.finalizeLatch.getCount());
+ }
+}
\ No newline at end of file