You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2011/12/02 22:34:14 UTC
svn commit: r1209700 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/
test/java/org/apache/activemq/security/
test/resources/org/apache/activemq/security/
Author: rajdavies
Date: Fri Dec 2 21:34:14 2011
New Revision: 1209700
URL: http://svn.apache.org/viewvc?rev=1209700&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-3294 and https://issues.apache.org/jira/browse/AMQ-1928
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/DoSTest.java
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/dos-broker.xml
- copied, changed from r1209414, activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/jaas-broker.xml
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1209700&r1=1209699&r2=1209700&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Fri Dec 2 21:34:14 2011
@@ -1307,6 +1307,11 @@ public class ActiveMQConnection implemen
}catch(Throwable e) {
LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
}
+ //dispose of transport
+ Transport t = this.transport;
+ if (null != t){
+ ServiceSupport.dispose(t);
+ }
if(jmsEx !=null) {
throw jmsEx;
}
@@ -1515,6 +1520,12 @@ public class ActiveMQConnection implemen
started.set(false);
}
+ public void finalize() throws Throwable{
+ if (scheduler != null){
+ scheduler.stop();
+ }
+ }
+
/**
* Changes the associated username/password that is associated with this
* connection. If the connection has been used, you must called cleanup()
@@ -2229,10 +2240,17 @@ public class ActiveMQConnection implemen
protected void onControlCommand(ControlCommand command) {
String text = command.getCommand();
if (text != null) {
- if (text.equals("shutdown")) {
+ if ("shutdown".equals(text)) {
LOG.info("JVM told to shutdown");
System.exit(0);
}
+ if (false && "close".equals(text)){
+ LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
+ try {
+ close();
+ } catch (JMSException e) {
+ }
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1209700&r1=1209699&r2=1209700&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Dec 2 21:34:14 2011
@@ -35,47 +35,15 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.management.ObjectName;
import javax.transaction.xa.XAResource;
-
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.command.ConnectionControl;
-import org.apache.activemq.command.ConnectionError;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.ControlCommand;
-import org.apache.activemq.command.DataArrayResponse;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.FlushCommand;
-import org.apache.activemq.command.IntegerResponse;
-import org.apache.activemq.command.KeepAliveInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.ProducerAck;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.TransactionInfo;
-import org.apache.activemq.command.WireFormatInfo;
-import org.apache.activemq.network.*;
+import org.apache.activemq.command.*;
+import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.network.MBeanNetworkListener;
+import org.apache.activemq.network.NetworkBridgeConfiguration;
+import org.apache.activemq.network.NetworkBridgeFactory;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConnectionState;
@@ -93,7 +61,10 @@ import org.apache.activemq.transport.Res
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.util.*;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -150,15 +121,11 @@ public class TransportConnection impleme
private String duplexNetworkConnectorId;
/**
- * @param connector
- * @param transport
- * @param broker
- * @param taskRunnerFactory
- * - can be null if you want direct dispatch to the transport
- * else commands are sent async.
+ * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
+ * else commands are sent async.
*/
public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
- TaskRunnerFactory taskRunnerFactory) {
+ TaskRunnerFactory taskRunnerFactory) {
this.connector = connector;
this.broker = broker;
this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
@@ -238,7 +205,7 @@ public class TransportConnection impleme
}
private boolean expected(IOException e) {
- return isStomp() &&
+ return isStomp() &&
((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
}
@@ -251,8 +218,6 @@ public class TransportConnection impleme
* Calls the serviceException method in an async thread. Since handling a
* service exception closes a socket, we should not tie up broker threads
* since client sockets may hang or cause deadlocks.
- *
- * @param e
*/
public void serviceExceptionAsync(final IOException e) {
if (asyncException.compareAndSet(false, true)) {
@@ -321,8 +286,12 @@ public class TransportConnection impleme
SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
+ " command: " + command + ", exception: " + e, e);
}
+
if (responseRequired) {
+
response = new ExceptionResponse(e);
+ //still need to close this down - in case the peer of this transport doesn't play nice
+ delayedStop(2000);
} else {
serviceException(e);
}
@@ -619,7 +588,7 @@ public class TransportConnection impleme
// this down.
session.shutdown();
// Cascade the connection stop to the consumers and producers.
- for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) {
+ for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext(); ) {
ConsumerId consumerId = (ConsumerId) iter.next();
try {
processRemoveConsumer(consumerId, lastDeliveredSequenceId);
@@ -627,7 +596,7 @@ public class TransportConnection impleme
LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
}
}
- for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) {
+ for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext(); ) {
ProducerId producerId = (ProducerId) iter.next();
try {
processRemoveProducer(producerId);
@@ -679,7 +648,7 @@ public class TransportConnection impleme
}
registerConnectionState(info.getConnectionId(), state);
LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress() + ", info: " + info);
- this.faultTolerantConnection=info.isFaultTolerant();
+ this.faultTolerantConnection = info.isFaultTolerant();
// Setup the context.
String clientId = info.getClientId();
context = new ConnectionContext();
@@ -710,7 +679,7 @@ public class TransportConnection impleme
brokerConnectionStates.remove(info.getConnectionId());
}
unregisterConnectionState(info.getConnectionId());
- LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " + e.toString());
+ LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " + e.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Exception detail:", e);
}
@@ -735,7 +704,7 @@ public class TransportConnection impleme
// shutting down.
cs.shutdown();
// Cascade the connection stop to the sessions.
- for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
+ for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext(); ) {
SessionId sessionId = (SessionId) iter.next();
try {
processRemoveSession(sessionId, lastDeliveredSequenceId);
@@ -744,7 +713,7 @@ public class TransportConnection impleme
}
}
// Cascade the connection stop to temp destinations.
- for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext();) {
+ for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
DestinationInfo di = (DestinationInfo) iter.next();
try {
broker.removeDestination(cs.getContext(), di.getDestination(), 0);
@@ -898,7 +867,7 @@ public class TransportConnection impleme
public void start() throws Exception {
try {
synchronized (this) {
- starting = true;
+ starting = true;
if (taskRunnerFactory != null) {
taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
+ getRemoteAddress());
@@ -940,6 +909,25 @@ public class TransportConnection impleme
}
}
+ public void delayedStop(final int waitTime) {
+ if (waitTime > 0) {
+ try {
+ DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
+ public void run() {
+ try {
+ Thread.sleep(waitTime);
+ stopAsync();
+ } catch (InterruptedException e) {
+ }
+ }
+ }, "delayedStop:" + transport.getRemoteAddress());
+ } catch (Throwable t) {
+ LOG.warn("cannot create stopAsync :", t);
+ }
+ }
+ }
+
+
public void stopAsync() {
// If we're in the middle of starting then go no further... for now.
synchronized (this) {
@@ -957,7 +945,7 @@ public class TransportConnection impleme
cs.getContext().getStopping().set(true);
}
try {
- DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable(){
+ DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
public void run() {
serviceLock.writeLock().lock();
try {
@@ -1010,7 +998,7 @@ public class TransportConnection impleme
// Run the MessageDispatch callbacks so that message references get
// cleaned up.
synchronized (dispatchQueue) {
- for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
+ for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
Command command = iter.next();
if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) command;
@@ -1050,8 +1038,7 @@ public class TransportConnection impleme
}
/**
- * @param blockedCandidate
- * The blockedCandidate to set.
+ * @param blockedCandidate The blockedCandidate to set.
*/
public void setBlockedCandidate(boolean blockedCandidate) {
this.blockedCandidate = blockedCandidate;
@@ -1065,8 +1052,7 @@ public class TransportConnection impleme
}
/**
- * @param markedCandidate
- * The markedCandidate to set.
+ * @param markedCandidate The markedCandidate to set.
*/
public void setMarkedCandidate(boolean markedCandidate) {
this.markedCandidate = markedCandidate;
@@ -1077,8 +1063,7 @@ public class TransportConnection impleme
}
/**
- * @param slow
- * The slow to set.
+ * @param slow The slow to set.
*/
public void setSlow(boolean slow) {
this.slow = slow;
@@ -1122,16 +1107,14 @@ public class TransportConnection impleme
}
/**
- * @param blocked
- * The blocked to set.
+ * @param blocked The blocked to set.
*/
public void setBlocked(boolean blocked) {
this.blocked = blocked;
}
/**
- * @param connected
- * The connected to set.
+ * @param connected The connected to set.
*/
public void setConnected(boolean connected) {
this.connected = connected;
@@ -1145,8 +1128,7 @@ public class TransportConnection impleme
}
/**
- * @param active
- * The active to set.
+ * @param active The active to set.
*/
public void setActive(boolean active) {
this.active = active;
@@ -1164,7 +1146,7 @@ public class TransportConnection impleme
}
public boolean isFaultTolerantConnection() {
- return this.faultTolerantConnection;
+ return this.faultTolerantConnection;
}
protected synchronized void setStarting(boolean starting) {
@@ -1196,7 +1178,7 @@ public class TransportConnection impleme
masterBroker = new MasterBroker(parent, transport);
masterBroker.startProcessing();
}
- LOG.info((passive?"Passive":"Active")+" Slave Broker " + info.getBrokerName() + " is attached");
+ LOG.info((passive ? "Passive" : "Active") + " Slave Broker " + info.getBrokerName() + " is attached");
bService.slaveConnectionEstablished();
} else if (info.isNetworkConnection() && info.isDuplexConnection()) {
// so this TransportConnection is the rear end of a network bridge
@@ -1217,7 +1199,7 @@ public class TransportConnection impleme
String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
synchronized (connections) {
- for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
+ for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
TransportConnection c = iter.next();
if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ").");
@@ -1253,7 +1235,7 @@ public class TransportConnection impleme
LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started.");
return null;
} catch (Exception e) {
- LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId , e);
+ LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId, e);
return null;
}
}
@@ -1406,7 +1388,7 @@ public class TransportConnection impleme
}
protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
- TransportConnectionState state) {
+ TransportConnectionState state) {
TransportConnectionState cs = null;
if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
// swap implementations
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/DoSTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/DoSTest.java?rev=1209700&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/DoSTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/DoSTest.java Fri Dec 2 21:34:14 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.security;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The configuration is set to accept a maximum of 2 concurrent connections.
+ * As the exception is deliberately ignored, the ActiveMQConnection would continue to
+ * attempt to connect unless the connection's transport was also stopped on an error.
+ * <p/>
+ * As the maximum connections allowed is 2, no more connections would be allowed unless
+ * the transport was adequately destroyed on the broker side.
+ */
+
+public class DoSTest extends JmsTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DoSTest.class);
+
+ public void testInvalidAuthentication() throws Throwable {
+
+ for (int i = 0; i < 1000; i++) {
+
+ try {
+ // Bad password
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+ Connection c = factory.createConnection("bad", "krap");
+ c.start();
+ fail("Expected exception.");
+ } catch (JMSException e) {
+
+ }
+ }
+
+
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ return createBroker("org/apache/activemq/security/dos-broker.xml");
+ }
+
+ protected BrokerService createBroker(String uri) throws Exception {
+ LOG.info("Loading broker configuration from the classpath with URI: " + uri);
+ return BrokerFactory.createBroker(new URI("xbean:" + uri));
+ }
+
+}
Copied: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/dos-broker.xml (from r1209414, activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/jaas-broker.xml)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/dos-broker.xml?p2=activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/dos-broker.xml&p1=activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/jaas-broker.xml&r1=1209414&r2=1209700&rev=1209700&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/jaas-broker.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/security/dos-broker.xml Fri Dec 2 21:34:14 2011
@@ -28,7 +28,6 @@
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker useJmx="false" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true">
-
<plugins>
<!-- use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
<jaasAuthenticationPlugin configuration="activemq-domain" />
@@ -57,6 +56,9 @@
</map>
</authorizationPlugin>
</plugins>
+ <transportConnectors>
+ <transportConnector uri="tcp://localhost:61616?maximumConnections=2"/>
+ </transportConnectors>
</broker>
</beans>