You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2011/12/02 22:34:14 UTC

svn commit: r1209700 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ test/java/org/apache/activemq/security/ test/resources/org/apache/activemq/security/

Author: rajdavies
Date: Fri Dec  2 21:34:14 2011
New Revision: 1209700

URL: http://svn.apache.org/viewvc?rev=1209700&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-3294 and https://issues.apache.org/jira/browse/AMQ-1928

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/DoSTest.java
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/dos-broker.xml
      - copied, changed from r1209414, activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/jaas-broker.xml
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1209700&r1=1209699&r2=1209700&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Fri Dec  2 21:34:14 2011
@@ -1307,6 +1307,11 @@ public class ActiveMQConnection implemen
                         }catch(Throwable e) {
                             LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
                         }
+                        //dispose of transport
+                        Transport t = this.transport;
+                        if (null != t){
+                            ServiceSupport.dispose(t);
+                        }
                         if(jmsEx !=null) {
                             throw jmsEx;
                         }
@@ -1515,6 +1520,12 @@ public class ActiveMQConnection implemen
         started.set(false);
     }
 
+    public void finalize() throws Throwable{
+        if (scheduler != null){
+            scheduler.stop();
+        }
+    }
+
     /**
      * Changes the associated username/password that is associated with this
      * connection. If the connection has been used, you must called cleanup()
@@ -2229,10 +2240,17 @@ public class ActiveMQConnection implemen
     protected void onControlCommand(ControlCommand command) {
         String text = command.getCommand();
         if (text != null) {
-            if (text.equals("shutdown")) {
+            if ("shutdown".equals(text)) {
                 LOG.info("JVM told to shutdown");
                 System.exit(0);
             }
+            if (false && "close".equals(text)){
+                LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
+                try {
+                    close();
+                } catch (JMSException e) {
+                }
+            }
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1209700&r1=1209699&r2=1209700&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Dec  2 21:34:14 2011
@@ -35,47 +35,15 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import javax.management.ObjectName;
 import javax.transaction.xa.XAResource;
-
 import org.apache.activemq.broker.ft.MasterBroker;
 import org.apache.activemq.broker.region.ConnectionStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.command.ConnectionControl;
-import org.apache.activemq.command.ConnectionError;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.ControlCommand;
-import org.apache.activemq.command.DataArrayResponse;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.FlushCommand;
-import org.apache.activemq.command.IntegerResponse;
-import org.apache.activemq.command.KeepAliveInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.ProducerAck;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.TransactionInfo;
-import org.apache.activemq.command.WireFormatInfo;
-import org.apache.activemq.network.*;
+import org.apache.activemq.command.*;
+import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.network.MBeanNetworkListener;
+import org.apache.activemq.network.NetworkBridgeConfiguration;
+import org.apache.activemq.network.NetworkBridgeFactory;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.state.ConnectionState;
@@ -93,7 +61,10 @@ import org.apache.activemq.transport.Res
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.util.*;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -150,15 +121,11 @@ public class TransportConnection impleme
     private String duplexNetworkConnectorId;
 
     /**
-     * @param connector
-     * @param transport
-     * @param broker
-     * @param taskRunnerFactory
-     *            - can be null if you want direct dispatch to the transport
-     *            else commands are sent async.
+     * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
+     *                          else commands are sent async.
      */
     public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
-            TaskRunnerFactory taskRunnerFactory) {
+                               TaskRunnerFactory taskRunnerFactory) {
         this.connector = connector;
         this.broker = broker;
         this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
@@ -238,7 +205,7 @@ public class TransportConnection impleme
     }
 
     private boolean expected(IOException e) {
-        return  isStomp() &&
+        return isStomp() &&
                 ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
     }
 
@@ -251,8 +218,6 @@ public class TransportConnection impleme
      * Calls the serviceException method in an async thread. Since handling a
      * service exception closes a socket, we should not tie up broker threads
      * since client sockets may hang or cause deadlocks.
-     *
-     * @param e
      */
     public void serviceExceptionAsync(final IOException e) {
         if (asyncException.compareAndSet(false, true)) {
@@ -321,8 +286,12 @@ public class TransportConnection impleme
                 SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
                         + " command: " + command + ", exception: " + e, e);
             }
+
             if (responseRequired) {
+
                 response = new ExceptionResponse(e);
+                //still need to close this down - in case the peer of this transport doesn't play nice
+                delayedStop(2000);
             } else {
                 serviceException(e);
             }
@@ -619,7 +588,7 @@ public class TransportConnection impleme
         // this down.
         session.shutdown();
         // Cascade the connection stop to the consumers and producers.
-        for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) {
+        for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext(); ) {
             ConsumerId consumerId = (ConsumerId) iter.next();
             try {
                 processRemoveConsumer(consumerId, lastDeliveredSequenceId);
@@ -627,7 +596,7 @@ public class TransportConnection impleme
                 LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
             }
         }
-        for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) {
+        for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext(); ) {
             ProducerId producerId = (ProducerId) iter.next();
             try {
                 processRemoveProducer(producerId);
@@ -679,7 +648,7 @@ public class TransportConnection impleme
         }
         registerConnectionState(info.getConnectionId(), state);
         LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress() + ", info: " + info);
-        this.faultTolerantConnection=info.isFaultTolerant();
+        this.faultTolerantConnection = info.isFaultTolerant();
         // Setup the context.
         String clientId = info.getClientId();
         context = new ConnectionContext();
@@ -710,7 +679,7 @@ public class TransportConnection impleme
                 brokerConnectionStates.remove(info.getConnectionId());
             }
             unregisterConnectionState(info.getConnectionId());
-            LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " +  e.toString());
+            LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " + e.toString());
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Exception detail:", e);
             }
@@ -735,7 +704,7 @@ public class TransportConnection impleme
             // shutting down.
             cs.shutdown();
             // Cascade the connection stop to the sessions.
-            for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
+            for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext(); ) {
                 SessionId sessionId = (SessionId) iter.next();
                 try {
                     processRemoveSession(sessionId, lastDeliveredSequenceId);
@@ -744,7 +713,7 @@ public class TransportConnection impleme
                 }
             }
             // Cascade the connection stop to temp destinations.
-            for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext();) {
+            for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
                 DestinationInfo di = (DestinationInfo) iter.next();
                 try {
                     broker.removeDestination(cs.getContext(), di.getDestination(), 0);
@@ -898,7 +867,7 @@ public class TransportConnection impleme
     public void start() throws Exception {
         try {
             synchronized (this) {
-                starting  = true;
+                starting = true;
                 if (taskRunnerFactory != null) {
                     taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
                             + getRemoteAddress());
@@ -940,6 +909,25 @@ public class TransportConnection impleme
         }
     }
 
+    public void delayedStop(final int waitTime) {
+        if (waitTime > 0) {
+            try {
+                DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
+                    public void run() {
+                        try {
+                            Thread.sleep(waitTime);
+                            stopAsync();
+                        } catch (InterruptedException e) {
+                        }
+                    }
+                }, "delayedStop:" + transport.getRemoteAddress());
+            } catch (Throwable t) {
+                LOG.warn("cannot create stopAsync :", t);
+            }
+        }
+    }
+
+
     public void stopAsync() {
         // If we're in the middle of starting then go no further... for now.
         synchronized (this) {
@@ -957,7 +945,7 @@ public class TransportConnection impleme
                 cs.getContext().getStopping().set(true);
             }
             try {
-                DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable(){
+                DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
                     public void run() {
                         serviceLock.writeLock().lock();
                         try {
@@ -1010,7 +998,7 @@ public class TransportConnection impleme
         // Run the MessageDispatch callbacks so that message references get
         // cleaned up.
         synchronized (dispatchQueue) {
-            for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
+            for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
                 Command command = iter.next();
                 if (command.isMessageDispatch()) {
                     MessageDispatch md = (MessageDispatch) command;
@@ -1050,8 +1038,7 @@ public class TransportConnection impleme
     }
 
     /**
-     * @param blockedCandidate
-     *            The blockedCandidate to set.
+     * @param blockedCandidate The blockedCandidate to set.
      */
     public void setBlockedCandidate(boolean blockedCandidate) {
         this.blockedCandidate = blockedCandidate;
@@ -1065,8 +1052,7 @@ public class TransportConnection impleme
     }
 
     /**
-     * @param markedCandidate
-     *            The markedCandidate to set.
+     * @param markedCandidate The markedCandidate to set.
      */
     public void setMarkedCandidate(boolean markedCandidate) {
         this.markedCandidate = markedCandidate;
@@ -1077,8 +1063,7 @@ public class TransportConnection impleme
     }
 
     /**
-     * @param slow
-     *            The slow to set.
+     * @param slow The slow to set.
      */
     public void setSlow(boolean slow) {
         this.slow = slow;
@@ -1122,16 +1107,14 @@ public class TransportConnection impleme
     }
 
     /**
-     * @param blocked
-     *            The blocked to set.
+     * @param blocked The blocked to set.
      */
     public void setBlocked(boolean blocked) {
         this.blocked = blocked;
     }
 
     /**
-     * @param connected
-     *            The connected to set.
+     * @param connected The connected to set.
      */
     public void setConnected(boolean connected) {
         this.connected = connected;
@@ -1145,8 +1128,7 @@ public class TransportConnection impleme
     }
 
     /**
-     * @param active
-     *            The active to set.
+     * @param active The active to set.
      */
     public void setActive(boolean active) {
         this.active = active;
@@ -1164,7 +1146,7 @@ public class TransportConnection impleme
     }
 
     public boolean isFaultTolerantConnection() {
-       return this.faultTolerantConnection;
+        return this.faultTolerantConnection;
     }
 
     protected synchronized void setStarting(boolean starting) {
@@ -1196,7 +1178,7 @@ public class TransportConnection impleme
                 masterBroker = new MasterBroker(parent, transport);
                 masterBroker.startProcessing();
             }
-            LOG.info((passive?"Passive":"Active")+" Slave Broker " + info.getBrokerName() + " is attached");
+            LOG.info((passive ? "Passive" : "Active") + " Slave Broker " + info.getBrokerName() + " is attached");
             bService.slaveConnectionEstablished();
         } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
             // so this TransportConnection is the rear end of a network bridge
@@ -1217,7 +1199,7 @@ public class TransportConnection impleme
                 String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
                 CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
                 synchronized (connections) {
-                    for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
+                    for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
                         TransportConnection c = iter.next();
                         if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
                             LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ").");
@@ -1253,7 +1235,7 @@ public class TransportConnection impleme
                 LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started.");
                 return null;
             } catch (Exception e) {
-                LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId , e);
+                LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId, e);
                 return null;
             }
         }
@@ -1406,7 +1388,7 @@ public class TransportConnection impleme
     }
 
     protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
-            TransportConnectionState state) {
+                                                                            TransportConnectionState state) {
         TransportConnectionState cs = null;
         if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
             // swap implementations

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/DoSTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/DoSTest.java?rev=1209700&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/DoSTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/DoSTest.java Fri Dec  2 21:34:14 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.security;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The configuration is set to accept a maximum of 2 concurrent connections.
+ * As the exception is deliberately ignored, the ActiveMQConnection would continue to
+ * attempt to connect unless the connection's transport was also stopped on an error.
+ * <p/>
+ * As the maximum connections allowed is 2, no more connections would be allowed unless
+ * the transport was adequately destroyed on the broker side.
+ */
+
+public class DoSTest extends JmsTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DoSTest.class);
+
+    public void testInvalidAuthentication() throws Throwable {
+
+        for (int i = 0; i < 1000; i++) {
+
+            try {
+                // Bad password
+                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+                Connection c = factory.createConnection("bad", "krap");
+                c.start();
+                fail("Expected exception.");
+            } catch (JMSException e) {
+
+            }
+        }
+
+
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return createBroker("org/apache/activemq/security/dos-broker.xml");
+    }
+
+    protected BrokerService createBroker(String uri) throws Exception {
+        LOG.info("Loading broker configuration from the classpath with URI: " + uri);
+        return BrokerFactory.createBroker(new URI("xbean:" + uri));
+    }
+
+}

Copied: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/dos-broker.xml (from r1209414, activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/jaas-broker.xml)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/dos-broker.xml?p2=activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/dos-broker.xml&p1=activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/jaas-broker.xml&r1=1209414&r2=1209700&rev=1209700&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/jaas-broker.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/dos-broker.xml Fri Dec  2 21:34:14 2011
@@ -28,7 +28,6 @@
   <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
 
   <broker useJmx="false" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true">
-
     <plugins>
       <!--  use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
       <jaasAuthenticationPlugin configuration="activemq-domain" />
@@ -57,6 +56,9 @@
         </map>
       </authorizationPlugin>
     </plugins>
+      <transportConnectors>
+            <transportConnector uri="tcp://localhost:61616?maximumConnections=2"/>
+        </transportConnectors>
   </broker>
 
 </beans>