You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2012/04/20 11:51:55 UTC

svn commit: r1328287 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ test/java/org/apache/activemq/config/ test/resources/org/apache/activemq/config/sample-conf/

Author: rajdavies
Date: Fri Apr 20 09:51:55 2012
New Revision: 1328287

URL: http://svn.apache.org/viewvc?rev=1328287&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-3813

Added:
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/connector-properties.xml
      - copied, changed from r1327942, activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/memory-example.xml
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/config/ConfigTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=1328287&r1=1328286&r2=1328287&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java Fri Apr 20 09:51:55 2012
@@ -23,6 +23,7 @@ import javax.jms.JMSException;
 import org.apache.activemq.ActiveMQMessageTransformation;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
+
 public final class AdvisorySupport {
     public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
     public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX
@@ -64,7 +65,7 @@ public final class AdvisorySupport {
 
     public static final ActiveMQTopic ALL_DESTINATIONS_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
             TOPIC_ADVISORY_TOPIC.getPhysicalName() + "," + QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," +
-            TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
+                    TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
     public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
             TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
     private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
@@ -256,16 +257,16 @@ public final class AdvisorySupport {
 
     public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
         switch (destination.getDestinationType()) {
-        case ActiveMQDestination.QUEUE_TYPE:
-            return QUEUE_ADVISORY_TOPIC;
-        case ActiveMQDestination.TOPIC_TYPE:
-            return TOPIC_ADVISORY_TOPIC;
-        case ActiveMQDestination.TEMP_QUEUE_TYPE:
-            return TEMP_QUEUE_ADVISORY_TOPIC;
-        case ActiveMQDestination.TEMP_TOPIC_TYPE:
-            return TEMP_TOPIC_ADVISORY_TOPIC;
-        default:
-            throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
+            case ActiveMQDestination.QUEUE_TYPE:
+                return QUEUE_ADVISORY_TOPIC;
+            case ActiveMQDestination.TOPIC_TYPE:
+                return TOPIC_ADVISORY_TOPIC;
+            case ActiveMQDestination.TEMP_QUEUE_TYPE:
+                return TEMP_QUEUE_ADVISORY_TOPIC;
+            case ActiveMQDestination.TEMP_TOPIC_TYPE:
+                return TEMP_TOPIC_ADVISORY_TOPIC;
+            default:
+                throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
         }
     }
 
@@ -307,17 +308,20 @@ public final class AdvisorySupport {
     }
 
     public static boolean isAdvisoryTopic(ActiveMQDestination destination) {
-        if (destination.isComposite()) {
-            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
-            for (int i = 0; i < compositeDestinations.length; i++) {
-                if (isAdvisoryTopic(compositeDestinations[i])) {
-                    return true;
-                }
+        if (destination != null) {
+            if (destination.isComposite()) {
+                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+                for (int i = 0; i < compositeDestinations.length; i++) {
+                    if (isAdvisoryTopic(compositeDestinations[i])) {
+                        return true;
+                    }
+                }
+                return false;
+            } else {
+                return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
             }
-            return false;
-        } else {
-            return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
         }
+        return false;
     }
 
     public static boolean isConnectionAdvisoryTopic(Destination destination) throws JMSException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1328287&r1=1328286&r2=1328287&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Apr 20 09:51:55 2012
@@ -1265,6 +1265,24 @@ public class BrokerService implements Se
         }
     }
 
+    public TransportConnector getTransportConnectorByName(String name){
+        for (TransportConnector transportConnector:transportConnectors){
+           if (name.equals(transportConnector.getName())){
+               return transportConnector;
+           }
+        }
+        return null;
+    }
+
+    public TransportConnector getTransportConnectorByScheme(String scheme){
+        for (TransportConnector transportConnector:transportConnectors){
+            if (scheme.equals(transportConnector.getUri().getScheme())){
+                return transportConnector;
+            }
+        }
+        return null;
+    }
+
     public List<NetworkConnector> getNetworkConnectors() {
         return new ArrayList<NetworkConnector>(networkConnectors);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1328287&r1=1328286&r2=1328287&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Apr 20 09:51:55 2012
@@ -16,6 +16,27 @@
  */
 package org.apache.activemq.broker;
 
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.transaction.xa.XAResource;
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.ft.MasterBroker;
 import org.apache.activemq.broker.region.ConnectionStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
@@ -49,26 +70,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import javax.transaction.xa.XAResource;
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.SocketException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 public class TransportConnection implements Connection, Task, CommandVisitor {
     private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
     private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
@@ -298,7 +299,7 @@ public class TransportConnection impleme
                         + " command: " + command + ", exception: " + e, e);
             }
 
-            if(e instanceof java.lang.SecurityException){
+            if (e instanceof java.lang.SecurityException) {
                 // still need to close this down - in case the peer of this transport doesn't play nice
                 delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
             }
@@ -504,12 +505,19 @@ public class TransportConnection impleme
         }
         // Avoid replaying dup commands
         if (!ss.getProducerIds().contains(info.getProducerId())) {
+            ActiveMQDestination destination = info.getDestination();
+            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
+                if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
+                    throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
+                }
+            }
             broker.addProducer(cs.getContext(), info);
             try {
                 ss.addProducer(info);
             } catch (IllegalStateException e) {
                 broker.removeProducer(cs.getContext(), info);
             }
+
         }
         return null;
     }
@@ -547,6 +555,13 @@ public class TransportConnection impleme
         }
         // Avoid replaying dup commands
         if (!ss.getConsumerIds().contains(info.getConsumerId())) {
+            ActiveMQDestination destination = info.getDestination();
+            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
+                if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){
+                    throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection());
+                }
+            }
+
             broker.addConsumer(cs.getContext(), info);
             try {
                 ss.addConsumer(info);
@@ -554,6 +569,7 @@ public class TransportConnection impleme
             } catch (IllegalStateException e) {
                 broker.removeConsumer(cs.getContext(), info);
             }
+
         }
         return null;
     }
@@ -1280,7 +1296,7 @@ public class TransportConnection impleme
         return null;
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @SuppressWarnings({"unchecked", "rawtypes"})
     private HashMap<String, String> createMap(Properties properties) {
         return new HashMap(properties);
     }
@@ -1476,4 +1492,32 @@ public class TransportConnection impleme
     protected CountDownLatch getStopped() {
         return stopped;
     }
+
+    private int getProducerCount(ConnectionId connectionId) {
+        int result = 0;
+        TransportConnectionState cs = lookupConnectionState(connectionId);
+        if (cs != null) {
+            for (SessionId sessionId : cs.getSessionIds()) {
+                SessionState sessionState = cs.getSessionState(sessionId);
+                if (sessionState != null) {
+                    result += sessionState.getProducerIds().size();
+                }
+            }
+        }
+        return result;
+    }
+
+    private int getConsumerCount(ConnectionId connectionId) {
+        int result = 0;
+        TransportConnectionState cs = lookupConnectionState(connectionId);
+        if (cs != null) {
+            for (SessionId sessionId : cs.getSessionIds()) {
+                SessionState sessionState = cs.getSessionState(sessionId);
+                if (sessionState != null) {
+                    result += sessionState.getConsumerIds().size();
+                }
+            }
+        }
+        return result;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=1328287&r1=1328286&r2=1328287&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Fri Apr 20 09:51:55 2012
@@ -16,6 +16,16 @@
  */
 package org.apache.activemq.broker;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.StringTokenizer;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.regex.Pattern;
+
+import javax.management.ObjectName;
 import org.apache.activemq.broker.jmx.ManagedTransportConnector;
 import org.apache.activemq.broker.jmx.ManagementContext;
 import org.apache.activemq.broker.region.ConnectorStatistics;
@@ -35,16 +45,6 @@ import org.apache.activemq.util.ServiceS
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.StringTokenizer;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.regex.Pattern;
-
 /**
  * @org.apache.xbean.XBean
  *
@@ -74,6 +74,8 @@ public class TransportConnector implemen
     private boolean updateClusterClientsOnRemove = false;
     private String updateClusterFilter;
     private boolean auditNetworkProducers = false;
+    private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
+    private int maximumConsumersAllowedPerConnection  = Integer.MAX_VALUE;
 
     LinkedList<String> peerBrokers = new LinkedList<String>();
 
@@ -122,6 +124,8 @@ public class TransportConnector implemen
         rc.setUpdateClusterFilter(getUpdateClusterFilter());
         rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
         rc.setAuditNetworkProducers(isAuditNetworkProducers());
+        rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
+        rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
         return rc;
     }
 
@@ -599,4 +603,21 @@ public class TransportConnector implemen
     public void setAuditNetworkProducers(boolean auditNetworkProducers) {
         this.auditNetworkProducers = auditNetworkProducers;
     }
+
+    public int getMaximumProducersAllowedPerConnection() {
+        return maximumProducersAllowedPerConnection;
+    }
+
+    public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) {
+        this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection;
+    }
+
+    public int getMaximumConsumersAllowedPerConnection() {
+        return maximumConsumersAllowedPerConnection;
+    }
+
+    public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) {
+        this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/config/ConfigTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/config/ConfigTest.java?rev=1328287&r1=1328286&r2=1328287&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/config/ConfigTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/config/ConfigTest.java Fri Apr 20 09:51:55 2012
@@ -21,11 +21,18 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
 import javax.sql.DataSource;
-
 import junit.framework.TestCase;
-
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPolicy;
@@ -47,17 +54,17 @@ import org.apache.activemq.transport.tcp
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.wireformat.ObjectStreamWireFormat;
 import org.apache.activemq.xbean.BrokerFactoryBean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.derby.jdbc.EmbeddedDataSource;
 import org.jmock.Expectations;
 import org.jmock.Mockery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.core.io.ClassPathResource;
 import org.springframework.core.io.FileSystemResource;
 import org.springframework.core.io.Resource;
 
 /**
- * 
+ *
  */
 public class ConfigTest extends TestCase {
 
@@ -94,8 +101,8 @@ public class ConfigTest extends TestCase
         recursiveDelete(journalFile);
 
         File derbyFile = new File(DERBY_ROOT + "testJournaledJDBCConfig/derbydb"); // Default
-                                                                                    // derby
-                                                                                    // name
+        // derby
+        // name
         recursiveDelete(derbyFile);
 
         BrokerService broker;
@@ -113,7 +120,7 @@ public class ConfigTest extends TestCase
             // System.out.print("Checking persistence adapter factory
             // settings... ");
             broker.getPersistenceAdapter();
-            
+
             assertTrue(broker.getSystemUsage().getStoreUsage().getStore() instanceof JournalPersistenceAdapter);
 
             LOG.info("Success");
@@ -132,10 +139,10 @@ public class ConfigTest extends TestCase
         // ");     
         File journalFile = new File(JOURNAL_ROOT + "testJDBCConfig/journal");
         recursiveDelete(journalFile);
-        
+
         File derbyFile = new File(DERBY_ROOT + "testJDBCConfig/derbydb"); // Default
-                                                                            // derby
-                                                                            // name
+        // derby
+        // name
         recursiveDelete(derbyFile);
 
         BrokerService broker;
@@ -146,9 +153,9 @@ public class ConfigTest extends TestCase
             PersistenceAdapter adapter = broker.getPersistenceAdapter();
 
             assertTrue("Should have created a jdbc persistence adapter", adapter instanceof JDBCPersistenceAdapter);
-            assertEquals("JDBC Adapter Config Error (cleanupPeriod)", 60000, ((JDBCPersistenceAdapter)adapter).getCleanupPeriod());
-            assertTrue("Should have created an EmbeddedDataSource", ((JDBCPersistenceAdapter)adapter).getDataSource() instanceof EmbeddedDataSource);
-            assertTrue("Should have created a DefaultWireFormat", ((JDBCPersistenceAdapter)adapter).getWireFormat() instanceof ObjectStreamWireFormat);
+            assertEquals("JDBC Adapter Config Error (cleanupPeriod)", 60000, ((JDBCPersistenceAdapter) adapter).getCleanupPeriod());
+            assertTrue("Should have created an EmbeddedDataSource", ((JDBCPersistenceAdapter) adapter).getDataSource() instanceof EmbeddedDataSource);
+            assertTrue("Should have created a DefaultWireFormat", ((JDBCPersistenceAdapter) adapter).getWireFormat() instanceof ObjectStreamWireFormat);
 
             LOG.info("Success");
         } finally {
@@ -159,7 +166,7 @@ public class ConfigTest extends TestCase
     }
 
     public void testJdbcLockConfigOverride() throws Exception {
-      
+
         JDBCPersistenceAdapter adapter = new JDBCPersistenceAdapter();
         Mockery context = new Mockery();
         final DataSource dataSource = context.mock(DataSource.class);
@@ -168,24 +175,27 @@ public class ConfigTest extends TestCase
         final ResultSet result = context.mock(ResultSet.class);
         adapter.setDataSource(dataSource);
         adapter.setCreateTablesOnStartup(false);
-        
+
         context.checking(new Expectations() {{
-            allowing (dataSource).getConnection(); will (returnValue(connection));
-            allowing (connection).getMetaData(); will (returnValue(metadata));
-            allowing (connection);
-            allowing (metadata).getDriverName(); will (returnValue("Microsoft_SQL_Server_2005_jdbc_driver"));
-            allowing (result).next(); will (returnValue(true));
+            allowing(dataSource).getConnection();
+            will(returnValue(connection));
+            allowing(connection).getMetaData();
+            will(returnValue(metadata));
+            allowing(connection);
+            allowing(metadata).getDriverName();
+            will(returnValue("Microsoft_SQL_Server_2005_jdbc_driver"));
+            allowing(result).next();
+            will(returnValue(true));
         }});
-        
+
         adapter.start();
         assertTrue("has the locker override", adapter.getDatabaseLocker() instanceof TransactDatabaseLocker);
         adapter.stop();
     }
 
-    
 
     public void testJdbcLockConfigDefault() throws Exception {
-      
+
         JDBCPersistenceAdapter adapter = new JDBCPersistenceAdapter();
         Mockery context = new Mockery();
         final DataSource dataSource = context.mock(DataSource.class);
@@ -194,15 +204,19 @@ public class ConfigTest extends TestCase
         final ResultSet result = context.mock(ResultSet.class);
         adapter.setDataSource(dataSource);
         adapter.setCreateTablesOnStartup(false);
-        
+
         context.checking(new Expectations() {{
-            allowing (dataSource).getConnection(); will (returnValue(connection));
-            allowing (connection).getMetaData(); will (returnValue(metadata));
-            allowing (connection);
-            allowing (metadata).getDriverName(); will (returnValue("Some_Unknown_driver"));
-            allowing (result).next(); will (returnValue(true));
+            allowing(dataSource).getConnection();
+            will(returnValue(connection));
+            allowing(connection).getMetaData();
+            will(returnValue(metadata));
+            allowing(connection);
+            allowing(metadata).getDriverName();
+            will(returnValue("Some_Unknown_driver"));
+            allowing(result).next();
+            will(returnValue(true));
         }});
-        
+
         adapter.start();
         assertEquals("has the default locker", adapter.getDatabaseLocker().getClass(), DefaultDatabaseLocker.class);
         adapter.stop();
@@ -245,9 +259,9 @@ public class ConfigTest extends TestCase
             // System.out.print("Checking transport connectors... ");
             List connectors = broker.getTransportConnectors();
             assertTrue("Should have created at least 3 connectors", connectors.size() >= 3);
-            assertTrue("1st connector should be TcpTransportServer", ((TransportConnector)connectors.get(0)).getServer() instanceof TcpTransportServer);
-            assertTrue("2nd connector should be TcpTransportServer", ((TransportConnector)connectors.get(1)).getServer() instanceof TcpTransportServer);
-            assertTrue("3rd connector should be TcpTransportServer", ((TransportConnector)connectors.get(2)).getServer() instanceof TcpTransportServer);
+            assertTrue("1st connector should be TcpTransportServer", ((TransportConnector) connectors.get(0)).getServer() instanceof TcpTransportServer);
+            assertTrue("2nd connector should be TcpTransportServer", ((TransportConnector) connectors.get(1)).getServer() instanceof TcpTransportServer);
+            assertTrue("3rd connector should be TcpTransportServer", ((TransportConnector) connectors.get(2)).getServer() instanceof TcpTransportServer);
 
             // Check network connectors
             // System.out.print("Checking network connectors... ");
@@ -260,15 +274,15 @@ public class ConfigTest extends TestCase
 
             dest = new ActiveMQTopic("Topic.SimpleDispatch");
             assertTrue("Should have a simple dispatch policy for " + dest.getTopicName(),
-                       broker.getDestinationPolicy().getEntryFor(dest).getDispatchPolicy() instanceof SimpleDispatchPolicy);
+                    broker.getDestinationPolicy().getEntryFor(dest).getDispatchPolicy() instanceof SimpleDispatchPolicy);
 
             dest = new ActiveMQTopic("Topic.RoundRobinDispatch");
             assertTrue("Should have a round robin dispatch policy for " + dest.getTopicName(),
-                       broker.getDestinationPolicy().getEntryFor(dest).getDispatchPolicy() instanceof RoundRobinDispatchPolicy);
+                    broker.getDestinationPolicy().getEntryFor(dest).getDispatchPolicy() instanceof RoundRobinDispatchPolicy);
 
             dest = new ActiveMQTopic("Topic.StrictOrderDispatch");
             assertTrue("Should have a strict order dispatch policy for " + dest.getTopicName(),
-                       broker.getDestinationPolicy().getEntryFor(dest).getDispatchPolicy() instanceof StrictOrderDispatchPolicy);
+                    broker.getDestinationPolicy().getEntryFor(dest).getDispatchPolicy() instanceof StrictOrderDispatchPolicy);
             LOG.info("Success");
 
             // Check subscription policy configuration
@@ -278,8 +292,8 @@ public class ConfigTest extends TestCase
             dest = new ActiveMQTopic("Topic.FixedSizedSubs");
             subsPolicy = broker.getDestinationPolicy().getEntryFor(dest).getSubscriptionRecoveryPolicy();
             assertTrue("Should have a fixed sized subscription recovery policy for " + dest.getTopicName(), subsPolicy instanceof FixedSizedSubscriptionRecoveryPolicy);
-            assertEquals("FixedSizedSubsPolicy Config Error (maximumSize)", 2000000, ((FixedSizedSubscriptionRecoveryPolicy)subsPolicy).getMaximumSize());
-            assertEquals("FixedSizedSubsPolicy Config Error (useSharedBuffer)", false, ((FixedSizedSubscriptionRecoveryPolicy)subsPolicy).isUseSharedBuffer());
+            assertEquals("FixedSizedSubsPolicy Config Error (maximumSize)", 2000000, ((FixedSizedSubscriptionRecoveryPolicy) subsPolicy).getMaximumSize());
+            assertEquals("FixedSizedSubsPolicy Config Error (useSharedBuffer)", false, ((FixedSizedSubscriptionRecoveryPolicy) subsPolicy).isUseSharedBuffer());
 
             dest = new ActiveMQTopic("Topic.LastImageSubs");
             subsPolicy = broker.getDestinationPolicy().getEntryFor(dest).getSubscriptionRecoveryPolicy();
@@ -292,7 +306,7 @@ public class ConfigTest extends TestCase
             dest = new ActiveMQTopic("Topic.TimedSubs");
             subsPolicy = broker.getDestinationPolicy().getEntryFor(dest).getSubscriptionRecoveryPolicy();
             assertTrue("Should have a timed subscription recovery policy for " + dest.getTopicName(), subsPolicy instanceof TimedSubscriptionRecoveryPolicy);
-            assertEquals("TimedSubsPolicy Config Error (recoverDuration)", 25000, ((TimedSubscriptionRecoveryPolicy)subsPolicy).getRecoverDuration());
+            assertEquals("TimedSubsPolicy Config Error (recoverDuration)", 25000, ((TimedSubscriptionRecoveryPolicy) subsPolicy).getRecoverDuration());
             LOG.info("Success");
 
             // Check usage manager
@@ -304,10 +318,10 @@ public class ConfigTest extends TestCase
             assertEquals("SystemUsage Config Error (TempUsage.limit)", 1024 * 1024 * 100, systemUsage.getTempUsage().getLimit());
             assertEquals("SystemUsage Config Error (StoreUsage.limit)", 1024 * 1024 * 1024, systemUsage.getStoreUsage().getLimit());
             assertEquals("SystemUsage Config Error (StoreUsage.name)", "foo", systemUsage.getStoreUsage().getName());
-            
+
             assertNotNull(systemUsage.getStoreUsage().getStore());
             assertTrue(systemUsage.getStoreUsage().getStore() instanceof MemoryPersistenceAdapter);
-                        
+
             LOG.info("Success");
 
         } finally {
@@ -379,6 +393,66 @@ public class ConfigTest extends TestCase
 
     }
 
+    public void testConnectorConfig() throws Exception {
+        // System.out.print("Checking memory persistence adapter
+        // configuration... ");
+
+        File journalFile = new File(JOURNAL_ROOT + "testMemoryConfig");
+        recursiveDelete(journalFile);
+
+        File derbyFile = new File(DERBY_ROOT + "testMemoryConfig");
+        recursiveDelete(derbyFile);
+
+        final int MAX_PRODUCERS = 5;
+        final int MAX_CONSUMERS = 10;
+
+        BrokerService broker = createBroker(new FileSystemResource(CONF_ROOT + "connector-properties.xml"));
+        broker.start();
+        try {
+
+            assertEquals(broker.getTransportConnectorByScheme("tcp").getMaximumProducersAllowedPerConnection(), MAX_PRODUCERS);
+            assertEquals(broker.getTransportConnectorByScheme("tcp").getMaximumConsumersAllowedPerConnection(), MAX_CONSUMERS);
+
+            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61631");
+            javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
+            final CountDownLatch latch = new CountDownLatch(1);
+            connection.setExceptionListener(new ExceptionListener() {
+                public void onException(JMSException e) {
+                    if (e.getCause() instanceof IllegalStateException) {
+                        latch.countDown();
+                    }
+                }
+            });
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Topic topic = session.createTopic("test.foo");
+
+            for (int i = 0; i < (MAX_PRODUCERS + 1); i++) {
+                MessageProducer messageProducer = session.createProducer(topic);
+            }
+
+            latch.await(5, TimeUnit.SECONDS);
+            if (latch.getCount() > 0) {
+                fail("Should have got an exception");
+            }
+            try {
+                for (int i = 0; i < (MAX_CONSUMERS + 1); i++) {
+                    MessageConsumer consumer = session.createConsumer(topic);
+                }
+                fail("Should have caught an exception");
+            } catch (JMSException e) {
+            }
+
+
+            LOG.info("Success");
+        } finally {
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+
+    }
+
     public void testXmlConfigHelper() throws Exception {
         BrokerService broker;
 

Copied: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/connector-properties.xml (from r1327942, activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/memory-example.xml)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/connector-properties.xml?p2=activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/connector-properties.xml&p1=activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/memory-example.xml&r1=1327942&r2=1328287&rev=1328287&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/memory-example.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/connector-properties.xml Fri Apr 20 09:51:55 2012
@@ -23,14 +23,14 @@
   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
 
-    <amq:broker brokerName="brokerMemoryConfigTest" persistent="false" useShutdownHook="false" deleteAllMessagesOnStartup="true">
+    <amq:broker brokerName="brokerConnectorTest" persistent="false" useShutdownHook="false" deleteAllMessagesOnStartup="true">
 
         <amq:persistenceAdapter>
-            <amq:memoryPersistenceAdapter createTransactionStore = "true"/>
+            <amq:memoryPersistenceAdapter createTransactionStore = "false"/>
         </amq:persistenceAdapter>
 
         <amq:transportConnectors>
-            <amq:transportConnector uri="tcp://localhost:61635"/>
+                <amq:transportConnector uri="tcp://localhost:61631" maximumProducersAllowedPerConnection="5"  maximumConsumersAllowedPerConnection="10"/>
         </amq:transportConnectors>
 
     </amq:broker>