You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/02/22 16:32:26 UTC
svn commit: r1449077 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/broker/
activemq-broker/src/main/java/org/apache/activemq/broker/jmx/
activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/
activemq-unit-tests/src/...
Author: tabish
Date: Fri Feb 22 15:32:26 2013
New Revision: 1449077
URL: http://svn.apache.org/r1449077
Log:
https://issues.apache.org/jira/browse/AMQ-4330
Merge back the isSlave() JMX API.
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1449077&r1=1449076&r2=1449077&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Feb 22 15:32:26 2013
@@ -232,6 +232,7 @@ public class BrokerService implements Se
private Throwable startException = null;
private boolean startAsync = false;
private Date startDate;
+ private boolean slave = true;
static {
String localHostName = "localhost";
@@ -706,6 +707,7 @@ public class BrokerService implements Se
}
}
stopAllConnectors(stopper);
+ this.slave = true;
// remove any VMTransports connected
// this has to be done after services are stopped,
// to avoid timing issue with discovery (spinning up a new instance)
@@ -2354,6 +2356,7 @@ public class BrokerService implements Se
this.transportConnectors.clear();
setTransportConnectors(al);
}
+ this.slave = false;
URI uri = getVmConnectorURI();
Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
map.put("network", "true");
@@ -2821,4 +2824,9 @@ public class BrokerService implements Se
public void setStartAsync(boolean startAsync) {
this.startAsync = startAsync;
}
+
+ public boolean isSlave() {
+ return this.slave;
+ }
+
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1449077&r1=1449076&r2=1449077&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Fri Feb 22 15:32:26 2013
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
+
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionMetaData;
@@ -67,14 +68,17 @@ public class BrokerView implements Broke
this.broker = broker;
}
+ @Override
public String getBrokerId() {
return safeGetBroker().getBrokerId().toString();
}
+ @Override
public String getBrokerName() {
return safeGetBroker().getBrokerName();
}
+ @Override
public String getBrokerVersion() {
return ActiveMQConnectionMetaData.PROVIDER_VERSION;
}
@@ -84,6 +88,7 @@ public class BrokerView implements Broke
return brokerService.getUptime();
}
+ @Override
public void gc() throws Exception {
brokerService.getBroker().gc();
try {
@@ -93,35 +98,43 @@ public class BrokerView implements Broke
}
}
+ @Override
public void start() throws Exception {
brokerService.start();
}
+ @Override
public void stop() throws Exception {
brokerService.stop();
}
+ @Override
public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
throws Exception {
brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval);
}
+ @Override
public long getTotalEnqueueCount() {
return safeGetBroker().getDestinationStatistics().getEnqueues().getCount();
}
+ @Override
public long getTotalDequeueCount() {
return safeGetBroker().getDestinationStatistics().getDequeues().getCount();
}
+ @Override
public long getTotalConsumerCount() {
return safeGetBroker().getDestinationStatistics().getConsumers().getCount();
}
+ @Override
public long getTotalProducerCount() {
return safeGetBroker().getDestinationStatistics().getProducers().getCount();
}
+ @Override
public long getTotalMessageCount() {
return safeGetBroker().getDestinationStatistics().getMessages().getCount();
}
@@ -130,138 +143,172 @@ public class BrokerView implements Broke
return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount();
}
+ @Override
public int getMemoryPercentUsage() {
return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage();
}
+ @Override
public long getMemoryLimit() {
return brokerService.getSystemUsage().getMemoryUsage().getLimit();
}
+ @Override
public void setMemoryLimit(long limit) {
brokerService.getSystemUsage().getMemoryUsage().setLimit(limit);
}
+ @Override
public long getStoreLimit() {
return brokerService.getSystemUsage().getStoreUsage().getLimit();
}
+ @Override
public int getStorePercentUsage() {
return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
}
+ @Override
public long getTempLimit() {
return brokerService.getSystemUsage().getTempUsage().getLimit();
}
+ @Override
public int getTempPercentUsage() {
return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
}
-
+
+ @Override
public long getJobSchedulerStoreLimit() {
return brokerService.getSystemUsage().getJobSchedulerUsage().getLimit();
}
-
+
+ @Override
public int getJobSchedulerStorePercentUsage() {
return brokerService.getSystemUsage().getJobSchedulerUsage().getPercentUsage();
}
+ @Override
public void setStoreLimit(long limit) {
brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
}
+ @Override
public void setTempLimit(long limit) {
brokerService.getSystemUsage().getTempUsage().setLimit(limit);
}
-
+
+ @Override
public void setJobSchedulerStoreLimit(long limit) {
brokerService.getSystemUsage().getJobSchedulerUsage().setLimit(limit);
}
+ @Override
public void resetStatistics() {
safeGetBroker().getDestinationStatistics().reset();
}
+ @Override
public void enableStatistics() {
safeGetBroker().getDestinationStatistics().setEnabled(true);
}
+ @Override
public void disableStatistics() {
safeGetBroker().getDestinationStatistics().setEnabled(false);
}
+ @Override
public boolean isStatisticsEnabled() {
return safeGetBroker().getDestinationStatistics().isEnabled();
}
+ @Override
public boolean isPersistent() {
return brokerService.isPersistent();
}
+ @Override
public void terminateJVM(int exitCode) {
System.exit(exitCode);
}
+ @Override
public ObjectName[] getTopics() {
return safeGetBroker().getTopics();
}
+ @Override
public ObjectName[] getQueues() {
return safeGetBroker().getQueues();
}
+ @Override
public ObjectName[] getTemporaryTopics() {
return safeGetBroker().getTemporaryTopics();
}
+ @Override
public ObjectName[] getTemporaryQueues() {
return safeGetBroker().getTemporaryQueues();
}
+ @Override
public ObjectName[] getTopicSubscribers() {
return safeGetBroker().getTopicSubscribers();
}
+ @Override
public ObjectName[] getDurableTopicSubscribers() {
return safeGetBroker().getDurableTopicSubscribers();
}
+ @Override
public ObjectName[] getQueueSubscribers() {
return safeGetBroker().getQueueSubscribers();
}
+ @Override
public ObjectName[] getTemporaryTopicSubscribers() {
return safeGetBroker().getTemporaryTopicSubscribers();
}
+ @Override
public ObjectName[] getTemporaryQueueSubscribers() {
return safeGetBroker().getTemporaryQueueSubscribers();
}
+ @Override
public ObjectName[] getInactiveDurableTopicSubscribers() {
return safeGetBroker().getInactiveDurableTopicSubscribers();
}
+ @Override
public ObjectName[] getTopicProducers() {
return safeGetBroker().getTopicProducers();
}
+ @Override
public ObjectName[] getQueueProducers() {
return safeGetBroker().getQueueProducers();
}
+ @Override
public ObjectName[] getTemporaryTopicProducers() {
return safeGetBroker().getTemporaryTopicProducers();
}
+ @Override
public ObjectName[] getTemporaryQueueProducers() {
return safeGetBroker().getTemporaryQueueProducers();
}
+ @Override
public ObjectName[] getDynamicDestinationProducers() {
return safeGetBroker().getDynamicDestinationProducers();
}
+ @Override
public String addConnector(String discoveryAddress) throws Exception {
TransportConnector connector = brokerService.addConnector(discoveryAddress);
if (connector == null) {
@@ -271,6 +318,7 @@ public class BrokerView implements Broke
return connector.getName();
}
+ @Override
public String addNetworkConnector(String discoveryAddress) throws Exception {
NetworkConnector connector = brokerService.addNetworkConnector(discoveryAddress);
if (connector == null) {
@@ -280,6 +328,7 @@ public class BrokerView implements Broke
return connector.getName();
}
+ @Override
public boolean removeConnector(String connectorName) throws Exception {
TransportConnector connector = brokerService.getConnectorByName(connectorName);
if (connector == null) {
@@ -289,6 +338,7 @@ public class BrokerView implements Broke
return brokerService.removeConnector(connector);
}
+ @Override
public boolean removeNetworkConnector(String connectorName) throws Exception {
NetworkConnector connector = brokerService.getNetworkConnectorByName(connectorName);
if (connector == null) {
@@ -298,22 +348,27 @@ public class BrokerView implements Broke
return brokerService.removeNetworkConnector(connector);
}
+ @Override
public void addTopic(String name) throws Exception {
safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name),true);
}
+ @Override
public void addQueue(String name) throws Exception {
safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name),true);
}
+ @Override
public void removeTopic(String name) throws Exception {
safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), 1000);
}
+ @Override
public void removeQueue(String name) throws Exception {
safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), 1000);
}
+ @Override
public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName,
String selector) throws Exception {
ConnectionContext context = new ConnectionContext();
@@ -336,6 +391,7 @@ public class BrokerView implements Broke
return null;
}
+ @Override
public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception {
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(clientId);
@@ -347,6 +403,7 @@ public class BrokerView implements Broke
}
// doc comment inherited from BrokerViewMBean
+ @Override
public void reloadLog4jProperties() throws Throwable {
// Avoid a direct dependency on log4j.. use reflection.
@@ -379,6 +436,7 @@ public class BrokerView implements Broke
}
}
+ @Override
public Map<String, String> getTransportConnectors() {
Map<String, String> answer = new HashMap<String, String>();
try {
@@ -396,6 +454,7 @@ public class BrokerView implements Broke
return brokerService.getTransportConnectorURIsAsMap().get(type);
}
+ @Override
@Deprecated
/**
* @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
@@ -405,6 +464,7 @@ public class BrokerView implements Broke
return answer != null ? answer : "";
}
+ @Override
@Deprecated
/**
* @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
@@ -414,6 +474,7 @@ public class BrokerView implements Broke
return answer != null ? answer : "";
}
+ @Override
@Deprecated
/**
* @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
@@ -423,6 +484,7 @@ public class BrokerView implements Broke
return answer != null ? answer : "";
}
+ @Override
@Deprecated
/**
* @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
@@ -432,11 +494,13 @@ public class BrokerView implements Broke
return answer != null ? answer : "";
}
+ @Override
public String getVMURL() {
URI answer = brokerService.getVmConnectorURI();
return answer != null ? answer.toString() : "";
}
+ @Override
public String getDataDirectory() {
File file = brokerService.getDataDirectoryFile();
try {
@@ -446,6 +510,7 @@ public class BrokerView implements Broke
}
}
+ @Override
public ObjectName getJMSJobScheduler() {
return this.jmsJobScheduler;
}
@@ -454,6 +519,11 @@ public class BrokerView implements Broke
this.jmsJobScheduler=name;
}
+ @Override
+ public boolean isSlave() {
+ return brokerService.isSlave();
+ }
+
private ManagedRegionBroker safeGetBroker() {
if (broker == null) {
throw new IllegalStateException("Broker is not yet started.");
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=1449077&r1=1449076&r2=1449077&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Fri Feb 22 15:32:26 2013
@@ -123,6 +123,9 @@ public interface BrokerViewMBean extends
@MBeanInfo("Messages are synchronized to disk.")
boolean isPersistent();
+ @MBeanInfo("Slave broker.")
+ boolean isSlave();
+
/**
* Shuts down the JVM.
*
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java?rev=1449077&r1=1449076&r2=1449077&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java Fri Feb 22 15:32:26 2013
@@ -49,6 +49,7 @@ abstract public class QueueMasterSlaveTe
protected int failureCount = 50;
protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false&useExponentialBackOff=false";
+ @Override
protected void setUp() throws Exception {
setMaxTestTime(TimeUnit.MINUTES.toMillis(10));
setAutoFail(true);
@@ -74,6 +75,7 @@ abstract public class QueueMasterSlaveTe
return "org/apache/activemq/broker/ft/master.xml";
}
+ @Override
protected void tearDown() throws Exception {
super.tearDown();
master.stop();
@@ -86,10 +88,12 @@ abstract public class QueueMasterSlaveTe
master.stop();
}
+ @Override
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(uriString);
}
+ @Override
protected void messageSent() throws Exception {
if (++inflightMessageCount == failureCount) {
Thread.sleep(1000);
@@ -123,13 +127,16 @@ abstract public class QueueMasterSlaveTe
MessageConsumer qConsumer = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
assertNull("No message there yet", qConsumer.receive(1000));
qConsumer.close();
+ assertTrue(!master.isSlave());
master.stop();
assertTrue("slave started", slaveStarted.await(15, TimeUnit.SECONDS));
+ assertTrue(!slave.get().isSlave());
final String text = "ForUWhenSlaveKicksIn";
producer.send(new ActiveMQTopic("VirtualTopic.TA1"), session.createTextMessage(text));
qConsumer = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
+
javax.jms.Message message = qConsumer.receive(4000);
assertNotNull("Get message after failover", message);
assertEquals("correct message", text, ((TextMessage)message).getText());
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1449077&r1=1449076&r2=1449077&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Fri Feb 22 15:32:26 2013
@@ -336,6 +336,7 @@ public class MBeanTest extends EmbeddedB
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
+ assertTrue("broker is not a slave", !broker.isSlave());
// create 2 topics
broker.addTopic(getDestinationString() + "1 ");
broker.addTopic(" " + getDestinationString() + "2");
@@ -534,6 +535,7 @@ public class MBeanTest extends EmbeddedB
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
+ assertTrue("broker is not a slave", !broker.isSlave());
// create 2 topics
broker.addTopic(getDestinationString() + "1");
broker.addTopic(getDestinationString() + "2");
@@ -585,6 +587,7 @@ public class MBeanTest extends EmbeddedB
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
+ assertTrue("broker is not a slave", !broker.isSlave());
// create 2 topics
broker.addTopic(getDestinationString() + "1");
broker.addTopic(getDestinationString() + "2");