You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2012/04/20 11:51:55 UTC
svn commit: r1328287 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/advisory/
main/java/org/apache/activemq/broker/ test/java/org/apache/activemq/config/
test/resources/org/apache/activemq/config/sample-conf/
Author: rajdavies
Date: Fri Apr 20 09:51:55 2012
New Revision: 1328287
URL: http://svn.apache.org/viewvc?rev=1328287&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-3813
Added:
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/connector-properties.xml
- copied, changed from r1327942, activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/memory-example.xml
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/config/ConfigTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=1328287&r1=1328286&r2=1328287&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java Fri Apr 20 09:51:55 2012
@@ -23,6 +23,7 @@ import javax.jms.JMSException;
import org.apache.activemq.ActiveMQMessageTransformation;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
+
public final class AdvisorySupport {
public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX
@@ -64,7 +65,7 @@ public final class AdvisorySupport {
public static final ActiveMQTopic ALL_DESTINATIONS_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
TOPIC_ADVISORY_TOPIC.getPhysicalName() + "," + QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," +
- TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
+ TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
@@ -256,16 +257,16 @@ public final class AdvisorySupport {
public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
switch (destination.getDestinationType()) {
- case ActiveMQDestination.QUEUE_TYPE:
- return QUEUE_ADVISORY_TOPIC;
- case ActiveMQDestination.TOPIC_TYPE:
- return TOPIC_ADVISORY_TOPIC;
- case ActiveMQDestination.TEMP_QUEUE_TYPE:
- return TEMP_QUEUE_ADVISORY_TOPIC;
- case ActiveMQDestination.TEMP_TOPIC_TYPE:
- return TEMP_TOPIC_ADVISORY_TOPIC;
- default:
- throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
+ case ActiveMQDestination.QUEUE_TYPE:
+ return QUEUE_ADVISORY_TOPIC;
+ case ActiveMQDestination.TOPIC_TYPE:
+ return TOPIC_ADVISORY_TOPIC;
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ return TEMP_QUEUE_ADVISORY_TOPIC;
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ return TEMP_TOPIC_ADVISORY_TOPIC;
+ default:
+ throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
}
}
@@ -307,17 +308,20 @@ public final class AdvisorySupport {
}
public static boolean isAdvisoryTopic(ActiveMQDestination destination) {
- if (destination.isComposite()) {
- ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
- for (int i = 0; i < compositeDestinations.length; i++) {
- if (isAdvisoryTopic(compositeDestinations[i])) {
- return true;
- }
+ if (destination != null) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
}
- return false;
- } else {
- return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
}
+ return false;
}
public static boolean isConnectionAdvisoryTopic(Destination destination) throws JMSException {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1328287&r1=1328286&r2=1328287&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Apr 20 09:51:55 2012
@@ -1265,6 +1265,24 @@ public class BrokerService implements Se
}
}
+ public TransportConnector getTransportConnectorByName(String name){
+ for (TransportConnector transportConnector:transportConnectors){
+ if (name.equals(transportConnector.getName())){
+ return transportConnector;
+ }
+ }
+ return null;
+ }
+
+ public TransportConnector getTransportConnectorByScheme(String scheme){
+ for (TransportConnector transportConnector:transportConnectors){
+ if (scheme.equals(transportConnector.getUri().getScheme())){
+ return transportConnector;
+ }
+ }
+ return null;
+ }
+
public List<NetworkConnector> getNetworkConnectors() {
return new ArrayList<NetworkConnector>(networkConnectors);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1328287&r1=1328286&r2=1328287&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Apr 20 09:51:55 2012
@@ -16,6 +16,27 @@
*/
package org.apache.activemq.broker;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.transaction.xa.XAResource;
+import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
@@ -49,26 +70,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import javax.transaction.xa.XAResource;
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.SocketException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
public class TransportConnection implements Connection, Task, CommandVisitor {
private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
@@ -298,7 +299,7 @@ public class TransportConnection impleme
+ " command: " + command + ", exception: " + e, e);
}
- if(e instanceof java.lang.SecurityException){
+ if (e instanceof java.lang.SecurityException) {
// still need to close this down - in case the peer of this transport doesn't play nice
delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
}
@@ -504,12 +505,19 @@ public class TransportConnection impleme
}
// Avoid replaying dup commands
if (!ss.getProducerIds().contains(info.getProducerId())) {
+ ActiveMQDestination destination = info.getDestination();
+ if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
+ if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
+ throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
+ }
+ }
broker.addProducer(cs.getContext(), info);
try {
ss.addProducer(info);
} catch (IllegalStateException e) {
broker.removeProducer(cs.getContext(), info);
}
+
}
return null;
}
@@ -547,6 +555,13 @@ public class TransportConnection impleme
}
// Avoid replaying dup commands
if (!ss.getConsumerIds().contains(info.getConsumerId())) {
+ ActiveMQDestination destination = info.getDestination();
+ if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
+ if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){
+ throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection());
+ }
+ }
+
broker.addConsumer(cs.getContext(), info);
try {
ss.addConsumer(info);
@@ -554,6 +569,7 @@ public class TransportConnection impleme
} catch (IllegalStateException e) {
broker.removeConsumer(cs.getContext(), info);
}
+
}
return null;
}
@@ -1280,7 +1296,7 @@ public class TransportConnection impleme
return null;
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings({"unchecked", "rawtypes"})
private HashMap<String, String> createMap(Properties properties) {
return new HashMap(properties);
}
@@ -1476,4 +1492,32 @@ public class TransportConnection impleme
protected CountDownLatch getStopped() {
return stopped;
}
+
+ private int getProducerCount(ConnectionId connectionId) {
+ int result = 0;
+ TransportConnectionState cs = lookupConnectionState(connectionId);
+ if (cs != null) {
+ for (SessionId sessionId : cs.getSessionIds()) {
+ SessionState sessionState = cs.getSessionState(sessionId);
+ if (sessionState != null) {
+ result += sessionState.getProducerIds().size();
+ }
+ }
+ }
+ return result;
+ }
+
+ private int getConsumerCount(ConnectionId connectionId) {
+ int result = 0;
+ TransportConnectionState cs = lookupConnectionState(connectionId);
+ if (cs != null) {
+ for (SessionId sessionId : cs.getSessionIds()) {
+ SessionState sessionState = cs.getSessionState(sessionId);
+ if (sessionState != null) {
+ result += sessionState.getConsumerIds().size();
+ }
+ }
+ }
+ return result;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=1328287&r1=1328286&r2=1328287&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Fri Apr 20 09:51:55 2012
@@ -16,6 +16,16 @@
*/
package org.apache.activemq.broker;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.StringTokenizer;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.regex.Pattern;
+
+import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.ManagedTransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.ConnectorStatistics;
@@ -35,16 +45,6 @@ import org.apache.activemq.util.ServiceS
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.StringTokenizer;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.regex.Pattern;
-
/**
* @org.apache.xbean.XBean
*
@@ -74,6 +74,8 @@ public class TransportConnector implemen
private boolean updateClusterClientsOnRemove = false;
private String updateClusterFilter;
private boolean auditNetworkProducers = false;
+ private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
+ private int maximumConsumersAllowedPerConnection = Integer.MAX_VALUE;
LinkedList<String> peerBrokers = new LinkedList<String>();
@@ -122,6 +124,8 @@ public class TransportConnector implemen
rc.setUpdateClusterFilter(getUpdateClusterFilter());
rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
rc.setAuditNetworkProducers(isAuditNetworkProducers());
+ rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
+ rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
return rc;
}
@@ -599,4 +603,21 @@ public class TransportConnector implemen
public void setAuditNetworkProducers(boolean auditNetworkProducers) {
this.auditNetworkProducers = auditNetworkProducers;
}
+
+ public int getMaximumProducersAllowedPerConnection() {
+ return maximumProducersAllowedPerConnection;
+ }
+
+ public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) {
+ this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection;
+ }
+
+ public int getMaximumConsumersAllowedPerConnection() {
+ return maximumConsumersAllowedPerConnection;
+ }
+
+ public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) {
+ this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/config/ConfigTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/config/ConfigTest.java?rev=1328287&r1=1328286&r2=1328287&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/config/ConfigTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/config/ConfigTest.java Fri Apr 20 09:51:55 2012
@@ -21,11 +21,18 @@ import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
import javax.sql.DataSource;
-
import junit.framework.TestCase;
-
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPolicy;
@@ -47,17 +54,17 @@ import org.apache.activemq.transport.tcp
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.wireformat.ObjectStreamWireFormat;
import org.apache.activemq.xbean.BrokerFactoryBean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.jmock.Expectations;
import org.jmock.Mockery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
/**
- *
+ *
*/
public class ConfigTest extends TestCase {
@@ -94,8 +101,8 @@ public class ConfigTest extends TestCase
recursiveDelete(journalFile);
File derbyFile = new File(DERBY_ROOT + "testJournaledJDBCConfig/derbydb"); // Default
- // derby
- // name
+ // derby
+ // name
recursiveDelete(derbyFile);
BrokerService broker;
@@ -113,7 +120,7 @@ public class ConfigTest extends TestCase
// System.out.print("Checking persistence adapter factory
// settings... ");
broker.getPersistenceAdapter();
-
+
assertTrue(broker.getSystemUsage().getStoreUsage().getStore() instanceof JournalPersistenceAdapter);
LOG.info("Success");
@@ -132,10 +139,10 @@ public class ConfigTest extends TestCase
// ");
File journalFile = new File(JOURNAL_ROOT + "testJDBCConfig/journal");
recursiveDelete(journalFile);
-
+
File derbyFile = new File(DERBY_ROOT + "testJDBCConfig/derbydb"); // Default
- // derby
- // name
+ // derby
+ // name
recursiveDelete(derbyFile);
BrokerService broker;
@@ -146,9 +153,9 @@ public class ConfigTest extends TestCase
PersistenceAdapter adapter = broker.getPersistenceAdapter();
assertTrue("Should have created a jdbc persistence adapter", adapter instanceof JDBCPersistenceAdapter);
- assertEquals("JDBC Adapter Config Error (cleanupPeriod)", 60000, ((JDBCPersistenceAdapter)adapter).getCleanupPeriod());
- assertTrue("Should have created an EmbeddedDataSource", ((JDBCPersistenceAdapter)adapter).getDataSource() instanceof EmbeddedDataSource);
- assertTrue("Should have created a DefaultWireFormat", ((JDBCPersistenceAdapter)adapter).getWireFormat() instanceof ObjectStreamWireFormat);
+ assertEquals("JDBC Adapter Config Error (cleanupPeriod)", 60000, ((JDBCPersistenceAdapter) adapter).getCleanupPeriod());
+ assertTrue("Should have created an EmbeddedDataSource", ((JDBCPersistenceAdapter) adapter).getDataSource() instanceof EmbeddedDataSource);
+ assertTrue("Should have created a DefaultWireFormat", ((JDBCPersistenceAdapter) adapter).getWireFormat() instanceof ObjectStreamWireFormat);
LOG.info("Success");
} finally {
@@ -159,7 +166,7 @@ public class ConfigTest extends TestCase
}
public void testJdbcLockConfigOverride() throws Exception {
-
+
JDBCPersistenceAdapter adapter = new JDBCPersistenceAdapter();
Mockery context = new Mockery();
final DataSource dataSource = context.mock(DataSource.class);
@@ -168,24 +175,27 @@ public class ConfigTest extends TestCase
final ResultSet result = context.mock(ResultSet.class);
adapter.setDataSource(dataSource);
adapter.setCreateTablesOnStartup(false);
-
+
context.checking(new Expectations() {{
- allowing (dataSource).getConnection(); will (returnValue(connection));
- allowing (connection).getMetaData(); will (returnValue(metadata));
- allowing (connection);
- allowing (metadata).getDriverName(); will (returnValue("Microsoft_SQL_Server_2005_jdbc_driver"));
- allowing (result).next(); will (returnValue(true));
+ allowing(dataSource).getConnection();
+ will(returnValue(connection));
+ allowing(connection).getMetaData();
+ will(returnValue(metadata));
+ allowing(connection);
+ allowing(metadata).getDriverName();
+ will(returnValue("Microsoft_SQL_Server_2005_jdbc_driver"));
+ allowing(result).next();
+ will(returnValue(true));
}});
-
+
adapter.start();
assertTrue("has the locker override", adapter.getDatabaseLocker() instanceof TransactDatabaseLocker);
adapter.stop();
}
-
public void testJdbcLockConfigDefault() throws Exception {
-
+
JDBCPersistenceAdapter adapter = new JDBCPersistenceAdapter();
Mockery context = new Mockery();
final DataSource dataSource = context.mock(DataSource.class);
@@ -194,15 +204,19 @@ public class ConfigTest extends TestCase
final ResultSet result = context.mock(ResultSet.class);
adapter.setDataSource(dataSource);
adapter.setCreateTablesOnStartup(false);
-
+
context.checking(new Expectations() {{
- allowing (dataSource).getConnection(); will (returnValue(connection));
- allowing (connection).getMetaData(); will (returnValue(metadata));
- allowing (connection);
- allowing (metadata).getDriverName(); will (returnValue("Some_Unknown_driver"));
- allowing (result).next(); will (returnValue(true));
+ allowing(dataSource).getConnection();
+ will(returnValue(connection));
+ allowing(connection).getMetaData();
+ will(returnValue(metadata));
+ allowing(connection);
+ allowing(metadata).getDriverName();
+ will(returnValue("Some_Unknown_driver"));
+ allowing(result).next();
+ will(returnValue(true));
}});
-
+
adapter.start();
assertEquals("has the default locker", adapter.getDatabaseLocker().getClass(), DefaultDatabaseLocker.class);
adapter.stop();
@@ -245,9 +259,9 @@ public class ConfigTest extends TestCase
// System.out.print("Checking transport connectors... ");
List connectors = broker.getTransportConnectors();
assertTrue("Should have created at least 3 connectors", connectors.size() >= 3);
- assertTrue("1st connector should be TcpTransportServer", ((TransportConnector)connectors.get(0)).getServer() instanceof TcpTransportServer);
- assertTrue("2nd connector should be TcpTransportServer", ((TransportConnector)connectors.get(1)).getServer() instanceof TcpTransportServer);
- assertTrue("3rd connector should be TcpTransportServer", ((TransportConnector)connectors.get(2)).getServer() instanceof TcpTransportServer);
+ assertTrue("1st connector should be TcpTransportServer", ((TransportConnector) connectors.get(0)).getServer() instanceof TcpTransportServer);
+ assertTrue("2nd connector should be TcpTransportServer", ((TransportConnector) connectors.get(1)).getServer() instanceof TcpTransportServer);
+ assertTrue("3rd connector should be TcpTransportServer", ((TransportConnector) connectors.get(2)).getServer() instanceof TcpTransportServer);
// Check network connectors
// System.out.print("Checking network connectors... ");
@@ -260,15 +274,15 @@ public class ConfigTest extends TestCase
dest = new ActiveMQTopic("Topic.SimpleDispatch");
assertTrue("Should have a simple dispatch policy for " + dest.getTopicName(),
- broker.getDestinationPolicy().getEntryFor(dest).getDispatchPolicy() instanceof SimpleDispatchPolicy);
+ broker.getDestinationPolicy().getEntryFor(dest).getDispatchPolicy() instanceof SimpleDispatchPolicy);
dest = new ActiveMQTopic("Topic.RoundRobinDispatch");
assertTrue("Should have a round robin dispatch policy for " + dest.getTopicName(),
- broker.getDestinationPolicy().getEntryFor(dest).getDispatchPolicy() instanceof RoundRobinDispatchPolicy);
+ broker.getDestinationPolicy().getEntryFor(dest).getDispatchPolicy() instanceof RoundRobinDispatchPolicy);
dest = new ActiveMQTopic("Topic.StrictOrderDispatch");
assertTrue("Should have a strict order dispatch policy for " + dest.getTopicName(),
- broker.getDestinationPolicy().getEntryFor(dest).getDispatchPolicy() instanceof StrictOrderDispatchPolicy);
+ broker.getDestinationPolicy().getEntryFor(dest).getDispatchPolicy() instanceof StrictOrderDispatchPolicy);
LOG.info("Success");
// Check subscription policy configuration
@@ -278,8 +292,8 @@ public class ConfigTest extends TestCase
dest = new ActiveMQTopic("Topic.FixedSizedSubs");
subsPolicy = broker.getDestinationPolicy().getEntryFor(dest).getSubscriptionRecoveryPolicy();
assertTrue("Should have a fixed sized subscription recovery policy for " + dest.getTopicName(), subsPolicy instanceof FixedSizedSubscriptionRecoveryPolicy);
- assertEquals("FixedSizedSubsPolicy Config Error (maximumSize)", 2000000, ((FixedSizedSubscriptionRecoveryPolicy)subsPolicy).getMaximumSize());
- assertEquals("FixedSizedSubsPolicy Config Error (useSharedBuffer)", false, ((FixedSizedSubscriptionRecoveryPolicy)subsPolicy).isUseSharedBuffer());
+ assertEquals("FixedSizedSubsPolicy Config Error (maximumSize)", 2000000, ((FixedSizedSubscriptionRecoveryPolicy) subsPolicy).getMaximumSize());
+ assertEquals("FixedSizedSubsPolicy Config Error (useSharedBuffer)", false, ((FixedSizedSubscriptionRecoveryPolicy) subsPolicy).isUseSharedBuffer());
dest = new ActiveMQTopic("Topic.LastImageSubs");
subsPolicy = broker.getDestinationPolicy().getEntryFor(dest).getSubscriptionRecoveryPolicy();
@@ -292,7 +306,7 @@ public class ConfigTest extends TestCase
dest = new ActiveMQTopic("Topic.TimedSubs");
subsPolicy = broker.getDestinationPolicy().getEntryFor(dest).getSubscriptionRecoveryPolicy();
assertTrue("Should have a timed subscription recovery policy for " + dest.getTopicName(), subsPolicy instanceof TimedSubscriptionRecoveryPolicy);
- assertEquals("TimedSubsPolicy Config Error (recoverDuration)", 25000, ((TimedSubscriptionRecoveryPolicy)subsPolicy).getRecoverDuration());
+ assertEquals("TimedSubsPolicy Config Error (recoverDuration)", 25000, ((TimedSubscriptionRecoveryPolicy) subsPolicy).getRecoverDuration());
LOG.info("Success");
// Check usage manager
@@ -304,10 +318,10 @@ public class ConfigTest extends TestCase
assertEquals("SystemUsage Config Error (TempUsage.limit)", 1024 * 1024 * 100, systemUsage.getTempUsage().getLimit());
assertEquals("SystemUsage Config Error (StoreUsage.limit)", 1024 * 1024 * 1024, systemUsage.getStoreUsage().getLimit());
assertEquals("SystemUsage Config Error (StoreUsage.name)", "foo", systemUsage.getStoreUsage().getName());
-
+
assertNotNull(systemUsage.getStoreUsage().getStore());
assertTrue(systemUsage.getStoreUsage().getStore() instanceof MemoryPersistenceAdapter);
-
+
LOG.info("Success");
} finally {
@@ -379,6 +393,66 @@ public class ConfigTest extends TestCase
}
+ public void testConnectorConfig() throws Exception {
+ // System.out.print("Checking memory persistence adapter
+ // configuration... ");
+
+ File journalFile = new File(JOURNAL_ROOT + "testMemoryConfig");
+ recursiveDelete(journalFile);
+
+ File derbyFile = new File(DERBY_ROOT + "testMemoryConfig");
+ recursiveDelete(derbyFile);
+
+ final int MAX_PRODUCERS = 5;
+ final int MAX_CONSUMERS = 10;
+
+ BrokerService broker = createBroker(new FileSystemResource(CONF_ROOT + "connector-properties.xml"));
+ broker.start();
+ try {
+
+ assertEquals(broker.getTransportConnectorByScheme("tcp").getMaximumProducersAllowedPerConnection(), MAX_PRODUCERS);
+ assertEquals(broker.getTransportConnectorByScheme("tcp").getMaximumConsumersAllowedPerConnection(), MAX_CONSUMERS);
+
+ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61631");
+ javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
+ final CountDownLatch latch = new CountDownLatch(1);
+ connection.setExceptionListener(new ExceptionListener() {
+ public void onException(JMSException e) {
+ if (e.getCause() instanceof IllegalStateException) {
+ latch.countDown();
+ }
+ }
+ });
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic("test.foo");
+
+ for (int i = 0; i < (MAX_PRODUCERS + 1); i++) {
+ MessageProducer messageProducer = session.createProducer(topic);
+ }
+
+ latch.await(5, TimeUnit.SECONDS);
+ if (latch.getCount() > 0) {
+ fail("Should have got an exception");
+ }
+ try {
+ for (int i = 0; i < (MAX_CONSUMERS + 1); i++) {
+ MessageConsumer consumer = session.createConsumer(topic);
+ }
+ fail("Should have caught an exception");
+ } catch (JMSException e) {
+ }
+
+
+ LOG.info("Success");
+ } finally {
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+
+ }
+
public void testXmlConfigHelper() throws Exception {
BrokerService broker;
Copied: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/connector-properties.xml (from r1327942, activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/memory-example.xml)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/connector-properties.xml?p2=activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/connector-properties.xml&p1=activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/memory-example.xml&r1=1327942&r2=1328287&rev=1328287&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/memory-example.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/config/sample-conf/connector-properties.xml Fri Apr 20 09:51:55 2012
@@ -23,14 +23,14 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
- <amq:broker brokerName="brokerMemoryConfigTest" persistent="false" useShutdownHook="false" deleteAllMessagesOnStartup="true">
+ <amq:broker brokerName="brokerConnectorTest" persistent="false" useShutdownHook="false" deleteAllMessagesOnStartup="true">
<amq:persistenceAdapter>
- <amq:memoryPersistenceAdapter createTransactionStore = "true"/>
+ <amq:memoryPersistenceAdapter createTransactionStore = "false"/>
</amq:persistenceAdapter>
<amq:transportConnectors>
- <amq:transportConnector uri="tcp://localhost:61635"/>
+ <amq:transportConnector uri="tcp://localhost:61631" maximumProducersAllowedPerConnection="5" maximumConsumersAllowedPerConnection="10"/>
</amq:transportConnectors>
</amq:broker>