You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2016/11/07 12:37:55 UTC
[1/3] activemq-artemis git commit: remove JMS JMX Objects and add new
Address JMX objects
Repository: activemq-artemis
Updated Branches:
refs/heads/ARTEMIS-780 dc9c07878 -> 3858b1cfc
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 5cc55c3..52800a8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -99,6 +99,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
}
@Override
+ public void createAddress(@Parameter(name = "name", desc = "The name of the address") String name, @Parameter(name = "routingType", desc = "the routing type of the address either 0 for multicast or 1 for anycast") int routingType, @Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers, @Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception {
+ proxy.invokeOperation("createAddress", name, routingType, defaultDeleteOnNoConsumers, defaultMaxConsumers);
+ }
+
+ @Override
public void createQueue(final String address,
final String name,
final String filter,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlTest.java
index 7311727..109e008 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlTest.java
@@ -45,9 +45,9 @@ public class DivertControlTest extends ManagementTestBase {
@Test
public void testAttributes() throws Exception {
- checkResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(divertConfig.getName()));
+ checkResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(divertConfig.getName(), divertConfig.getAddress()));
- DivertControl divertControl = createManagementControl(divertConfig.getName());
+ DivertControl divertControl = createDivertManagementControl(divertConfig.getName(), divertConfig.getAddress());
Assert.assertEquals(divertConfig.getFilterString(), divertControl.getFilter());
@@ -86,7 +86,7 @@ public class DivertControlTest extends ManagementTestBase {
server.start();
}
- protected DivertControl createManagementControl(final String name) throws Exception {
- return ManagementControlHelper.createDivertControl(name, mbeanServer);
+ protected DivertControl createDivertManagementControl(final String name, final String address) throws Exception {
+ return ManagementControlHelper.createDivertControl(name, address, mbeanServer);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
index cf92cf1..ecf4142 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
@@ -32,7 +32,7 @@ public class DivertControlUsingCoreTest extends DivertControlTest {
// DivertControlTest overrides --------------------------------
@Override
- protected DivertControl createManagementControl(final String name) throws Exception {
+ protected DivertControl createDivertManagementControl(final String name, final String address) throws Exception {
return new DivertControl() {
private final CoreMessagingProxy proxy = new CoreMessagingProxy(addServerLocator(createInVMNonHALocator()), ResourceNames.CORE_DIVERT + name);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java
index a41c908..641d97c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java
@@ -59,8 +59,8 @@ public class ManagementControlHelper {
return (BridgeControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getBridgeObjectName(name), BridgeControl.class, mbeanServer);
}
- public static DivertControl createDivertControl(final String name, final MBeanServer mbeanServer) throws Exception {
- return (DivertControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getDivertObjectName(name), DivertControl.class, mbeanServer);
+ public static DivertControl createDivertControl(final String name, String address, final MBeanServer mbeanServer) throws Exception {
+ return (DivertControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getDivertObjectName(name, address), DivertControl.class, mbeanServer);
}
public static ClusterConnectionControl createClusterConnectionControl(final String name,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
index ce95046..2004ee8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
import org.apache.activemq.artemis.tests.integration.server.FakeStorageManager;
@@ -134,7 +135,7 @@ public class ManagementServiceImplTest extends ActiveMQTestBase {
managementService.setStorageManager(new NullStorageManager());
SimpleString address = RandomUtil.randomSimpleString();
- managementService.registerAddress(address);
+ managementService.registerAddress(new AddressInfo(address));
Queue queue = new FakeQueue(RandomUtil.randomSimpleString());
managementService.registerQueue(queue, RandomUtil.randomSimpleString(), new FakeStorageManager());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index fa84c16..235b1f8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -2108,7 +2108,7 @@ public class QueueControlTest extends ManagementTestBase {
QueueControl queueControl = createManagementControl(address, queue);
- queueControl.sendMessage(new HashMap<String, String>(), Message.TEXT_TYPE, Base64.encodeBytes("theBody".getBytes()), "myID", true, "myUser", "myPassword");
+ queueControl.sendMessage(new HashMap<String, String>(), Message.TEXT_TYPE, Base64.encodeBytes("theBody".getBytes()), true, "myUser", "myPassword");
Assert.assertEquals(1, getMessageCount(queueControl));
@@ -2133,7 +2133,7 @@ public class QueueControlTest extends ManagementTestBase {
QueueControl queueControl = createManagementControl(address, queue);
- queueControl.sendMessage(new HashMap<String, String>(), Message.TEXT_TYPE, null, "myID", true, "myUser", "myPassword");
+ queueControl.sendMessage(new HashMap<String, String>(), Message.TEXT_TYPE, null, true, "myUser", "myPassword");
Assert.assertEquals(1, getMessageCount(queueControl));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 9b901fc..4dd418b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -316,11 +316,10 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
public String sendMessage(Map<String, String> headers,
int type,
String body,
- String userID,
boolean durable,
String user,
String password) throws Exception {
- return (String) proxy.invokeOperation("sendMessage", headers, type, body, userID, durable, user, password);
+ return (String) proxy.invokeOperation("sendMessage", headers, type, body, durable, user, password);
}
public void setDeadLetterAddress(final String deadLetterAddress) throws Exception {
@@ -352,6 +351,17 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
}
@Override
+ public CompositeData[] browse() throws Exception {
+ Map map = (Map) proxy.invokeOperation("browse");
+ CompositeData[] compositeDatas = (CompositeData[]) map.get(CompositeData.class.getName());
+ if (compositeDatas == null) {
+ compositeDatas = new CompositeData[0];
+ }
+ return compositeDatas;
+ }
+
+
+ @Override
public CompositeData[] browse(String filter) throws Exception {
Map map = (Map) proxy.invokeOperation("browse", filter);
CompositeData[] compositeDatas = (CompositeData[]) map.get(CompositeData.class.getName());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/tools/container/LocalTestServer.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/tools/container/LocalTestServer.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/tools/container/LocalTestServer.java
index 0fe7b47..9f15229 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/tools/container/LocalTestServer.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/tools/container/LocalTestServer.java
@@ -28,11 +28,13 @@ import java.util.List;
import java.util.Properties;
import java.util.Set;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
-import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
import org.apache.activemq.artemis.api.jms.management.TopicControl;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
@@ -328,7 +330,7 @@ public class LocalTestServer implements Server, Runnable {
@Override
public Long getMessageCountForQueue(final String queueName) throws Exception {
- JMSQueueControl queue = (JMSQueueControl) getActiveMQServer().getManagementService().getResource(queueName);
+ QueueControl queue = (QueueControl) getActiveMQServer().getManagementService().getResource("queue." + queueName);
if (queue != null) {
queue.flushExecutor();
return queue.getMessageCount();
@@ -340,7 +342,7 @@ public class LocalTestServer implements Server, Runnable {
@Override
public void removeAllMessages(final String destination, final boolean isQueue) throws Exception {
if (isQueue) {
- JMSQueueControl queue = (JMSQueueControl) getActiveMQServer().getManagementService().getResource(destination);
+ QueueControl queue = (QueueControl) getActiveMQServer().getManagementService().getResource("queue." + destination);
queue.removeMessages(null);
} else {
TopicControl topic = (TopicControl) getActiveMQServer().getManagementService().getResource(destination);
@@ -350,13 +352,12 @@ public class LocalTestServer implements Server, Runnable {
@Override
public List<String> listAllSubscribersForTopic(final String s) throws Exception {
- ObjectName objectName = ObjectNameBuilder.DEFAULT.getJMSTopicObjectName(s);
- TopicControl topic = MBeanServerInvocationHandler.newProxyInstance(ManagementFactory.getPlatformMBeanServer(), objectName, TopicControl.class, false);
- Object[] subInfos = topic.listAllSubscriptions();
+ ObjectName objectName = ObjectNameBuilder.DEFAULT.getAddressObjectName(new SimpleString(s));
+ AddressControl topic = MBeanServerInvocationHandler.newProxyInstance(ManagementFactory.getPlatformMBeanServer(), objectName, AddressControl.class, false);
+ Object[] subInfos = topic.getQueueNames();
List<String> subs = new ArrayList<>();
for (Object o : subInfos) {
- Object[] data = (Object[]) o;
- subs.add((String) data[2]);
+ subs.add( ((String) o).split("\\.")[1]);
}
return subs;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java
----------------------------------------------------------------------
diff --git a/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java b/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java
index 63ae052..7d9d24a 100644
--- a/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java
+++ b/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
import org.junit.Assert;
import org.objectweb.jtests.jms.admin.Admin;
@@ -120,8 +121,7 @@ public class AbstractAdmin implements Admin {
public void createQueue(final String name) {
Boolean result;
try {
- result = (Boolean) invokeSyncOperation(ResourceNames.JMS_SERVER, "createQueue", name, name);
- Assert.assertEquals(true, result.booleanValue());
+ invokeSyncOperation(ResourceNames.CORE_SERVER, "createQueue", name, name);
} catch (Exception e) {
throw new IllegalStateException(e);
}
@@ -131,8 +131,7 @@ public class AbstractAdmin implements Admin {
public void deleteQueue(final String name) {
Boolean result;
try {
- result = (Boolean) invokeSyncOperation(ResourceNames.JMS_SERVER, "destroyQueue", name);
- Assert.assertEquals(true, result.booleanValue());
+ invokeSyncOperation(ResourceNames.CORE_SERVER, "destroyQueue", name);
} catch (Exception e) {
throw new IllegalStateException(e);
}
@@ -152,8 +151,7 @@ public class AbstractAdmin implements Admin {
public void createTopic(final String name) {
Boolean result;
try {
- result = (Boolean) invokeSyncOperation(ResourceNames.JMS_SERVER, "createTopic", name, name);
- Assert.assertEquals(true, result.booleanValue());
+ invokeSyncOperation(ResourceNames.CORE_SERVER, "createAddress", name, (int)AddressInfo.RoutingType.MULTICAST.getType(), false, -1);
} catch (Exception e) {
throw new IllegalStateException(e);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/tests/joram-tests/src/test/java/org/apache/activemq/artemis/jms/ActiveMQCoreAdmin.java
----------------------------------------------------------------------
diff --git a/tests/joram-tests/src/test/java/org/apache/activemq/artemis/jms/ActiveMQCoreAdmin.java b/tests/joram-tests/src/test/java/org/apache/activemq/artemis/jms/ActiveMQCoreAdmin.java
index 910f141..321bdca 100644
--- a/tests/joram-tests/src/test/java/org/apache/activemq/artemis/jms/ActiveMQCoreAdmin.java
+++ b/tests/joram-tests/src/test/java/org/apache/activemq/artemis/jms/ActiveMQCoreAdmin.java
@@ -57,7 +57,6 @@ public class ActiveMQCoreAdmin extends AbstractAdmin {
@Override
public void createConnectionFactory(final String name) {
- createConnection(name, 0);
jndiProps.put("connectionFactory." + name, "tcp://127.0.0.1:61616?type=CF");
}
@@ -84,7 +83,6 @@ public class ActiveMQCoreAdmin extends AbstractAdmin {
@Override
public void createQueueConnectionFactory(final String name) {
- createConnection(name, 1);
jndiProps.put("connectionFactory." + name, "tcp://127.0.0.1:61616?type=QUEUE_CF");
}
@@ -96,7 +94,6 @@ public class ActiveMQCoreAdmin extends AbstractAdmin {
@Override
public void createTopicConnectionFactory(final String name) {
- createConnection(name, 2);
jndiProps.put("connectionFactory." + name, "tcp://127.0.0.1:61616?type=TOPIC_CF");
}
[3/3] activemq-artemis git commit: remove JMS JMX Objects and add new
Address JMX objects
Posted by an...@apache.org.
remove JMS JMX Objects and add new Address JMX objects
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3858b1cf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3858b1cf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3858b1cf
Branch: refs/heads/ARTEMIS-780
Commit: 3858b1cfc0b23b87bd73ded968bb9c0f3c20ba49
Parents: dc9c078
Author: Andy Taylor <an...@gmail.com>
Authored: Sun Nov 6 10:43:16 2016 +0000
Committer: Andy Taylor <an...@gmail.com>
Committed: Mon Nov 7 12:36:52 2016 +0000
----------------------------------------------------------------------
.../core/management/ActiveMQServerControl.java | 5 +
.../api/core/management/AddressControl.java | 27 +
.../api/core/management/ObjectNameBuilder.java | 34 +-
.../api/core/management/QueueControl.java | 5 +-
.../api/core/management/ResourceNames.java | 5 +-
.../impl/JMSConnectionFactoryControlImpl.java | 471 ----------
.../management/impl/JMSQueueControlImpl.java | 532 -----------
.../management/impl/JMSServerControlImpl.java | 876 -------------------
.../management/impl/JMSTopicControlImpl.java | 367 --------
.../openmbean/JMSCompositeDataConstants.java | 57 --
.../impl/openmbean/JMSOpenTypeSupport.java | 357 --------
.../jms/server/impl/JMSServerManagerImpl.java | 30 -
.../server/management/JMSManagementService.java | 48 -
.../impl/JMSManagementServiceImpl.java | 155 ----
.../impl/ActiveMQServerControlImpl.java | 13 +
.../management/impl/AddressControlImpl.java | 105 ++-
.../core/management/impl/QueueControlImpl.java | 12 +-
.../core/postoffice/impl/PostOfficeImpl.java | 4 +-
.../core/server/impl/ActiveMQServerImpl.java | 3 -
.../artemis/core/server/impl/AddressInfo.java | 7 +
.../server/impl/PostOfficeJournalLoader.java | 4 +-
.../server/management/ManagementService.java | 3 +-
.../management/impl/ManagementServiceImpl.java | 15 +-
.../group/impl/ClusteredResetMockTest.java | 3 +-
.../tests/extras/jms/bridge/BridgeTestBase.java | 6 +-
.../crossprotocol/AMQPToOpenwireTest.java | 1 -
.../management/ActiveMQServerControlTest.java | 12 +-
.../ActiveMQServerControlUsingCoreTest.java | 5 +
.../management/DivertControlTest.java | 8 +-
.../management/DivertControlUsingCoreTest.java | 2 +-
.../management/ManagementControlHelper.java | 4 +-
.../management/ManagementServiceImplTest.java | 3 +-
.../management/QueueControlTest.java | 4 +-
.../management/QueueControlUsingCoreTest.java | 14 +-
.../tests/tools/container/LocalTestServer.java | 17 +-
.../activemq/artemis/common/AbstractAdmin.java | 10 +-
.../activemq/artemis/jms/ActiveMQCoreAdmin.java | 3 -
37 files changed, 237 insertions(+), 2990 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 87a4a79..30e8bc5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -424,6 +424,11 @@ public interface ActiveMQServerControl {
// Operations ----------------------------------------------------
+ @Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION)
+ void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
+ @Parameter(name = "routingType", desc = "the routing type of the address either 0 for multicast or 1 for anycast") int routingType,
+ @Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers,
+ @Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception;
/**
* Create a durable queue.
* <br>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
index fbecf25..e7a02ad 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.api.core.management;
+import javax.management.MBeanOperationInfo;
+import java.util.Map;
+
/**
* An AddressControl is used to manage an address.
*/
@@ -27,6 +30,12 @@ public interface AddressControl {
@Attribute(desc = "managed address")
String getAddress();
+ /*
+ * The routing type of this address, either multicast (topic subscriptions) or anycast (queue semantics).
+ * */
+ @Attribute(desc = "The routing type of this address")
+ String getRoutingType();
+
/**
* Returns the roles (name and permissions) associated with this address.
*/
@@ -85,4 +94,22 @@ public interface AddressControl {
*/
@Attribute(desc = "names of all bindings (both queues and diverts) bound to this address")
String[] getBindingNames() throws Exception;
+
+
+ /**
+ * @param headers the message headers and properties to set. Can only
+ * container Strings maped to primitive types.
+ * @param body the text to send
+ * @param durable
+ * @param user
+ * @param password @return
+ * @throws Exception
+ */
+ @Operation(desc = "Sends a TextMessage to a password-protected address.", impact = MBeanOperationInfo.ACTION)
+ String sendMessage(@Parameter(name = "headers", desc = "The headers to add to the message") Map<String, String> headers,
+ @Parameter(name = "headers", desc = "A type for the message") final int type,
+ @Parameter(name = "body", desc = "The body (byte[]) of the message encoded as a string using Base64") String body,
+ @Parameter(name = "durable", desc = "Whether the message is durable") boolean durable,
+ @Parameter(name = "user", desc = "The user to authenticate with") String user,
+ @Parameter(name = "password", desc = "The users password to authenticate with") String password) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java
index ef7b483..019996a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java
@@ -33,10 +33,6 @@ public final class ObjectNameBuilder {
*/
public static final ObjectNameBuilder DEFAULT = new ObjectNameBuilder(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "localhost", true);
- static final String JMS_MODULE = "JMS";
-
- static final String CORE_MODULE = "Core";
-
// Attributes ----------------------------------------------------
private final String domain;
@@ -85,7 +81,7 @@ public final class ObjectNameBuilder {
* Returns the ObjectName used by the single {@link ActiveMQServerControl}.
*/
public ObjectName getActiveMQServerObjectName() throws Exception {
- return ObjectName.getInstance(domain + ":" + getBrokerProperties() + "module=Core," + getObjectType() + "=Server");
+ return ObjectName.getInstance(domain + ":" + getBrokerProperties() + getObjectType() + "=Broker");
}
/**
@@ -94,7 +90,7 @@ public final class ObjectNameBuilder {
* @see AddressControl
*/
public ObjectName getAddressObjectName(final SimpleString address) throws Exception {
- return createObjectName(ObjectNameBuilder.CORE_MODULE, "Address", address.toString());
+ return createObjectName("Address", address.toString());
}
/**
@@ -103,7 +99,7 @@ public final class ObjectNameBuilder {
* @see QueueControl
*/
public ObjectName getQueueObjectName(final SimpleString address, final SimpleString name) throws Exception {
- return ObjectName.getInstance(String.format("%s:" + getBrokerProperties() + "module=%s," + getObjectType() + "=%s,address=%s,name=%s", domain, ObjectNameBuilder.CORE_MODULE, "Queue", ObjectName.quote(address.toString()), ObjectName.quote(name.toString())));
+ return ObjectName.getInstance(String.format("%s:" + getBrokerProperties() + "parentType=%s,parentName=%s," + getObjectType() + "=%s,name=%s", domain, "Address", ObjectName.quote(address.toString()), "Queue", ObjectName.quote(name.toString())));
}
/**
@@ -111,8 +107,8 @@ public final class ObjectNameBuilder {
*
* @see DivertControl
*/
- public ObjectName getDivertObjectName(final String name) throws Exception {
- return createObjectName(ObjectNameBuilder.CORE_MODULE, "Divert", name);
+ public ObjectName getDivertObjectName(final String name, String address) throws Exception {
+ return ObjectName.getInstance(String.format("%s:" + getBrokerProperties() + "parentType=%s,parentName=%s," + getObjectType() + "=%s,name=%s", domain, "Address", ObjectName.quote(address.toString()), "Divert", ObjectName.quote(name.toString())));
}
/**
@@ -121,7 +117,7 @@ public final class ObjectNameBuilder {
* @see AcceptorControl
*/
public ObjectName getAcceptorObjectName(final String name) throws Exception {
- return createObjectName(ObjectNameBuilder.CORE_MODULE, "Acceptor", name);
+ return createObjectName("Acceptor", name);
}
/**
@@ -130,7 +126,7 @@ public final class ObjectNameBuilder {
* @see BroadcastGroupControl
*/
public ObjectName getBroadcastGroupObjectName(final String name) throws Exception {
- return createObjectName(ObjectNameBuilder.CORE_MODULE, "BroadcastGroup", name);
+ return createObjectName("BroadcastGroup", name);
}
/**
@@ -139,7 +135,7 @@ public final class ObjectNameBuilder {
* @see BridgeControl
*/
public ObjectName getBridgeObjectName(final String name) throws Exception {
- return createObjectName(ObjectNameBuilder.CORE_MODULE, "Bridge", name);
+ return createObjectName("Bridge", name);
}
/**
@@ -148,14 +144,14 @@ public final class ObjectNameBuilder {
* @see ClusterConnectionControl
*/
public ObjectName getClusterConnectionObjectName(final String name) throws Exception {
- return createObjectName(ObjectNameBuilder.CORE_MODULE, "ClusterConnection", name);
+ return createObjectName("ClusterConnection", name);
}
/**
* Returns the ObjectName used by DiscoveryGroupControl.
*/
public ObjectName getDiscoveryGroupObjectName(final String name) throws Exception {
- return createObjectName(ObjectNameBuilder.CORE_MODULE, "DiscoveryGroup", name);
+ return createObjectName("DiscoveryGroup", name);
}
/**
@@ -169,25 +165,25 @@ public final class ObjectNameBuilder {
* Returns the ObjectName used by JMSQueueControl.
*/
public ObjectName getJMSQueueObjectName(final String name) throws Exception {
- return createObjectName(ObjectNameBuilder.JMS_MODULE, "Queue", name);
+ return createObjectName("Queue", name);
}
/**
* Returns the ObjectName used by TopicControl.
*/
public ObjectName getJMSTopicObjectName(final String name) throws Exception {
- return createObjectName(ObjectNameBuilder.JMS_MODULE, "Topic", name);
+ return createObjectName("Topic", name);
}
/**
* Returns the ObjectName used by ConnectionFactoryControl.
*/
public ObjectName getConnectionFactoryObjectName(final String name) throws Exception {
- return createObjectName(ObjectNameBuilder.JMS_MODULE, "ConnectionFactory", name);
+ return createObjectName("ConnectionFactory", name);
}
- private ObjectName createObjectName(final String module, final String type, final String name) throws Exception {
- String format = String.format("%s:" + getBrokerProperties() + "module=%s," + getObjectType() + "=%s,name=%s", domain, module, type, ObjectName.quote(name));
+ private ObjectName createObjectName(final String type, final String name) throws Exception {
+ String format = String.format("%s:" + getBrokerProperties() + getObjectType() + "=%s,name=%s", domain, type, ObjectName.quote(name));
return ObjectName.getInstance(format);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 3336aae..bbf365c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -338,7 +338,6 @@ public interface QueueControl {
* @param headers the message headers and properties to set. Can only
* container Strings maped to primitive types.
* @param body the text to send
- * @param userID
* @param durable
* @param user
* @param password @return
@@ -348,7 +347,6 @@ public interface QueueControl {
String sendMessage(@Parameter(name = "headers", desc = "The headers to add to the message") Map<String, String> headers,
@Parameter(name = "headers", desc = "A type for the message") final int type,
@Parameter(name = "body", desc = "The body (byte[]) of the message encoded as a string using Base64") String body,
- @Parameter(name = "body", desc = "The user ID to set on the message") String userID,
@Parameter(name = "durable", desc = "Whether the message is durable") boolean durable,
@Parameter(name = "user", desc = "The user to authenticate with") String user,
@Parameter(name = "password", desc = "The users password to authenticate with") String password) throws Exception;
@@ -431,6 +429,9 @@ public interface QueueControl {
@Attribute(desc = "whether the queue is paused")
boolean isPaused() throws Exception;
+ @Operation(desc = "Browse Messages", impact = MBeanOperationInfo.ACTION)
+ CompositeData[] browse() throws Exception;
+
/**
* Resets the MessagesAdded property
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java
index a8c7632..716c6c1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java
@@ -23,10 +23,11 @@ package org.apache.activemq.artemis.api.core.management;
* For example, the resource name of the "foo" queue is {@code CORE_QUEUE + "foo"}.
*/
public final class ResourceNames {
+ public static final String ADDRESS = "address.";
- public static final String CORE_SERVER = "core.server";
+ public static final String CORE_SERVER = "broker";
- public static final String CORE_QUEUE = "core.queue.";
+ public static final String CORE_QUEUE = "queue.";
public static final String CORE_ADDRESS = "core.address.";
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSConnectionFactoryControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSConnectionFactoryControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSConnectionFactoryControlImpl.java
deleted file mode 100644
index 3175b9c..0000000
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSConnectionFactoryControlImpl.java
+++ /dev/null
@@ -1,471 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jms.management.impl;
-
-import javax.management.MBeanInfo;
-import javax.management.NotCompliantMBeanException;
-import javax.management.StandardMBean;
-
-import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.api.core.management.Parameter;
-import org.apache.activemq.artemis.api.jms.management.ConnectionFactoryControl;
-import org.apache.activemq.artemis.core.management.impl.MBeanInfoHelper;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.jms.server.JMSServerManager;
-import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
-
-public class JMSConnectionFactoryControlImpl extends StandardMBean implements ConnectionFactoryControl {
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final ConnectionFactoryConfiguration cfConfig;
-
- private ActiveMQConnectionFactory cf;
-
- private final String name;
-
- private final JMSServerManager jmsManager;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public JMSConnectionFactoryControlImpl(final ConnectionFactoryConfiguration cfConfig,
- final ActiveMQConnectionFactory cf,
- final JMSServerManager jmsManager,
- final String name) throws NotCompliantMBeanException {
- super(ConnectionFactoryControl.class);
- this.cfConfig = cfConfig;
- this.cf = cf;
- this.name = name;
- this.jmsManager = jmsManager;
- }
-
- // Public --------------------------------------------------------
-
- // ManagedConnectionFactoryMBean implementation ------------------
-
- @Override
- public String[] getRegistryBindings() {
- return jmsManager.getBindingsOnConnectionFactory(name);
- }
-
- @Override
- public boolean isCompressLargeMessages() {
- return cf.isCompressLargeMessage();
- }
-
- @Override
- public void setCompressLargeMessages(final boolean compress) {
- cfConfig.setCompressLargeMessages(compress);
- recreateCF();
- }
-
- @Override
- public boolean isHA() {
- return cfConfig.isHA();
- }
-
- @Override
- public int getFactoryType() {
- return cfConfig.getFactoryType().intValue();
- }
-
- @Override
- public String getClientID() {
- return cfConfig.getClientID();
- }
-
- @Override
- public long getClientFailureCheckPeriod() {
- return cfConfig.getClientFailureCheckPeriod();
- }
-
- @Override
- public void setClientID(String clientID) {
- cfConfig.setClientID(clientID);
- recreateCF();
- }
-
- @Override
- public void setDupsOKBatchSize(int dupsOKBatchSize) {
- cfConfig.setDupsOKBatchSize(dupsOKBatchSize);
- recreateCF();
- }
-
- @Override
- public void setTransactionBatchSize(int transactionBatchSize) {
- cfConfig.setTransactionBatchSize(transactionBatchSize);
- recreateCF();
- }
-
- @Override
- public void setClientFailureCheckPeriod(long clientFailureCheckPeriod) {
- cfConfig.setClientFailureCheckPeriod(clientFailureCheckPeriod);
- recreateCF();
- }
-
- @Override
- public void setConnectionTTL(long connectionTTL) {
- cfConfig.setConnectionTTL(connectionTTL);
- recreateCF();
- }
-
- @Override
- public void setCallTimeout(long callTimeout) {
- cfConfig.setCallTimeout(callTimeout);
- recreateCF();
- }
-
- @Override
- public void setCallFailoverTimeout(long callTimeout) {
- cfConfig.setCallFailoverTimeout(callTimeout);
- recreateCF();
- }
-
- @Override
- public void setConsumerWindowSize(int consumerWindowSize) {
- cfConfig.setConsumerWindowSize(consumerWindowSize);
- recreateCF();
- }
-
- @Override
- public void setConsumerMaxRate(int consumerMaxRate) {
- cfConfig.setConsumerMaxRate(consumerMaxRate);
- recreateCF();
- }
-
- @Override
- public void setConfirmationWindowSize(int confirmationWindowSize) {
- cfConfig.setConfirmationWindowSize(confirmationWindowSize);
- recreateCF();
- }
-
- @Override
- public void setProducerMaxRate(int producerMaxRate) {
- cfConfig.setProducerMaxRate(producerMaxRate);
- recreateCF();
- }
-
- @Override
- public int getProducerWindowSize() {
- return cfConfig.getProducerWindowSize();
- }
-
- @Override
- public void setProducerWindowSize(int producerWindowSize) {
- cfConfig.setProducerWindowSize(producerWindowSize);
- recreateCF();
- }
-
- @Override
- public void setCacheLargeMessagesClient(boolean cacheLargeMessagesClient) {
- cfConfig.setCacheLargeMessagesClient(cacheLargeMessagesClient);
- recreateCF();
- }
-
- @Override
- public boolean isCacheLargeMessagesClient() {
- return cfConfig.isCacheLargeMessagesClient();
- }
-
- @Override
- public void setMinLargeMessageSize(int minLargeMessageSize) {
- cfConfig.setMinLargeMessageSize(minLargeMessageSize);
- recreateCF();
- }
-
- @Override
- public void setBlockOnNonDurableSend(boolean blockOnNonDurableSend) {
- cfConfig.setBlockOnNonDurableSend(blockOnNonDurableSend);
- recreateCF();
- }
-
- @Override
- public void setBlockOnAcknowledge(boolean blockOnAcknowledge) {
- cfConfig.setBlockOnAcknowledge(blockOnAcknowledge);
- recreateCF();
- }
-
- @Override
- public void setBlockOnDurableSend(boolean blockOnDurableSend) {
- cfConfig.setBlockOnDurableSend(blockOnDurableSend);
- recreateCF();
- }
-
- @Override
- public void setAutoGroup(boolean autoGroup) {
- cfConfig.setAutoGroup(autoGroup);
- recreateCF();
- }
-
- @Override
- public void setPreAcknowledge(boolean preAcknowledge) {
- cfConfig.setPreAcknowledge(preAcknowledge);
- recreateCF();
- }
-
- @Override
- public void setMaxRetryInterval(long retryInterval) {
- cfConfig.setMaxRetryInterval(retryInterval);
- recreateCF();
- }
-
- @Override
- public void setRetryIntervalMultiplier(double retryIntervalMultiplier) {
- cfConfig.setRetryIntervalMultiplier(retryIntervalMultiplier);
- recreateCF();
- }
-
- @Override
- public void setReconnectAttempts(int reconnectAttempts) {
- cfConfig.setReconnectAttempts(reconnectAttempts);
- recreateCF();
- }
-
- @Override
- public void setFailoverOnInitialConnection(boolean failover) {
- cfConfig.setFailoverOnInitialConnection(failover);
- recreateCF();
- }
-
- @Override
- public boolean isUseGlobalPools() {
- return cfConfig.isUseGlobalPools();
- }
-
- @Override
- public void setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize) {
- cfConfig.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
- recreateCF();
- }
-
- @Override
- public int getThreadPoolMaxSize() {
- return cfConfig.getThreadPoolMaxSize();
- }
-
- @Override
- public void setThreadPoolMaxSize(int threadPoolMaxSize) {
- cfConfig.setThreadPoolMaxSize(threadPoolMaxSize);
- recreateCF();
- }
-
- @Override
- public int getInitialMessagePacketSize() {
- return cf.getInitialMessagePacketSize();
- }
-
- @Override
- public void setGroupID(String groupID) {
- cfConfig.setGroupID(groupID);
- recreateCF();
- }
-
- @Override
- public String getGroupID() {
- return cfConfig.getGroupID();
- }
-
- @Override
- public void setUseGlobalPools(boolean useGlobalPools) {
- cfConfig.setUseGlobalPools(useGlobalPools);
- recreateCF();
- }
-
- @Override
- public int getScheduledThreadPoolMaxSize() {
- return cfConfig.getScheduledThreadPoolMaxSize();
- }
-
- @Override
- public void setRetryInterval(long retryInterval) {
- cfConfig.setRetryInterval(retryInterval);
- recreateCF();
- }
-
- @Override
- public long getMaxRetryInterval() {
- return cfConfig.getMaxRetryInterval();
- }
-
- @Override
- public String getConnectionLoadBalancingPolicyClassName() {
- return cfConfig.getLoadBalancingPolicyClassName();
- }
-
- @Override
- public void setConnectionLoadBalancingPolicyClassName(String name) {
- cfConfig.setLoadBalancingPolicyClassName(name);
- recreateCF();
- }
-
- @Override
- public TransportConfiguration[] getStaticConnectors() {
- return cf.getStaticConnectors();
- }
-
- @Override
- public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration() {
- return cf.getDiscoveryGroupConfiguration();
- }
-
- @Override
- public void addBinding(@Parameter(name = "binding", desc = "the name of the binding for the Registry") String binding) throws Exception {
- jmsManager.addConnectionFactoryToBindingRegistry(name, binding);
- }
-
- @Override
- public void removeBinding(@Parameter(name = "binding", desc = "the name of the binding for the Registry") String binding) throws Exception {
- jmsManager.removeConnectionFactoryFromBindingRegistry(name, binding);
- }
-
- @Override
- public long getCallTimeout() {
- return cfConfig.getCallTimeout();
- }
-
- @Override
- public long getCallFailoverTimeout() {
- return cfConfig.getCallFailoverTimeout();
- }
-
- @Override
- public int getConsumerMaxRate() {
- return cfConfig.getConsumerMaxRate();
- }
-
- @Override
- public int getConsumerWindowSize() {
- return cfConfig.getConsumerWindowSize();
- }
-
- @Override
- public int getProducerMaxRate() {
- return cfConfig.getProducerMaxRate();
- }
-
- @Override
- public int getConfirmationWindowSize() {
- return cfConfig.getConfirmationWindowSize();
- }
-
- @Override
- public int getDupsOKBatchSize() {
- return cfConfig.getDupsOKBatchSize();
- }
-
- @Override
- public boolean isBlockOnAcknowledge() {
- return cfConfig.isBlockOnAcknowledge();
- }
-
- @Override
- public boolean isBlockOnNonDurableSend() {
- return cfConfig.isBlockOnNonDurableSend();
- }
-
- @Override
- public boolean isBlockOnDurableSend() {
- return cfConfig.isBlockOnDurableSend();
- }
-
- @Override
- public boolean isPreAcknowledge() {
- return cfConfig.isPreAcknowledge();
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public long getConnectionTTL() {
- return cfConfig.getConnectionTTL();
- }
-
- @Override
- public int getReconnectAttempts() {
- return cfConfig.getReconnectAttempts();
- }
-
- @Override
- public boolean isFailoverOnInitialConnection() {
- return cfConfig.isFailoverOnInitialConnection();
- }
-
- @Override
- public int getMinLargeMessageSize() {
- return cfConfig.getMinLargeMessageSize();
- }
-
- @Override
- public long getRetryInterval() {
- return cfConfig.getRetryInterval();
- }
-
- @Override
- public double getRetryIntervalMultiplier() {
- return cfConfig.getRetryIntervalMultiplier();
- }
-
- @Override
- public int getTransactionBatchSize() {
- return cfConfig.getTransactionBatchSize();
- }
-
- @Override
- public void setProtocolManagerFactoryStr(String protocolManagerFactoryStr) {
- cfConfig.setProtocolManagerFactoryStr(protocolManagerFactoryStr);
- recreateCF();
- }
-
- @Override
- public String getProtocolManagerFactoryStr() {
- return cfConfig.getProtocolManagerFactoryStr();
- }
-
- @Override
- public boolean isAutoGroup() {
- return cfConfig.isAutoGroup();
- }
-
- @Override
- public MBeanInfo getMBeanInfo() {
- MBeanInfo info = super.getMBeanInfo();
- return new MBeanInfo(info.getClassName(), info.getDescription(), info.getAttributes(), info.getConstructors(), MBeanInfoHelper.getMBeanOperationsInfo(ConnectionFactoryControl.class), info.getNotifications());
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- private void recreateCF() {
- try {
- this.cf = jmsManager.recreateCF(this.name, this.cfConfig);
- } catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
deleted file mode 100644
index 36cba96..0000000
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
+++ /dev/null
@@ -1,532 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jms.management.impl;
-
-import javax.jms.InvalidSelectorException;
-import javax.json.JsonArrayBuilder;
-import javax.management.MBeanInfo;
-import javax.management.StandardMBean;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException;
-import org.apache.activemq.artemis.api.core.FilterConstants;
-import org.apache.activemq.artemis.api.core.JsonUtil;
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
-import org.apache.activemq.artemis.api.core.management.Operation;
-import org.apache.activemq.artemis.api.core.management.QueueControl;
-import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
-import org.apache.activemq.artemis.core.management.impl.MBeanInfoHelper;
-import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
-import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
-import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
-import org.apache.activemq.artemis.jms.management.impl.openmbean.JMSOpenTypeSupport;
-import org.apache.activemq.artemis.jms.server.JMSServerManager;
-import org.apache.activemq.artemis.utils.Base64;
-import org.apache.activemq.artemis.utils.JsonLoader;
-import org.apache.activemq.artemis.utils.SelectorTranslator;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
-
-public class JMSQueueControlImpl extends StandardMBean implements JMSQueueControl {
-
- private final ActiveMQDestination managedQueue;
-
- private final JMSServerManager jmsServerManager;
-
- private final QueueControl coreQueueControl;
-
- private final MessageCounter counter;
-
- // Static --------------------------------------------------------
-
- /**
- * Returns null if the string is null or empty
- */
- public static String createFilterFromJMSSelector(final String selectorStr) throws ActiveMQException {
- return selectorStr == null || selectorStr.trim().length() == 0 ? null : SelectorTranslator.convertToActiveMQFilterString(selectorStr);
- }
-
- private static String createFilterForJMSMessageID(final String jmsMessageID) throws Exception {
- return FilterConstants.ACTIVEMQ_USERID + " = '" + jmsMessageID + "'";
- }
-
- static String toJSON(final Map<String, Object>[] messages) {
- JsonArrayBuilder array = JsonLoader.createArrayBuilder();
- for (Map<String, Object> message : messages) {
- array.add(JsonUtil.toJsonObject(message));
- }
- return array.build().toString();
- }
-
- // Constructors --------------------------------------------------
-
- public JMSQueueControlImpl(final ActiveMQDestination managedQueue,
- final QueueControl coreQueueControl,
- final JMSServerManager jmsServerManager,
- final MessageCounter counter) throws Exception {
- super(JMSQueueControl.class);
- this.managedQueue = managedQueue;
- this.jmsServerManager = jmsServerManager;
- this.coreQueueControl = coreQueueControl;
- this.counter = counter;
- }
-
- // Public --------------------------------------------------------
-
- // ManagedJMSQueueMBean implementation ---------------------------
-
- @Override
- public String getName() {
- return managedQueue.getName();
- }
-
- @Override
- public String getAddress() {
- return managedQueue.getAddress();
- }
-
- @Override
- public boolean isTemporary() {
- return managedQueue.isTemporary();
- }
-
- @Override
- public long getMessageCount() {
- return coreQueueControl.getMessageCount();
- }
-
- @Override
- public long getMessagesAdded() {
- return coreQueueControl.getMessagesAdded();
- }
-
- @Override
- public long getMessagesExpired() {
- return coreQueueControl.getMessagesExpired();
- }
-
- @Override
- public long getMessagesKilled() {
- return coreQueueControl.getMessagesKilled();
- }
-
- @Override
- public int getConsumerCount() {
- return coreQueueControl.getConsumerCount();
- }
-
- @Override
- public int getDeliveringCount() {
- return coreQueueControl.getDeliveringCount();
- }
-
- @Override
- public long getScheduledCount() {
- return coreQueueControl.getScheduledCount();
- }
-
- public boolean isDurable() {
- return coreQueueControl.isDurable();
- }
-
- @Override
- public String getDeadLetterAddress() {
- return coreQueueControl.getDeadLetterAddress();
- }
-
- @Override
- public String getExpiryAddress() {
- return coreQueueControl.getExpiryAddress();
- }
-
- @Override
- public String getFirstMessageAsJSON() throws Exception {
- return coreQueueControl.getFirstMessageAsJSON();
- }
-
- @Override
- public Long getFirstMessageTimestamp() throws Exception {
- return coreQueueControl.getFirstMessageTimestamp();
- }
-
- @Override
- public Long getFirstMessageAge() throws Exception {
- return coreQueueControl.getFirstMessageAge();
- }
-
- @Override
- public void addBinding(String binding) throws Exception {
- jmsServerManager.addQueueToBindingRegistry(managedQueue.getName(), binding);
- }
-
- @Override
- public String[] getRegistryBindings() {
- return jmsServerManager.getBindingsOnQueue(managedQueue.getName());
- }
-
- @Override
- public boolean removeMessage(final String messageID) throws Exception {
- String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
- int removed = coreQueueControl.removeMessages(filter);
- if (removed != 1) {
- throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
- }
- return true;
- }
-
- @Override
- public int removeMessages(final String filterStr) throws Exception {
- String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
- return coreQueueControl.removeMessages(filter);
- }
-
- @Override
- public Map<String, Object>[] listMessages(final String filterStr) throws Exception {
- try {
- String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
- Map<String, Object>[] coreMessages = coreQueueControl.listMessages(filter);
-
- return toJMSMap(coreMessages);
- } catch (ActiveMQException e) {
- throw new IllegalStateException(e.getMessage());
- }
- }
-
- private Map<String, Object>[] toJMSMap(Map<String, Object>[] coreMessages) {
- Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
-
- int i = 0;
-
- for (Map<String, Object> coreMessage : coreMessages) {
- Map<String, Object> jmsMessage = ActiveMQMessage.coreMaptoJMSMap(coreMessage);
- jmsMessages[i++] = jmsMessage;
- }
- return jmsMessages;
- }
-
- private CompositeData toJMSCompositeType(CompositeDataSupport data) throws Exception {
- return JMSOpenTypeSupport.convert(data);
- }
-
- @Override
- public Map<String, Object>[] listScheduledMessages() throws Exception {
- Map<String, Object>[] coreMessages = coreQueueControl.listScheduledMessages();
-
- return toJMSMap(coreMessages);
- }
-
- @Override
- public String listScheduledMessagesAsJSON() throws Exception {
- return coreQueueControl.listScheduledMessagesAsJSON();
- }
-
- @Override
- public Map<String, Map<String, Object>[]> listDeliveringMessages() throws Exception {
- try {
- Map<String, Map<String, Object>[]> returnMap = new HashMap<>();
-
- // the workingMap from the queue-control
- Map<String, Map<String, Object>[]> workingMap = coreQueueControl.listDeliveringMessages();
-
- for (Map.Entry<String, Map<String, Object>[]> entry : workingMap.entrySet()) {
- returnMap.put(entry.getKey(), toJMSMap(entry.getValue()));
- }
-
- return returnMap;
- } catch (ActiveMQException e) {
- throw new IllegalStateException(e.getMessage());
- }
- }
-
- @Override
- public String listDeliveringMessagesAsJSON() throws Exception {
- return coreQueueControl.listDeliveringMessagesAsJSON();
- }
-
- @Override
- public String listMessagesAsJSON(final String filter) throws Exception {
- return JMSQueueControlImpl.toJSON(listMessages(filter));
- }
-
- @Override
- public long countMessages(final String filterStr) throws Exception {
- String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
- return coreQueueControl.countMessages(filter);
- }
-
- @Override
- public boolean expireMessage(final String messageID) throws Exception {
- String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
- int expired = coreQueueControl.expireMessages(filter);
- if (expired != 1) {
- throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
- }
- return true;
- }
-
- @Override
- public int expireMessages(final String filterStr) throws Exception {
- String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
- return coreQueueControl.expireMessages(filter);
- }
-
- @Override
- public boolean sendMessageToDeadLetterAddress(final String messageID) throws Exception {
- String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
- int dead = coreQueueControl.sendMessagesToDeadLetterAddress(filter);
- if (dead != 1) {
- throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
- }
- return true;
- }
-
- @Override
- public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception {
- String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
- return coreQueueControl.sendMessagesToDeadLetterAddress(filter);
- }
-
- @Override
- public String sendTextMessageWithProperties(String properties) throws Exception {
- String[] kvs = properties.split(",");
- Map<String, String> props = new HashMap<>();
- for (String kv : kvs) {
- String[] it = kv.split("=");
- if (it.length == 2) {
- props.put(it[0], it[1]);
- }
- }
- return sendTextMessage(props, props.remove("body"), props.remove("username"), props.remove("password"));
- }
-
- @Override
- public String sendTextMessage(String body) throws Exception {
- return sendTextMessage(Collections.EMPTY_MAP, body);
- }
-
- @Override
- public String sendTextMessage(Map<String, String> headers, String body) throws Exception {
- return sendTextMessage(headers, body, null, null);
- }
-
- @Override
- public String sendTextMessage(String body, String user, String password) throws Exception {
- return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
- }
-
- @Override
- public String sendTextMessage(Map<String, String> headers,
- String body,
- String user,
- String password) throws Exception {
- boolean durable = false;
- if (headers.containsKey("JMSDeliveryMode")) {
- String jmsDeliveryMode = headers.remove("JMSDeliveryMode");
- if (jmsDeliveryMode != null && (jmsDeliveryMode.equals("2") || jmsDeliveryMode.equalsIgnoreCase("PERSISTENT"))) {
- durable = true;
- }
- }
- String userID = UUIDGenerator.getInstance().generateStringUUID();
- ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(56);
- buffer.writeNullableSimpleString(new SimpleString(body));
- byte[] bytes = new byte[buffer.readableBytes()];
- buffer.readBytes(bytes);
- coreQueueControl.sendMessage(headers, Message.TEXT_TYPE, Base64.encodeBytes(bytes), userID, durable, user, password);
- return userID;
- }
-
- @Override
- public boolean changeMessagePriority(final String messageID, final int newPriority) throws Exception {
- String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
- int changed = coreQueueControl.changeMessagesPriority(filter, newPriority);
- if (changed != 1) {
- throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
- }
- return true;
- }
-
- @Override
- public int changeMessagesPriority(final String filterStr, final int newPriority) throws Exception {
- String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
- return coreQueueControl.changeMessagesPriority(filter, newPriority);
- }
-
- @Override
- public boolean retryMessage(final String jmsMessageID) throws Exception {
-
- // Figure out messageID from JMSMessageID.
- final String filter = createFilterForJMSMessageID(jmsMessageID);
- Map<String, Object>[] messages = coreQueueControl.listMessages(filter);
- if (messages.length != 1) { // if no messages. There should not be more than one, JMSMessageID should be unique.
- return false;
- }
-
- final Map<String, Object> messageToRedeliver = messages[0];
- Long messageID = (Long) messageToRedeliver.get("messageID");
- return messageID != null && coreQueueControl.retryMessage(messageID);
- }
-
- @Override
- public int retryMessages() throws Exception {
- return coreQueueControl.retryMessages();
- }
-
- @Override
- public boolean moveMessage(final String messageID, final String otherQueueName) throws Exception {
- return moveMessage(messageID, otherQueueName, false);
- }
-
- @Override
- public boolean moveMessage(final String messageID,
- final String otherQueueName,
- final boolean rejectDuplicates) throws Exception {
- String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
- ActiveMQDestination otherQueue = ActiveMQDestination.createQueue(otherQueueName);
- int moved = coreQueueControl.moveMessages(filter, otherQueue.getAddress(), rejectDuplicates);
- if (moved != 1) {
- throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
- }
-
- return true;
- }
-
- @Override
- public int moveMessages(final String filterStr,
- final String otherQueueName,
- final boolean rejectDuplicates) throws Exception {
- String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
- ActiveMQDestination otherQueue = ActiveMQDestination.createQueue(otherQueueName);
- return coreQueueControl.moveMessages(filter, otherQueue.getAddress(), rejectDuplicates);
- }
-
- @Override
- public int moveMessages(final String filterStr, final String otherQueueName) throws Exception {
- return moveMessages(filterStr, otherQueueName, false);
- }
-
- @Override
- @Operation(desc = "List all the existent consumers on the Queue")
- public String listConsumersAsJSON() throws Exception {
- return coreQueueControl.listConsumersAsJSON();
- }
-
- @Override
- public String listMessageCounter() {
- try {
- return MessageCounterInfo.toJSon(counter);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- public void resetMessageCounter() throws Exception {
- coreQueueControl.resetMessageCounter();
- }
-
- @Override
- public String listMessageCounterAsHTML() {
- return MessageCounterHelper.listMessageCounterAsHTML(new MessageCounter[]{counter});
- }
-
- @Override
- public String listMessageCounterHistory() throws Exception {
- return MessageCounterHelper.listMessageCounterHistory(counter);
- }
-
- @Override
- public String listMessageCounterHistoryAsHTML() {
- return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[]{counter});
- }
-
- @Override
- public boolean isPaused() throws Exception {
- return coreQueueControl.isPaused();
- }
-
- @Override
- public void pause() throws Exception {
- coreQueueControl.pause();
- }
-
- @Override
- public void pause(boolean persist) throws Exception {
- coreQueueControl.pause(persist);
- }
-
- @Override
- public void resume() throws Exception {
- coreQueueControl.resume();
- }
-
- @Override
- public CompositeData[] browse() throws Exception {
- return browse(null);
- }
-
- @Override
- public CompositeData[] browse(String filter) throws Exception {
- try {
- CompositeData[] messages = coreQueueControl.browse(filter);
-
- ArrayList<CompositeData> c = new ArrayList<>();
-
- for (CompositeData message : messages) {
- c.add(toJMSCompositeType((CompositeDataSupport) message));
- }
- CompositeData[] rc = new CompositeData[c.size()];
- c.toArray(rc);
- return rc;
- } catch (ActiveMQInvalidFilterExpressionException e) {
- throw new InvalidSelectorException(e.getMessage());
- }
- }
-
- @Override
- public String getSelector() {
- return coreQueueControl.getFilter();
- }
-
- @Override
- public void flushExecutor() {
- coreQueueControl.flushExecutor();
- }
-
- @Override
- public MBeanInfo getMBeanInfo() {
- MBeanInfo info = super.getMBeanInfo();
- return new MBeanInfo(info.getClassName(), info.getDescription(), MBeanInfoHelper.getMBeanAttributesInfo(JMSQueueControl.class), info.getConstructors(), MBeanInfoHelper.getMBeanOperationsInfo(JMSQueueControl.class), info.getNotifications());
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSServerControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSServerControlImpl.java
deleted file mode 100644
index e9e2f3c..0000000
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSServerControlImpl.java
+++ /dev/null
@@ -1,876 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jms.management.impl;
-
-import javax.json.JsonArray;
-import javax.json.JsonArrayBuilder;
-import javax.json.JsonObject;
-import javax.json.JsonObjectBuilder;
-import javax.management.ListenerNotFoundException;
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanNotificationInfo;
-import javax.management.MBeanOperationInfo;
-import javax.management.Notification;
-import javax.management.NotificationBroadcasterSupport;
-import javax.management.NotificationEmitter;
-import javax.management.NotificationFilter;
-import javax.management.NotificationListener;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.api.core.management.Parameter;
-import org.apache.activemq.artemis.api.jms.JMSFactoryType;
-import org.apache.activemq.artemis.api.jms.management.ConnectionFactoryControl;
-import org.apache.activemq.artemis.api.jms.management.DestinationControl;
-import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
-import org.apache.activemq.artemis.api.jms.management.JMSServerControl;
-import org.apache.activemq.artemis.api.jms.management.TopicControl;
-import org.apache.activemq.artemis.core.client.impl.Topology;
-import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
-import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.management.impl.AbstractControl;
-import org.apache.activemq.artemis.core.management.impl.MBeanInfoHelper;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
-import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
-import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.jms.server.ActiveMQJMSServerLogger;
-import org.apache.activemq.artemis.jms.server.JMSServerManager;
-import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
-import org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
-import org.apache.activemq.artemis.jms.server.management.JMSNotificationType;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.utils.JsonLoader;
-import org.apache.activemq.artemis.utils.TypedProperties;
-
-public class JMSServerControlImpl extends AbstractControl implements JMSServerControl, NotificationEmitter, org.apache.activemq.artemis.core.server.management.NotificationListener {
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final JMSServerManager server;
-
- private final NotificationBroadcasterSupport broadcaster;
-
- private final AtomicLong notifSeq = new AtomicLong(0);
-
- // Static --------------------------------------------------------
-
- private static String[] convert(final Object[] bindings) {
- String[] theBindings = new String[bindings.length];
- for (int i = 0, bindingsLength = bindings.length; i < bindingsLength; i++) {
- theBindings[i] = bindings[i].toString().trim();
- }
- return theBindings;
- }
-
- private static String[] toArray(final String commaSeparatedString) {
- if (commaSeparatedString == null || commaSeparatedString.trim().length() == 0) {
- return new String[0];
- }
- String[] values = commaSeparatedString.split(",");
- String[] trimmed = new String[values.length];
- for (int i = 0; i < values.length; i++) {
- trimmed[i] = values[i].trim();
- trimmed[i] = trimmed[i].replace(",", ",");
- }
- return trimmed;
- }
-
- public static MBeanNotificationInfo[] getNotificationInfos() {
- JMSNotificationType[] values = JMSNotificationType.values();
- String[] names = new String[values.length];
- for (int i = 0; i < values.length; i++) {
- names[i] = values[i].toString();
- }
- return new MBeanNotificationInfo[]{new MBeanNotificationInfo(names, JMSServerControl.class.getName(), "Notifications emitted by a JMS Server")};
- }
-
- // Constructors --------------------------------------------------
-
- public JMSServerControlImpl(final JMSServerManager server) throws Exception {
- super(JMSServerControl.class, server.getActiveMQServer().getStorageManager());
- this.server = server;
- broadcaster = new NotificationBroadcasterSupport();
- server.getActiveMQServer().getManagementService().addNotificationListener(this);
- }
-
- // Public --------------------------------------------------------
-
- // JMSServerControlMBean implementation --------------------------
-
- /**
- * See the interface definition for the javadoc.
- */
- @Override
- public void createConnectionFactory(String name,
- boolean ha,
- boolean useDiscovery,
- int cfType,
- String[] connectorNames,
- Object[] bindings) throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- if (useDiscovery) {
- if (connectorNames == null || connectorNames.length == 0) {
- throw new IllegalArgumentException("no discovery group name supplied");
- }
- server.createConnectionFactory(name, ha, JMSFactoryType.valueOf(cfType), connectorNames[0], JMSServerControlImpl.convert(bindings));
- } else {
- List<String> connectorList = new ArrayList<>(connectorNames.length);
-
- for (String str : connectorNames) {
- connectorList.add(str);
- }
-
- server.createConnectionFactory(name, ha, JMSFactoryType.valueOf(cfType), connectorList, JMSServerControlImpl.convert(bindings));
- }
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public void createConnectionFactory(String name,
- boolean ha,
- boolean useDiscovery,
- int cfType,
- String connectors,
- String bindings,
- String clientID,
- long clientFailureCheckPeriod,
- long connectionTTL,
- long callTimeout,
- long callFailoverTimeout,
- int minLargeMessageSize,
- boolean compressLargeMessages,
- int consumerWindowSize,
- int consumerMaxRate,
- int confirmationWindowSize,
- int producerWindowSize,
- int producerMaxRate,
- boolean blockOnAcknowledge,
- boolean blockOnDurableSend,
- boolean blockOnNonDurableSend,
- boolean autoGroup,
- boolean preAcknowledge,
- String loadBalancingPolicyClassName,
- int transactionBatchSize,
- int dupsOKBatchSize,
- boolean useGlobalPools,
- int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
- long retryInterval,
- double retryIntervalMultiplier,
- long maxRetryInterval,
- int reconnectAttempts,
- boolean failoverOnInitialConnection,
- String groupId) throws Exception {
- createConnectionFactory(name, ha, useDiscovery, cfType, toArray(connectors), toArray(bindings), clientID, clientFailureCheckPeriod, connectionTTL, callTimeout, callFailoverTimeout, minLargeMessageSize, compressLargeMessages, consumerWindowSize, consumerMaxRate, confirmationWindowSize, producerWindowSize, producerMaxRate, blockOnAcknowledge, blockOnDurableSend, blockOnNonDurableSend, autoGroup, preAcknowledge, loadBalancingPolicyClassName, transactionBatchSize, dupsOKBatchSize, useGlobalPools, scheduledThreadPoolMaxSize, threadPoolMaxSize, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, failoverOnInitialConnection, groupId);
- }
-
- @Override
- public void createConnectionFactory(String name,
- boolean ha,
- boolean useDiscovery,
- int cfType,
- String[] connectorNames,
- String[] bindings,
- String clientID,
- long clientFailureCheckPeriod,
- long connectionTTL,
- long callTimeout,
- long callFailoverTimeout,
- int minLargeMessageSize,
- boolean compressLargeMessages,
- int consumerWindowSize,
- int consumerMaxRate,
- int confirmationWindowSize,
- int producerWindowSize,
- int producerMaxRate,
- boolean blockOnAcknowledge,
- boolean blockOnDurableSend,
- boolean blockOnNonDurableSend,
- boolean autoGroup,
- boolean preAcknowledge,
- String loadBalancingPolicyClassName,
- int transactionBatchSize,
- int dupsOKBatchSize,
- boolean useGlobalPools,
- int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
- long retryInterval,
- double retryIntervalMultiplier,
- long maxRetryInterval,
- int reconnectAttempts,
- boolean failoverOnInitialConnection,
- String groupId) throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl().setName(name).setHA(ha).setBindings(bindings).setFactoryType(JMSFactoryType.valueOf(cfType)).setClientID(clientID).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setCallTimeout(callTimeout).setCallFailoverTimeout(callFailoverTimeout).setMinLargeMessageSize(minLargeMessageSize).setCompressLargeMessages(compressLargeMessages).setConsumerWindowSize(consumerWindowSize).setConsumerMaxRate(consumerMaxRate).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setProducerMaxRate(producerMaxRate).setBlockOnAcknowledge(blockOnAcknowledge).setBlockOnDurableSend(blockOnDurableSend).setBlockOnNonDurableSend(blockOnNonDurableSend).setAutoGroup(autoGroup).setPreAcknowledge(preAcknowledge).setTransactionBatchSize(transactionBatchSize).setDupsOKBatchSize(dupsOKBatchSize).setUseGlobalPools(useGlobalPools).setScheduledThreadPoolM
axSize(scheduledThreadPoolMaxSize).setThreadPoolMaxSize(threadPoolMaxSize).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setReconnectAttempts(reconnectAttempts).setFailoverOnInitialConnection(failoverOnInitialConnection).setGroupID(groupId);
-
- if (useDiscovery) {
- configuration.setDiscoveryGroupName(connectorNames[0]);
- } else {
- ArrayList<String> connectorNamesList = new ArrayList<>();
- for (String nameC : connectorNames) {
- connectorNamesList.add(nameC);
- }
- configuration.setConnectorNames(connectorNamesList);
- }
-
- if (loadBalancingPolicyClassName != null && !loadBalancingPolicyClassName.trim().equals("")) {
- configuration.setLoadBalancingPolicyClassName(loadBalancingPolicyClassName);
- }
-
- server.createConnectionFactory(true, configuration, bindings);
- } finally {
- blockOnIO();
- }
- }
-
- /**
- * Create a JMS ConnectionFactory with the specified name connected to a single live-backup pair of servers.
- * <br>
- * The ConnectionFactory is bound to the Registry for all the specified bindings Strings.
- */
- @Override
- public void createConnectionFactory(String name,
- boolean ha,
- boolean useDiscovery,
- int cfType,
- String connectors,
- String bindings) throws Exception {
- createConnectionFactory(name, ha, useDiscovery, cfType, toArray(connectors), toArray(bindings));
- }
-
- @Override
- public boolean createQueue(final String name) throws Exception {
- return createQueue(name, null, null, true);
- }
-
- @Override
- public boolean createQueue(final String name, final String bindings) throws Exception {
- return createQueue(name, bindings, null, true);
- }
-
- @Override
- public boolean createQueue(String name, String bindings, String selector) throws Exception {
- return createQueue(name, bindings, selector, true);
- }
-
- @Override
- public boolean createQueue(@Parameter(name = "name", desc = "Name of the queue to create") String name,
- @Parameter(name = "bindings", desc = "comma-separated list of Registry bindings (use ',' if u need to use commas in your bindings name)") String bindings,
- @Parameter(name = "selector", desc = "the jms selector") String selector,
- @Parameter(name = "durable", desc = "is the queue persistent and resilient to restart") boolean durable) throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- return server.createQueue(true, name, selector, durable, JMSServerControlImpl.toArray(bindings));
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public boolean destroyQueue(final String name) throws Exception {
- return destroyQueue(name, false);
- }
-
- @Override
- public boolean destroyQueue(final String name, final boolean removeConsumers) throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- return server.destroyQueue(name, removeConsumers);
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public boolean createTopic(String name) throws Exception {
- return createTopic(name, null);
- }
-
- @Override
- public boolean createTopic(final String topicName, final String bindings) throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- return server.createTopic(true, topicName, JMSServerControlImpl.toArray(bindings));
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public boolean destroyTopic(final String name) throws Exception {
- return destroyTopic(name, true);
- }
-
- @Override
- public boolean destroyTopic(final String name, final boolean removeConsumers) throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- return server.destroyTopic(name, removeConsumers);
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public void destroyConnectionFactory(final String name) throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- server.destroyConnectionFactory(name);
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public boolean isStarted() {
- return server.isStarted();
- }
-
- @Override
- public String getVersion() {
- checkStarted();
-
- return server.getVersion();
- }
-
- @Override
- public String[] getQueueNames() {
- checkStarted();
-
- clearIO();
-
- try {
- Object[] queueControls = server.getActiveMQServer().getManagementService().getResources(JMSQueueControl.class);
- String[] names = new String[queueControls.length];
- for (int i = 0; i < queueControls.length; i++) {
- JMSQueueControl queueControl = (JMSQueueControl) queueControls[i];
- names[i] = queueControl.getName();
- }
- return names;
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public String[] getTopicNames() {
- checkStarted();
-
- clearIO();
-
- try {
- Object[] topicControls = server.getActiveMQServer().getManagementService().getResources(TopicControl.class);
- String[] names = new String[topicControls.length];
- for (int i = 0; i < topicControls.length; i++) {
- TopicControl topicControl = (TopicControl) topicControls[i];
- names[i] = topicControl.getName();
- }
- return names;
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public String[] getConnectionFactoryNames() {
- checkStarted();
-
- clearIO();
-
- try {
- Object[] cfControls = server.getActiveMQServer().getManagementService().getResources(ConnectionFactoryControl.class);
- String[] names = new String[cfControls.length];
- for (int i = 0; i < cfControls.length; i++) {
- ConnectionFactoryControl cfControl = (ConnectionFactoryControl) cfControls[i];
- names[i] = cfControl.getName();
- }
- return names;
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public String getNodeID() {
- return server.getActiveMQServer().getNodeID().toString();
- }
-
- // NotificationEmitter implementation ----------------------------
-
- @Override
- public void removeNotificationListener(final NotificationListener listener,
- final NotificationFilter filter,
- final Object handback) throws ListenerNotFoundException {
- broadcaster.removeNotificationListener(listener, filter, handback);
- }
-
- @Override
- public void removeNotificationListener(final NotificationListener listener) throws ListenerNotFoundException {
- broadcaster.removeNotificationListener(listener);
- }
-
- @Override
- public void addNotificationListener(final NotificationListener listener,
- final NotificationFilter filter,
- final Object handback) throws IllegalArgumentException {
- broadcaster.addNotificationListener(listener, filter, handback);
- }
-
- @Override
- public MBeanNotificationInfo[] getNotificationInfo() {
- return JMSServerControlImpl.getNotificationInfos();
- }
-
- @Override
- public String[] listRemoteAddresses() throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- return server.listRemoteAddresses();
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public String[] listRemoteAddresses(final String ipAddress) throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- return server.listRemoteAddresses(ipAddress);
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public boolean closeConnectionsForAddress(final String ipAddress) throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- return server.closeConnectionsForAddress(ipAddress);
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public boolean closeConsumerConnectionsForAddress(final String address) throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- return server.closeConsumerConnectionsForAddress(address);
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public boolean closeConnectionsForUser(final String userName) throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- return server.closeConnectionsForUser(userName);
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public String[] listConnectionIDs() throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- return server.listConnectionIDs();
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public String listConnectionsAsJSON() throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- JsonArrayBuilder array = JsonLoader.createArrayBuilder();
-
- Set<RemotingConnection> connections = server.getActiveMQServer().getRemotingService().getConnections();
-
- Set<ServerSession> sessions = server.getActiveMQServer().getSessions();
-
- Map<Object, ServerSession> jmsSessions = new HashMap<>();
-
- // First separate the real jms sessions, after all we are only interested in those here on the *jms* server controller
- for (ServerSession session : sessions) {
- if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null) {
- jmsSessions.put(session.getConnectionID(), session);
- }
- }
-
- for (RemotingConnection connection : connections) {
- ServerSession session = jmsSessions.get(connection.getID());
- if (session != null) {
- JsonObjectBuilder objectBuilder = JsonLoader.createObjectBuilder().add("connectionID", connection.getID().toString()).add("clientAddress", connection.getRemoteAddress()).add("creationTime", connection.getCreationTime());
-
- if (session.getMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY) != null) {
- objectBuilder.add("clientID", session.getMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY));
- }
-
- if (session.getUsername() != null) {
- objectBuilder.add("principal", session.getUsername());
- }
-
- array.add(objectBuilder.build());
- }
- }
- return array.build().toString();
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public String listConsumersAsJSON(String connectionID) throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- JsonArrayBuilder array = JsonLoader.createArrayBuilder();
-
- Set<RemotingConnection> connections = server.getActiveMQServer().getRemotingService().getConnections();
- for (RemotingConnection connection : connections) {
- if (connectionID.equals(connection.getID().toString())) {
- List<ServerSession> sessions = server.getActiveMQServer().getSessions(connectionID);
- for (ServerSession session : sessions) {
- Set<ServerConsumer> consumers = session.getServerConsumers();
- for (ServerConsumer consumer : consumers) {
- JsonObject obj = toJSONObject(consumer);
- if (obj != null) {
- array.add(obj);
- }
- }
- }
- }
- }
- return array.build().toString();
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public String listAllConsumersAsJSON() throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- JsonArray jsonArray = toJsonArray(server.getActiveMQServer().getSessions());
- return jsonArray.toString();
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public String[] listSessions(final String connectionID) throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- return server.listSessions(connectionID);
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public String listPreparedTransactionDetailsAsJSON() throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- return server.listPreparedTransactionDetailsAsJSON();
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public String listPreparedTransactionDetailsAsHTML() throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- return server.listPreparedTransactionDetailsAsHTML();
- } finally {
- blockOnIO();
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
- /* (non-Javadoc)
- * @see org.apache.activemq.artemis.core.management.impl.AbstractControl#fillMBeanOperationInfo()
- */
- @Override
- protected MBeanOperationInfo[] fillMBeanOperationInfo() {
- return MBeanInfoHelper.getMBeanOperationsInfo(JMSServerControl.class);
- }
-
- @Override
- protected MBeanAttributeInfo[] fillMBeanAttributeInfo() {
- return MBeanInfoHelper.getMBeanAttributesInfo(JMSServerControl.class);
- }
-
- // Private -------------------------------------------------------
-
- private void checkStarted() {
- if (!server.isStarted()) {
- throw new IllegalStateException("ActiveMQ Artemis JMS Server is not started. It can not be managed yet");
- }
- }
-
- // Inner classes -------------------------------------------------
-
- @Override
- public String[] listTargetDestinations(String sessionID) throws Exception {
- String[] addresses = server.getActiveMQServer().getActiveMQServerControl().listTargetAddresses(sessionID);
- Map<String, DestinationControl> allDests = new HashMap<>();
-
- Object[] queueControls = server.getActiveMQServer().getManagementService().getResources(JMSQueueControl.class);
- for (Object queueControl2 : queueControls) {
- JMSQueueControl queueControl = (JMSQueueControl) queueControl2;
- allDests.put(queueControl.getAddress(), queueControl);
- }
-
- Object[] topicControls = server.getActiveMQServer().getManagementService().getResources(TopicControl.class);
- for (Object topicControl2 : topicControls) {
- TopicControl topicControl = (TopicControl) topicControl2;
- allDests.put(topicControl.getAddress(), topicControl);
- }
-
- List<String> destinations = new ArrayList<>();
- for (String addresse : addresses) {
- DestinationControl control = allDests.get(addresse);
- if (control != null) {
- destinations.add(control.getAddress());
- }
- }
- return destinations.toArray(new String[destinations.size()]);
- }
-
- @Override
- public String getLastSentMessageID(String sessionID, String address) throws Exception {
- ServerSession session = server.getActiveMQServer().getSessionByID(sessionID);
- if (session != null) {
- return session.getLastSentMessageID(address);
- }
- return null;
- }
-
- @Override
- public String getSessionCreationTime(String sessionID) throws Exception {
- ServerSession session = server.getActiveMQServer().getSessionByID(sessionID);
- if (session != null) {
- return String.valueOf(session.getCreationTime());
- }
- return null;
- }
-
- @Override
- public String listSessionsAsJSON(final String connectionID) throws Exception {
- checkStarted();
-
- clearIO();
-
- try {
- return server.listSessionsAsJSON(connectionID);
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public String listNetworkTopology() throws Exception {
- checkStarted();
-
- clearIO();
- try {
- JsonArrayBuilder brokers = JsonLoader.createArrayBuilder();
- ClusterManager clusterManager = server.getActiveMQServer().getClusterManager();
- if (clusterManager != null) {
- Set<ClusterConnection> clusterConnections = clusterManager.getClusterConnections();
- for (ClusterConnection clusterConnection : clusterConnections) {
- Topology topology = clusterConnection.getTopology();
- Collection<TopologyMemberImpl> members = topology.getMembers();
- for (TopologyMemberImpl member : members) {
-
- JsonObjectBuilder obj = JsonLoader.createObjectBuilder();
- TransportConfiguration live = member.getLive();
- if (live != null) {
- obj.add("nodeID", member.getNodeId()).add("live", live.getParams().get("host") + ":" + live.getParams().get("port"));
- TransportConfiguration backup = member.getBackup();
- if (backup != null) {
- obj.add("backup", backup.getParams().get("host") + ":" + backup.getParams().get("port"));
- }
- }
- brokers.add(obj);
- }
- }
- }
- return brokers.build().toString();
- } finally {
- blockOnIO();
- }
- }
-
- @Override
- public String closeConnectionWithClientID(final String clientID) throws Exception {
- return server.getActiveMQServer().destroyConnectionWithSessionMetadata(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
- }
-
- private String determineJMSDestinationType(Queue queue) {
- String result;
- if (server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(queue.getAddress().toString())).getRoutingType() == AddressInfo.RoutingType.ANYCAST) {
- if (queue.isTemporary()) {
- result = "tempqueue";
- } else {
- result = "queue";
- }
- } else if (server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(queue.getAddress().toString())).getRoutingType() == AddressInfo.RoutingType.MULTICAST) {
- if (queue.isTemporary()) {
- result = "temptopic";
- } else {
- result = "topic";
- }
- } else {
- ActiveMQJMSServerLogger.LOGGER.debug("JMSServerControlImpl.determineJMSDestinationType() " + queue);
- // not related to JMS
- return null;
- }
- return result;
- }
-
- private JsonObject toJSONObject(ServerConsumer consumer) {
- AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(consumer.getQueue().getAddress().toString()));
- if (addressInfo == null) {
- return null;
- }
- JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("consumerID", consumer.getID()).add("connectionID", consumer.getConnectionID().toString()).add("sessionID", consumer.getSessionID()).add("queueName", consumer.getQueue().getName().toString()).add("browseOnly", consumer.isBrowseOnly()).add("creationTime", consumer.getCreationTime()).add("destinationName", consumer.getQueue().getAddress().toString()).add("destinationType", determineJMSDestinationType(consumer.getQueue()));
- // JMS consumer with message filter use the queue's filter
- Filter queueFilter = consumer.getQueue().getFilter();
- if (queueFilter != null) {
- obj.add("filter", queueFilter.getFilterString().toString());
- }
-
- if (addressInfo.getRoutingType().equals(AddressInfo.RoutingType.MULTICAST)) {
- if (consumer.getQueue().isTemporary()) {
- obj.add("durable", false);
- } else {
- obj.add("durable", true);
- }
- } else {
- obj.add("durable", false);
- }
-
- return obj.build();
- }
-
- @Override
- public void onNotification(org.apache.activemq.artemis.core.server.management.Notification notification) {
- if (!(notification.getType() instanceof JMSNotificationType))
- return;
- JMSNotificationType type = (JMSNotificationType) notification.getType();
- TypedProperties prop = notification.getProperties();
-
- this.broadcaster.sendNotification(new Notification(type.toString(), this, notifSeq.incrementAndGet(), prop.getSimpleStringProperty(JMSNotificationType.MESSAGE).toString()));
- }
-
- private JsonArray toJsonArray(Collection<ServerSession> sessions) {
- JsonArrayBuilder array = JsonLoader.createArrayBuilder();
-
- for (ServerSession session : sessions) {
- Set<ServerConsumer> consumers = session.getServerConsumers();
- for (ServerConsumer consumer : consumers) {
- JsonObject obj = toJSONObject(consumer);
- if (obj != null) {
- array.add(obj);
- }
- }
- }
- return array.build();
- }
-
-}
[2/3] activemq-artemis git commit: remove JMS JMX Objects and add new
Address JMX objects
Posted by an...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java
deleted file mode 100644
index f60f526..0000000
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jms.management.impl;
-
-import javax.json.JsonArrayBuilder;
-import javax.json.JsonObject;
-import javax.management.MBeanInfo;
-import javax.management.StandardMBean;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.Pair;
-import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
-import org.apache.activemq.artemis.api.core.management.AddressControl;
-import org.apache.activemq.artemis.api.core.management.QueueControl;
-import org.apache.activemq.artemis.api.core.management.ResourceNames;
-import org.apache.activemq.artemis.api.jms.management.TopicControl;
-import org.apache.activemq.artemis.core.management.impl.MBeanInfoHelper;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
-import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
-import org.apache.activemq.artemis.jms.server.JMSServerManager;
-import org.apache.activemq.artemis.utils.JsonLoader;
-import org.apache.activemq.artemis.utils.SelectorTranslator;
-
-import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
-
-public class JMSTopicControlImpl extends StandardMBean implements TopicControl {
-
- private final ActiveMQDestination managedTopic;
-
- private final AddressControl addressControl;
-
- private final ManagementService managementService;
-
- private final JMSServerManager jmsServerManager;
-
- // Static --------------------------------------------------------
-
- public static String createFilterFromJMSSelector(final String selectorStr) throws ActiveMQException {
- return selectorStr == null || selectorStr.trim().length() == 0 ? null : SelectorTranslator.convertToActiveMQFilterString(selectorStr);
- }
-
- // Constructors --------------------------------------------------
-
- public JMSTopicControlImpl(final ActiveMQDestination topic,
- final JMSServerManager jmsServerManager,
- final AddressControl addressControl,
- final ManagementService managementService) throws Exception {
- super(TopicControl.class);
- this.jmsServerManager = jmsServerManager;
- managedTopic = topic;
- this.addressControl = addressControl;
- this.managementService = managementService;
- }
-
- // TopicControlMBean implementation ------------------------------
-
- @Override
- public void addBinding(String binding) throws Exception {
- jmsServerManager.addTopicToBindingRegistry(managedTopic.getName(), binding);
- }
-
- @Override
- public String[] getRegistryBindings() {
- return jmsServerManager.getBindingsOnTopic(managedTopic.getName());
- }
-
- @Override
- public String getName() {
- return managedTopic.getName();
- }
-
- @Override
- public boolean isTemporary() {
- return managedTopic.isTemporary();
- }
-
- @Override
- public String getAddress() {
- return managedTopic.getAddress();
- }
-
- @Override
- public long getMessageCount() {
- return getMessageCount(DurabilityType.ALL);
- }
-
- @Override
- public int getDeliveringCount() {
- List<QueueControl> queues = getQueues(DurabilityType.ALL);
- int count = 0;
- for (QueueControl queue : queues) {
- count += queue.getDeliveringCount();
- }
- return count;
- }
-
- @Override
- public long getMessagesAdded() {
- List<QueueControl> queues = getQueues(DurabilityType.ALL);
- int count = 0;
- for (QueueControl queue : queues) {
- count += queue.getMessagesAdded();
- }
- return count;
- }
-
- @Override
- public int getDurableMessageCount() {
- return getMessageCount(DurabilityType.DURABLE);
- }
-
- @Override
- public int getNonDurableMessageCount() {
- return getMessageCount(DurabilityType.NON_DURABLE);
- }
-
- @Override
- public int getSubscriptionCount() {
- return getQueues(DurabilityType.ALL).size();
- }
-
- @Override
- public int getDurableSubscriptionCount() {
- return getQueues(DurabilityType.DURABLE).size();
- }
-
- @Override
- public int getNonDurableSubscriptionCount() {
- return getQueues(DurabilityType.NON_DURABLE).size();
- }
-
- @Override
- public Object[] listAllSubscriptions() {
- return listSubscribersInfos(DurabilityType.ALL);
- }
-
- @Override
- public String listAllSubscriptionsAsJSON() throws Exception {
- return listSubscribersInfosAsJSON(DurabilityType.ALL);
- }
-
- @Override
- public Object[] listDurableSubscriptions() {
- return listSubscribersInfos(DurabilityType.DURABLE);
- }
-
- @Override
- public String listDurableSubscriptionsAsJSON() throws Exception {
- return listSubscribersInfosAsJSON(DurabilityType.DURABLE);
- }
-
- @Override
- public Object[] listNonDurableSubscriptions() {
- return listSubscribersInfos(DurabilityType.NON_DURABLE);
- }
-
- @Override
- public String listNonDurableSubscriptionsAsJSON() throws Exception {
- return listSubscribersInfosAsJSON(DurabilityType.NON_DURABLE);
- }
-
- @Override
- public Map<String, Object>[] listMessagesForSubscription(final String queueName) throws Exception {
- QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
- if (coreQueueControl == null) {
- throw new IllegalArgumentException("No subscriptions with name " + queueName);
- }
-
- Map<String, Object>[] coreMessages = coreQueueControl.listMessages(null);
-
- Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
-
- int i = 0;
-
- for (Map<String, Object> coreMessage : coreMessages) {
- jmsMessages[i++] = ActiveMQMessage.coreMaptoJMSMap(coreMessage);
- }
- return jmsMessages;
- }
-
- @Override
- public String listMessagesForSubscriptionAsJSON(final String queueName) throws Exception {
- return JMSQueueControlImpl.toJSON(listMessagesForSubscription(queueName));
- }
-
- @Override
- public int countMessagesForSubscription(final String clientID,
- final String subscriptionName,
- final String filterStr) throws Exception {
- String queueName = ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName);
- QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
- if (coreQueueControl == null) {
- throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
- }
- String filter = JMSTopicControlImpl.createFilterFromJMSSelector(filterStr);
- return coreQueueControl.listMessages(filter).length;
- }
-
- @Override
- public int removeMessages(final String filterStr) throws Exception {
- String filter = JMSTopicControlImpl.createFilterFromJMSSelector(filterStr);
- int count = 0;
- String[] queues = addressControl.getQueueNames();
- for (String queue : queues) {
- QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.CORE_QUEUE + queue);
- if (coreQueueControl != null) {
- count += coreQueueControl.removeMessages(filter);
- }
- }
-
- return count;
- }
-
- @Override
- public void dropDurableSubscription(final String clientID, final String subscriptionName) throws Exception {
- String queueName = ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName);
- QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
- if (coreQueueControl == null) {
- throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
- }
- ActiveMQServerControl serverControl = (ActiveMQServerControl) managementService.getResource(ResourceNames.CORE_SERVER);
- serverControl.destroyQueue(queueName, true);
- }
-
- @Override
- public void dropAllSubscriptions() throws Exception {
- ActiveMQServerControl serverControl = (ActiveMQServerControl) managementService.getResource(ResourceNames.CORE_SERVER);
- String[] queues = addressControl.getQueueNames();
- for (String queue : queues) {
- // Drop all subscription shouldn't delete the dummy queue used to identify if the topic exists on the core queues.
- // we will just ignore this queue
- if (!queue.equals(managedTopic.getAddress())) {
- serverControl.destroyQueue(queue);
- }
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private Object[] listSubscribersInfos(final DurabilityType durability) {
- List<QueueControl> queues = getQueues(durability);
- List<Object[]> subInfos = new ArrayList<>(queues.size());
-
- for (QueueControl queue : queues) {
- String clientID = null;
- String subName = null;
-
- if (queue.isDurable()) {
- Pair<String, String> pair = ActiveMQDestination.decomposeQueueNameForDurableSubscription(queue.getName());
- clientID = pair.getA();
- subName = pair.getB();
- }
-
- String filter = queue.getFilter() != null ? queue.getFilter() : null;
-
- Object[] subscriptionInfo = new Object[6];
- subscriptionInfo[0] = queue.getName();
- subscriptionInfo[1] = clientID;
- subscriptionInfo[2] = subName;
- subscriptionInfo[3] = queue.isDurable();
- subscriptionInfo[4] = queue.getMessageCount();
- subscriptionInfo[5] = filter;
- subInfos.add(subscriptionInfo);
- }
- return subInfos.toArray(new Object[subInfos.size()]);
- }
-
- private String listSubscribersInfosAsJSON(final DurabilityType durability) throws Exception {
- try {
- List<QueueControl> queues = getQueues(durability);
- JsonArrayBuilder array = JsonLoader.createArrayBuilder();
-
- for (QueueControl queue : queues) {
- String clientID = null;
- String subName = null;
-
- if (queue.isDurable()) {
- Pair<String, String> pair = ActiveMQDestination.decomposeQueueNameForDurableSubscription(queue.getName());
- clientID = pair.getA();
- subName = pair.getB();
- } else {
- // in the case of heirarchical topics the queue name will not follow the <part>.<part> pattern of normal
- // durable subscribers so skip decomposing the name for the client ID and subscription name and just
- // hard-code it
- clientID = "";
- subName = "";
- }
-
- String filter = queue.getFilter() != null ? queue.getFilter() : null;
-
- JsonObject info = JsonLoader.createObjectBuilder().add("queueName", queue.getName()).add("clientID", nullSafe(clientID)).add("selector", nullSafe(filter)).add("name", nullSafe(subName)).add("durable", queue.isDurable()).add("messageCount", queue.getMessageCount()).add("deliveringCount", queue.getDeliveringCount()).add("consumers", queue.listConsumersAsJSON()).build();
-
- array.add(info);
- }
-
- return array.build().toString();
- } catch (Exception e) {
- e.printStackTrace();
- return e.toString();
- }
- }
-
- private int getMessageCount(final DurabilityType durability) {
- List<QueueControl> queues = getQueues(durability);
- int count = 0;
- for (QueueControl queue : queues) {
- count += queue.getMessageCount();
- }
- return count;
- }
-
- private List<QueueControl> getQueues(final DurabilityType durability) {
- try {
- List<QueueControl> matchingQueues = new ArrayList<>();
- String[] queues = addressControl.getQueueNames();
- for (String queue : queues) {
- QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.CORE_QUEUE + queue);
-
- // Ignore the "special" subscription
- if (coreQueueControl != null && !coreQueueControl.getName().equals(addressControl.getAddress())) {
- if (durability == DurabilityType.ALL || durability == DurabilityType.DURABLE && coreQueueControl.isDurable() ||
- durability == DurabilityType.NON_DURABLE && !coreQueueControl.isDurable()) {
- matchingQueues.add(coreQueueControl);
- }
- }
- }
- return matchingQueues;
- } catch (Exception e) {
- return Collections.emptyList();
- }
- }
-
- @Override
- public MBeanInfo getMBeanInfo() {
- MBeanInfo info = super.getMBeanInfo();
- return new MBeanInfo(info.getClassName(), info.getDescription(), MBeanInfoHelper.getMBeanAttributesInfo(TopicControl.class), info.getConstructors(), MBeanInfoHelper.getMBeanOperationsInfo(TopicControl.class), info.getNotifications());
- }
-
- // Inner classes -------------------------------------------------
-
- private enum DurabilityType {
- ALL, DURABLE, NON_DURABLE
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSCompositeDataConstants.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSCompositeDataConstants.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSCompositeDataConstants.java
deleted file mode 100644
index dc5b33b..0000000
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSCompositeDataConstants.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jms.management.impl.openmbean;
-
-public interface JMSCompositeDataConstants {
-
- String JMS_DESTINATION = "JMSDestination";
- String JMS_MESSAGE_ID = "JMSMessageID";
- String JMS_TYPE = "JMSType";
- String JMS_DELIVERY_MODE = "JMSDeliveryMode";
- String JMS_EXPIRATION = "JMSExpiration";
- String JMS_PRIORITY = "JMSPriority";
- String JMS_REDELIVERED = "JMSRedelivered";
- String JMS_TIMESTAMP = "JMSTimestamp";
- String JMSXGROUP_SEQ = "JMSXGroupSeq";
- String JMSXGROUP_ID = "JMSXGroupID";
- String JMSXUSER_ID = "JMSXUserID";
- String JMS_CORRELATION_ID = "JMSCorrelationID";
- String ORIGINAL_DESTINATION = "OriginalDestination";
- String JMS_REPLY_TO = "JMSReplyTo";
-
- String JMS_DESTINATION_DESCRIPTION = "The message destination";
- String JMS_MESSAGE_ID_DESCRIPTION = "The message ID";
- String JMS_TYPE_DESCRIPTION = "The message type";
- String JMS_DELIVERY_MODE_DESCRIPTION = "The message delivery mode";
- String JMS_EXPIRATION_DESCRIPTION = "The message expiration";
- String JMS_PRIORITY_DESCRIPTION = "The message priority";
- String JMS_REDELIVERED_DESCRIPTION = "Is the message redelivered";
- String JMS_TIMESTAMP_DESCRIPTION = "The message timestamp";
- String JMSXGROUP_SEQ_DESCRIPTION = "The message group sequence number";
- String JMSXGROUP_ID_DESCRIPTION = "The message group ID";
- String JMSXUSER_ID_DESCRIPTION = "The user that sent the message";
- String JMS_CORRELATION_ID_DESCRIPTION = "The message correlation ID";
- String ORIGINAL_DESTINATION_DESCRIPTION = "Original Destination Before Senting To DLQ";
- String JMS_REPLY_TO_DESCRIPTION = "The reply to address";
-
- String BODY_LENGTH = "BodyLength";
- String BODY_PREVIEW = "BodyPreview";
- String CONTENT_MAP = "ContentMap";
- String MESSAGE_TEXT = "Text";
- String MESSAGE_URL = "Url";
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSOpenTypeSupport.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSOpenTypeSupport.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSOpenTypeSupport.java
deleted file mode 100644
index 285657d..0000000
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSOpenTypeSupport.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jms.management.impl.openmbean;
-
-import javax.management.openmbean.ArrayType;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants;
-import org.apache.activemq.artemis.reader.MapMessageUtil;
-import org.apache.activemq.artemis.utils.TypedProperties;
-
-public final class JMSOpenTypeSupport {
-
- public interface OpenTypeFactory {
-
- CompositeType getCompositeType() throws OpenDataException;
-
- Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException;
- }
-
- private static final Map<Byte, AbstractOpenTypeFactory> OPEN_TYPE_FACTORIES = new HashMap<>();
-
- public abstract static class AbstractOpenTypeFactory implements OpenTypeFactory {
-
- private CompositeType compositeType;
- private final List<String> itemNamesList = new ArrayList<>();
- private final List<String> itemDescriptionsList = new ArrayList<>();
- private final List<OpenType> itemTypesList = new ArrayList<>();
-
- @Override
- public CompositeType getCompositeType() throws OpenDataException {
- if (compositeType == null) {
- init();
- compositeType = createCompositeType();
- }
- return compositeType;
- }
-
- protected void init() throws OpenDataException {
- }
-
- protected CompositeType createCompositeType() throws OpenDataException {
- String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]);
- String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]);
- OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]);
- return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes);
- }
-
- protected abstract String getTypeName();
-
- protected void addItem(String name, String description, OpenType type) {
- itemNamesList.add(name);
- itemDescriptionsList.add(description);
- itemTypesList.add(type);
- }
-
- protected String getDescription() {
- return getTypeName();
- }
-
- @Override
- public Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException {
- Map<String, Object> rc = new HashMap<>();
- return rc;
- }
- }
-
- static class MessageOpenTypeFactory extends AbstractOpenTypeFactory {
-
- protected TabularType stringPropertyTabularType;
- protected TabularType booleanPropertyTabularType;
- protected TabularType bytePropertyTabularType;
- protected TabularType shortPropertyTabularType;
- protected TabularType intPropertyTabularType;
- protected TabularType longPropertyTabularType;
- protected TabularType floatPropertyTabularType;
- protected TabularType doublePropertyTabularType;
-
- protected ArrayType body;
-
- @Override
- protected String getTypeName() {
- return Message.class.getName();
- }
-
- @Override
- protected void init() throws OpenDataException {
- super.init();
-
- addItem(JMSCompositeDataConstants.JMS_DESTINATION, JMSCompositeDataConstants.JMS_DESTINATION_DESCRIPTION, SimpleType.STRING);
- addItem(JMSCompositeDataConstants.JMS_MESSAGE_ID, JMSCompositeDataConstants.JMS_MESSAGE_ID_DESCRIPTION, SimpleType.STRING);
- addItem(JMSCompositeDataConstants.JMS_CORRELATION_ID, JMSCompositeDataConstants.JMS_CORRELATION_ID_DESCRIPTION, SimpleType.STRING);
- addItem(JMSCompositeDataConstants.JMS_TYPE, JMSCompositeDataConstants.JMS_TYPE_DESCRIPTION, SimpleType.STRING);
- addItem(JMSCompositeDataConstants.JMS_DELIVERY_MODE, JMSCompositeDataConstants.JMS_DELIVERY_MODE_DESCRIPTION, SimpleType.STRING);
- addItem(JMSCompositeDataConstants.JMS_EXPIRATION, JMSCompositeDataConstants.JMS_EXPIRATION_DESCRIPTION, SimpleType.LONG);
- addItem(JMSCompositeDataConstants.JMS_PRIORITY, JMSCompositeDataConstants.JMS_PRIORITY_DESCRIPTION, SimpleType.INTEGER);
- addItem(JMSCompositeDataConstants.JMS_REDELIVERED, JMSCompositeDataConstants.JMS_REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN);
- addItem(JMSCompositeDataConstants.JMS_TIMESTAMP, JMSCompositeDataConstants.JMS_TIMESTAMP_DESCRIPTION, SimpleType.DATE);
- addItem(JMSCompositeDataConstants.JMSXGROUP_ID, JMSCompositeDataConstants.JMSXGROUP_ID_DESCRIPTION, SimpleType.STRING);
- addItem(JMSCompositeDataConstants.JMSXGROUP_SEQ, JMSCompositeDataConstants.JMSXGROUP_SEQ_DESCRIPTION, SimpleType.INTEGER);
- addItem(JMSCompositeDataConstants.JMSXUSER_ID, JMSCompositeDataConstants.JMSXUSER_ID_DESCRIPTION, SimpleType.STRING);
- addItem(JMSCompositeDataConstants.JMS_REPLY_TO, JMSCompositeDataConstants.JMS_REPLY_TO_DESCRIPTION, SimpleType.STRING);
- addItem(JMSCompositeDataConstants.ORIGINAL_DESTINATION, JMSCompositeDataConstants.ORIGINAL_DESTINATION_DESCRIPTION, SimpleType.STRING);
- addItem(CompositeDataConstants.PROPERTIES, CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING);
-
- // now lets expose the type safe properties
- stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING);
- booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN);
- bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE);
- shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT);
- intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER);
- longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG);
- floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT);
- doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE);
-
- addItem(CompositeDataConstants.STRING_PROPERTIES, CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, stringPropertyTabularType);
- addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, booleanPropertyTabularType);
- addItem(CompositeDataConstants.BYTE_PROPERTIES, CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType);
- addItem(CompositeDataConstants.SHORT_PROPERTIES, CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType);
- addItem(CompositeDataConstants.INT_PROPERTIES, CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType);
- addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType);
- addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType);
- addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType);
- }
-
- @Override
- public Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException {
- Map<String, Object> rc = super.getFields(data);
- putString(rc, data, JMSCompositeDataConstants.JMS_MESSAGE_ID, CompositeDataConstants.USER_ID);
- putString(rc, data, JMSCompositeDataConstants.JMS_DESTINATION, CompositeDataConstants.ADDRESS);
- putStringProperty(rc, data, JMSCompositeDataConstants.JMS_REPLY_TO, "JMSReplyTo");
- rc.put(JMSCompositeDataConstants.JMS_TYPE, getType());
- rc.put(JMSCompositeDataConstants.JMS_DELIVERY_MODE, ((Boolean) data.get(CompositeDataConstants.DURABLE)) ? "PERSISTENT" : "NON-PERSISTENT");
- rc.put(JMSCompositeDataConstants.JMS_EXPIRATION, data.get(CompositeDataConstants.EXPIRATION));
- rc.put(JMSCompositeDataConstants.JMS_TIMESTAMP, new Date((Long) data.get(CompositeDataConstants.TIMESTAMP)));
- rc.put(JMSCompositeDataConstants.JMS_PRIORITY, ((Byte) data.get(CompositeDataConstants.PRIORITY)).intValue());
- putStringProperty(rc, data, JMSCompositeDataConstants.JMS_CORRELATION_ID, JMSCompositeDataConstants.JMS_CORRELATION_ID);
- rc.put(JMSCompositeDataConstants.JMS_REDELIVERED, data.get(CompositeDataConstants.REDELIVERED));
- putStringProperty(rc, data, JMSCompositeDataConstants.JMSXGROUP_ID, Message.HDR_GROUP_ID.toString());
- putIntProperty(rc, data, JMSCompositeDataConstants.JMSXGROUP_SEQ, JMSCompositeDataConstants.JMSXGROUP_SEQ);
- putStringProperty(rc, data, JMSCompositeDataConstants.JMSXUSER_ID, Message.HDR_VALIDATED_USER.toString());
- putStringProperty(rc, data, JMSCompositeDataConstants.ORIGINAL_DESTINATION, Message.HDR_ORIGINAL_ADDRESS.toString());
-
- rc.put(CompositeDataConstants.PROPERTIES, "" + data.get(CompositeDataConstants.PROPERTIES));
-
- rc.put(CompositeDataConstants.STRING_PROPERTIES, data.get(CompositeDataConstants.STRING_PROPERTIES));
- rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, data.get(CompositeDataConstants.BOOLEAN_PROPERTIES));
- rc.put(CompositeDataConstants.BYTE_PROPERTIES, data.get(CompositeDataConstants.BYTE_PROPERTIES));
- rc.put(CompositeDataConstants.SHORT_PROPERTIES, data.get(CompositeDataConstants.SHORT_PROPERTIES));
- rc.put(CompositeDataConstants.INT_PROPERTIES, data.get(CompositeDataConstants.INT_PROPERTIES));
- rc.put(CompositeDataConstants.LONG_PROPERTIES, data.get(CompositeDataConstants.LONG_PROPERTIES));
- rc.put(CompositeDataConstants.FLOAT_PROPERTIES, data.get(CompositeDataConstants.FLOAT_PROPERTIES));
- rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, data.get(CompositeDataConstants.DOUBLE_PROPERTIES));
-
- return rc;
- }
-
- private void putString(Map<String, Object> rc, CompositeDataSupport data, String target, String source) {
- String prop = (String) data.get(source);
- if (prop != null) {
- rc.put(target, prop);
- } else {
- rc.put(target, "");
- }
- }
-
- private void putStringProperty(Map<String, Object> rc, CompositeDataSupport data, String target, String source) {
- TabularDataSupport properties = (TabularDataSupport) data.get(CompositeDataConstants.STRING_PROPERTIES);
- Object[] keys = new Object[]{source};
- CompositeDataSupport cds = (CompositeDataSupport) properties.get(keys);
- String prop = "";
- if (cds != null && cds.get("value") != null) {
- prop = (String) cds.get("value");
- }
- rc.put(target, prop);
- }
-
- private void putIntProperty(Map<String, Object> rc, CompositeDataSupport data, String target, String source) {
- TabularDataSupport properties = (TabularDataSupport) data.get(CompositeDataConstants.INT_PROPERTIES);
- Object[] keys = new Object[]{source};
- CompositeDataSupport cds = (CompositeDataSupport) properties.get(keys);
- Integer prop = 0;
- if (cds != null && cds.get("value") != null) {
- prop = (Integer) cds.get("value");
- }
- rc.put(target, prop);
- }
-
- private String getType() {
- return "Message";
- }
-
- protected String toString(Object value) {
- if (value == null) {
- return null;
- }
- return value.toString();
- }
-
- protected <T> TabularType createTabularType(Class<T> type, OpenType openType) throws OpenDataException {
- String typeName = "java.util.Map<java.lang.String, " + type.getName() + ">";
- String[] keyValue = new String[]{"key", "value"};
- OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType};
- CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes);
- return new TabularType(typeName, typeName, rowType, new String[]{"key"});
- }
- }
-
- static class ByteMessageOpenTypeFactory extends MessageOpenTypeFactory {
-
- @Override
- protected String getTypeName() {
- return "BytesMessage";
- }
-
- @Override
- protected void init() throws OpenDataException {
- super.init();
- addItem(JMSCompositeDataConstants.BODY_LENGTH, "Body length", SimpleType.LONG);
- addItem(JMSCompositeDataConstants.BODY_PREVIEW, "Body preview", new ArrayType(SimpleType.BYTE, true));
- }
-
- @Override
- public Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException {
- Map<String, Object> rc = super.getFields(data);
- ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer((byte[]) data.get("body"));
- long length = 0;
- length = buffer.readableBytes();
- rc.put(JMSCompositeDataConstants.BODY_LENGTH, Long.valueOf(length));
- byte[] preview = new byte[(int) Math.min(length, 255)];
- buffer.readBytes(preview);
- rc.put(JMSCompositeDataConstants.BODY_PREVIEW, preview);
- return rc;
- }
- }
-
- static class MapMessageOpenTypeFactory extends MessageOpenTypeFactory {
-
- @Override
- protected String getTypeName() {
- return "MapMessage";
- }
-
- @Override
- protected void init() throws OpenDataException {
- super.init();
- addItem(JMSCompositeDataConstants.CONTENT_MAP, "Content map", SimpleType.STRING);
- }
-
- @Override
- public Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException {
- Map<String, Object> rc = super.getFields(data);
- ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer((byte[]) data.get("body"));
- TypedProperties properties = new TypedProperties();
- MapMessageUtil.readBodyMap(buffer, properties);
- rc.put(JMSCompositeDataConstants.CONTENT_MAP, "" + properties.getMap());
- return rc;
- }
- }
-
- static class ObjectMessageOpenTypeFactory extends MessageOpenTypeFactory {
-
- @Override
- protected String getTypeName() {
- return "ObjectMessage";
- }
- }
-
- static class StreamMessageOpenTypeFactory extends MessageOpenTypeFactory {
-
- @Override
- protected String getTypeName() {
- return "StreamMessage";
- }
- }
-
- static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory {
-
- @Override
- protected String getTypeName() {
- return "TextMessage";
- }
-
- @Override
- protected void init() throws OpenDataException {
- super.init();
- addItem(JMSCompositeDataConstants.MESSAGE_TEXT, JMSCompositeDataConstants.MESSAGE_TEXT, SimpleType.STRING);
- }
-
- @Override
- public Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException {
- Map<String, Object> rc = super.getFields(data);
- ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer((byte[]) data.get("body"));
- SimpleString value = buffer.readNullableSimpleString();
- rc.put(JMSCompositeDataConstants.MESSAGE_TEXT, value != null ? value.toString() : "");
- return rc;
- }
-
- }
-
- static {
- OPEN_TYPE_FACTORIES.put(Message.DEFAULT_TYPE, new MessageOpenTypeFactory());
- OPEN_TYPE_FACTORIES.put(Message.TEXT_TYPE, new TextMessageOpenTypeFactory());
- OPEN_TYPE_FACTORIES.put(Message.BYTES_TYPE, new ByteMessageOpenTypeFactory());
- OPEN_TYPE_FACTORIES.put(Message.MAP_TYPE, new MapMessageOpenTypeFactory());
- OPEN_TYPE_FACTORIES.put(Message.OBJECT_TYPE, new ObjectMessageOpenTypeFactory());
- OPEN_TYPE_FACTORIES.put(Message.STREAM_TYPE, new StreamMessageOpenTypeFactory());
- }
-
- private JMSOpenTypeSupport() {
- }
-
- public static OpenTypeFactory getFactory(Byte type) throws OpenDataException {
- return OPEN_TYPE_FACTORIES.get(type);
- }
-
- public static CompositeData convert(CompositeDataSupport data) throws OpenDataException {
- OpenTypeFactory f = getFactory((Byte) data.get("type"));
- if (f == null) {
- throw new OpenDataException("Cannot create a CompositeData for type: " + data.get("type"));
- }
- CompositeType ct = f.getCompositeType();
- Map<String, Object> fields = f.getFields(data);
- return new CompositeDataSupport(ct, fields);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
index f10962e..e879dbf 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
@@ -86,9 +86,7 @@ import org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration;
import org.apache.activemq.artemis.jms.server.config.TopicConfiguration;
import org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.apache.activemq.artemis.jms.server.config.impl.FileJMSConfiguration;
-import org.apache.activemq.artemis.jms.server.management.JMSManagementService;
import org.apache.activemq.artemis.jms.server.management.JMSNotificationType;
-import org.apache.activemq.artemis.jms.server.management.impl.JMSManagementServiceImpl;
import org.apache.activemq.artemis.jms.transaction.JMSTransactionDetail;
import org.apache.activemq.artemis.spi.core.naming.BindingRegistry;
import org.apache.activemq.artemis.utils.JsonLoader;
@@ -133,8 +131,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
private final ActiveMQServer server;
- private JMSManagementService jmsManagementService;
-
private boolean startCalled;
private boolean active;
@@ -191,10 +187,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
try {
- jmsManagementService = new JMSManagementServiceImpl(server.getManagementService(), server, this);
-
- jmsManagementService.registerJMSServer(this);
-
// Must be set to active before calling initJournal
active = true;
@@ -249,15 +241,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
topicBindings.clear();
topics.clear();
- // it could be null if a backup
- if (jmsManagementService != null) {
- jmsManagementService.unregisterJMSServer();
-
- jmsManagementService.stop();
- }
-
- jmsManagementService = null;
-
active = false;
}
} catch (Exception e) {
@@ -388,7 +371,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
// server.setJMSQueueCreator(new JMSDestinationCreator());
//
// server.setJMSQueueDeleter(new JMSQueueDeleter());
-
server.registerActivateCallback(this);
// server.registerPostQueueCreationCallback(new JMSPostQueueCreationCallback());
@@ -800,8 +782,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
queues.remove(name);
queueBindings.remove(name);
- jmsManagementService.unregisterQueue(name);
-
storage.deleteDestination(PersistedType.Queue, name);
sendNotification(JMSNotificationType.QUEUE_DESTROYED, name);
@@ -840,8 +820,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
topics.remove(name);
topicBindings.remove(name);
- jmsManagementService.unregisterTopic(name);
-
storage.deleteDestination(PersistedType.Topic, name);
sendNotification(JMSNotificationType.TOPIC_DESTROYED, name);
@@ -1097,8 +1075,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
this.recoverregistryBindings(queueName, PersistedType.Queue);
- jmsManagementService.registerQueue(activeMQQueue, queue);
-
return true;
}
}
@@ -1133,8 +1109,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
this.recoverregistryBindings(topicName, PersistedType.Topic);
- jmsManagementService.registerTopic(activeMQTopic);
-
return true;
}
}
@@ -1154,8 +1128,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
connectionFactories.put(cfConfig.getName(), cf);
- jmsManagementService.registerConnectionFactory(cfConfig.getName(), cfConfig, cf);
-
return cf;
}
@@ -1281,8 +1253,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
connectionFactoryBindings.remove(name);
connectionFactories.remove(name);
- jmsManagementService.unregisterConnectionFactory(name);
-
return true;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/JMSManagementService.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/JMSManagementService.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/JMSManagementService.java
deleted file mode 100644
index ff6c240..0000000
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/JMSManagementService.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jms.server.management;
-
-import org.apache.activemq.artemis.api.jms.management.JMSServerControl;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
-import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
-import org.apache.activemq.artemis.jms.server.JMSServerManager;
-import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
-
-public interface JMSManagementService {
-
- JMSServerControl registerJMSServer(JMSServerManager server) throws Exception;
-
- void unregisterJMSServer() throws Exception;
-
- void registerQueue(ActiveMQQueue queue, Queue serverQueue) throws Exception;
-
- void unregisterQueue(String name) throws Exception;
-
- void registerTopic(ActiveMQTopic topic) throws Exception;
-
- void unregisterTopic(String name) throws Exception;
-
- void registerConnectionFactory(String name,
- ConnectionFactoryConfiguration config,
- ActiveMQConnectionFactory connectionFactory) throws Exception;
-
- void unregisterConnectionFactory(String name) throws Exception;
-
- void stop() throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java
deleted file mode 100644
index 2b3f7a2..0000000
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jms.server.management.impl;
-
-import javax.management.ObjectName;
-
-import org.apache.activemq.artemis.api.core.management.AddressControl;
-import org.apache.activemq.artemis.api.core.management.QueueControl;
-import org.apache.activemq.artemis.api.core.management.ResourceNames;
-import org.apache.activemq.artemis.api.jms.management.ConnectionFactoryControl;
-import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
-import org.apache.activemq.artemis.api.jms.management.JMSServerControl;
-import org.apache.activemq.artemis.api.jms.management.TopicControl;
-import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
-import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
-import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
-import org.apache.activemq.artemis.jms.management.impl.JMSConnectionFactoryControlImpl;
-import org.apache.activemq.artemis.jms.management.impl.JMSQueueControlImpl;
-import org.apache.activemq.artemis.jms.management.impl.JMSServerControlImpl;
-import org.apache.activemq.artemis.jms.management.impl.JMSTopicControlImpl;
-import org.apache.activemq.artemis.jms.server.JMSServerManager;
-import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
-import org.apache.activemq.artemis.jms.server.management.JMSManagementService;
-
-public class JMSManagementServiceImpl implements JMSManagementService {
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final ManagementService managementService;
-
- private final JMSServerManager jmsServerManager;
-
- // Static --------------------------------------------------------
-
- public JMSManagementServiceImpl(final ManagementService managementService,
- final ActiveMQServer server,
- final JMSServerManager jmsServerManager) {
- this.managementService = managementService;
- this.jmsServerManager = jmsServerManager;
- }
-
- // Public --------------------------------------------------------
-
- // JMSManagementRegistration implementation ----------------------
-
- @Override
- public synchronized JMSServerControl registerJMSServer(final JMSServerManager server) throws Exception {
- ObjectName objectName = managementService.getObjectNameBuilder().getJMSServerObjectName();
- JMSServerControlImpl control = new JMSServerControlImpl(server);
- managementService.registerInJMX(objectName, control);
- managementService.registerInRegistry(ResourceNames.JMS_SERVER, control);
- return control;
- }
-
- @Override
- public synchronized void unregisterJMSServer() throws Exception {
- ObjectName objectName = managementService.getObjectNameBuilder().getJMSServerObjectName();
- managementService.unregisterFromJMX(objectName);
- managementService.unregisterFromRegistry(ResourceNames.JMS_SERVER);
- }
-
- @Override
- public synchronized void registerQueue(final ActiveMQQueue queue, final Queue serverQueue) throws Exception {
- QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.CORE_QUEUE + queue.getAddress());
- MessageCounterManager messageCounterManager = managementService.getMessageCounterManager();
- MessageCounter counter = new MessageCounter(queue.getName(), null, serverQueue, false, coreQueueControl.isDurable(), messageCounterManager.getMaxDayCount());
- messageCounterManager.registerMessageCounter(queue.getName(), counter);
- ObjectName objectName = managementService.getObjectNameBuilder().getJMSQueueObjectName(queue.getQueueName());
- JMSQueueControlImpl control = new JMSQueueControlImpl(queue, coreQueueControl, jmsServerManager, counter);
- managementService.registerInJMX(objectName, control);
- managementService.registerInRegistry(queue.getQueueName(), control);
- }
-
- @Override
- public synchronized void unregisterQueue(final String name) throws Exception {
- ObjectName objectName = managementService.getObjectNameBuilder().getJMSQueueObjectName(name);
- managementService.unregisterFromJMX(objectName);
- managementService.unregisterFromRegistry(name);
- }
-
- @Override
- public synchronized void registerTopic(final ActiveMQTopic topic) throws Exception {
- ObjectName objectName = managementService.getObjectNameBuilder().getJMSTopicObjectName(topic.getTopicName());
- AddressControl addressControl = (AddressControl) managementService.getResource(ResourceNames.CORE_ADDRESS + topic.getAddress());
- JMSTopicControlImpl control = new JMSTopicControlImpl(topic, jmsServerManager, addressControl, managementService);
- managementService.registerInJMX(objectName, control);
- managementService.registerInRegistry(topic.getTopicName(), control);
- }
-
- @Override
- public synchronized void unregisterTopic(final String name) throws Exception {
- ObjectName objectName = managementService.getObjectNameBuilder().getJMSTopicObjectName(name);
- managementService.unregisterFromJMX(objectName);
- managementService.unregisterFromRegistry(name);
- }
-
- @Override
- public synchronized void registerConnectionFactory(final String name,
- final ConnectionFactoryConfiguration cfConfig,
- final ActiveMQConnectionFactory connectionFactory) throws Exception {
- ObjectName objectName = managementService.getObjectNameBuilder().getConnectionFactoryObjectName(name);
- JMSConnectionFactoryControlImpl control = new JMSConnectionFactoryControlImpl(cfConfig, connectionFactory, jmsServerManager, name);
- managementService.registerInJMX(objectName, control);
- managementService.registerInRegistry(ResourceNames.JMS_CONNECTION_FACTORY + name, control);
- }
-
- @Override
- public synchronized void unregisterConnectionFactory(final String name) throws Exception {
- ObjectName objectName = managementService.getObjectNameBuilder().getConnectionFactoryObjectName(name);
- managementService.unregisterFromJMX(objectName);
- managementService.unregisterFromRegistry(ResourceNames.JMS_CONNECTION_FACTORY + name);
- }
-
- @Override
- public void stop() throws Exception {
- for (Object resource : managementService.getResources(ConnectionFactoryControl.class)) {
- unregisterConnectionFactory(((ConnectionFactoryControl) resource).getName());
- }
- for (Object resource : managementService.getResources(JMSQueueControl.class)) {
- unregisterQueue(((JMSQueueControl) resource).getName());
- }
- for (Object resource : managementService.getResources(TopicControl.class)) {
- unregisterTopic(((TopicControl) resource).getName());
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 9140fe4..fa26c4d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.server.cluster.ha.LiveOnlyPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
@@ -552,6 +553,18 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
@Override
+ public void createAddress(String name, int routingType, boolean defaultDeleteOnNoConsumers, int defaultMaxConsumers) throws Exception {
+ checkStarted();
+
+ clearIO();
+ try {
+ server.createOrUpdateAddressInfo(new AddressInfo(new SimpleString(name), AddressInfo.RoutingType.getType((byte)routingType), defaultDeleteOnNoConsumers, defaultMaxConsumers));
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ @Override
public void deployQueue(final String address, final String name, final String filterString) throws Exception {
checkStarted();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index bc07973..c627e7e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -21,6 +21,7 @@ import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -34,7 +35,13 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.security.SecurityStore;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.JsonLoader;
public class AddressControlImpl extends AbstractControl implements AddressControl {
@@ -43,7 +50,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
// Attributes ----------------------------------------------------
- private final SimpleString address;
+ private AddressInfo addressInfo;
private final PostOffice postOffice;
@@ -51,20 +58,24 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
private final HierarchicalRepository<Set<Role>> securityRepository;
+ private final SecurityStore securityStore;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public AddressControlImpl(final SimpleString address,
+ public AddressControlImpl(AddressInfo addressInfo,
final PostOffice postOffice,
final PagingManager pagingManager,
final StorageManager storageManager,
- final HierarchicalRepository<Set<Role>> securityRepository) throws Exception {
+ final HierarchicalRepository<Set<Role>> securityRepository,
+ final SecurityStore securityStore)throws Exception {
super(AddressControl.class, storageManager);
- this.address = address;
+ this.addressInfo = addressInfo;
this.postOffice = postOffice;
this.pagingManager = pagingManager;
this.securityRepository = securityRepository;
+ this.securityStore = securityStore;
}
// Public --------------------------------------------------------
@@ -73,14 +84,19 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
@Override
public String getAddress() {
- return address.toString();
+ return addressInfo.getName().toString();
+ }
+
+ @Override
+ public String getRoutingType() {
+ return addressInfo.getRoutingType().toString();
}
@Override
public String[] getQueueNames() throws Exception {
clearIO();
try {
- Bindings bindings = postOffice.getBindingsForAddress(address);
+ Bindings bindings = postOffice.getBindingsForAddress(addressInfo.getName());
List<String> queueNames = new ArrayList<>();
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
@@ -99,7 +115,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public String[] getBindingNames() throws Exception {
clearIO();
try {
- Bindings bindings = postOffice.getBindingsForAddress(address);
+ Bindings bindings = postOffice.getBindingsForAddress(addressInfo.getName());
String[] bindingNames = new String[bindings.getBindings().size()];
int i = 0;
for (Binding binding : bindings.getBindings()) {
@@ -117,7 +133,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public Object[] getRoles() throws Exception {
clearIO();
try {
- Set<Role> roles = securityRepository.getMatch(address.toString());
+ Set<Role> roles = securityRepository.getMatch(addressInfo.getName().toString());
Object[] objRoles = new Object[roles.size()];
@@ -136,7 +152,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
clearIO();
try {
JsonArrayBuilder json = JsonLoader.createArrayBuilder();
- Set<Role> roles = securityRepository.getMatch(address.toString());
+ Set<Role> roles = securityRepository.getMatch(addressInfo.getName().toString());
for (Role role : roles) {
json.add(role.toJson());
@@ -151,7 +167,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public long getNumberOfBytesPerPage() throws Exception {
clearIO();
try {
- return pagingManager.getPageStore(address).getPageSizeBytes();
+ return pagingManager.getPageStore(addressInfo.getName()).getPageSizeBytes();
} finally {
blockOnIO();
}
@@ -161,7 +177,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public long getAddressSize() throws Exception {
clearIO();
try {
- return pagingManager.getPageStore(address).getAddressSize();
+ return pagingManager.getPageStore(addressInfo.getName()).getAddressSize();
} finally {
blockOnIO();
}
@@ -172,7 +188,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
clearIO();
long totalMsgs = 0;
try {
- Bindings bindings = postOffice.getBindingsForAddress(address);
+ Bindings bindings = postOffice.getBindingsForAddress(addressInfo.getName());
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
totalMsgs += ((QueueBinding) binding).getQueue().getMessageCount();
@@ -190,7 +206,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public boolean isPaging() throws Exception {
clearIO();
try {
- return pagingManager.getPageStore(address).isPaging();
+ return pagingManager.getPageStore(addressInfo.getName()).isPaging();
} finally {
blockOnIO();
}
@@ -200,18 +216,77 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public int getNumberOfPages() throws Exception {
clearIO();
try {
- PagingStore pageStore = pagingManager.getPageStore(address);
+ PagingStore pageStore = pagingManager.getPageStore(addressInfo.getName());
if (!pageStore.isPaging()) {
return 0;
} else {
- return pagingManager.getPageStore(address).getNumberOfPages();
+ return pagingManager.getPageStore(addressInfo.getName()).getNumberOfPages();
}
} finally {
blockOnIO();
}
}
+ /* @Override
+ public String sendTextMessage(Map<String, String> headers,
+ String body,
+ String user,
+ String password) throws Exception {
+ boolean durable = false;
+ if (headers.containsKey("JMSDeliveryMode")) {
+ String jmsDeliveryMode = headers.remove("JMSDeliveryMode");
+ if (jmsDeliveryMode != null && (jmsDeliveryMode.equals("2") || jmsDeliveryMode.equalsIgnoreCase("PERSISTENT"))) {
+ durable = true;
+ }
+ }
+ String userID = UUIDGenerator.getInstance().generateStringUUID();
+ ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(56);
+ buffer.writeNullableSimpleString(new SimpleString(body));
+ byte[] bytes = new byte[buffer.readableBytes()];
+ buffer.readBytes(bytes);
+ coreQueueControl.sendMessage(headers, Message.TEXT_TYPE, Base64.encodeBytes(bytes), userID, durable, user, password);
+ return userID;
+ }*/
+
+ @Override
+ public String sendMessage(final Map<String, String> headers,
+ final int type,
+ final String body,
+ boolean durable,
+ final String user,
+ final String password) throws Exception {
+ securityStore.check(addressInfo.getName(), CheckType.SEND, new SecurityAuth() {
+ @Override
+ public String getUsername() {
+ return user;
+ }
+
+ @Override
+ public String getPassword() {
+ return password;
+ }
+
+ @Override
+ public RemotingConnection getRemotingConnection() {
+ return null;
+ }
+ });
+ ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50);
+ for (String header : headers.keySet()) {
+ message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
+ }
+ message.setType((byte) type);
+ message.setDurable(durable);
+ message.setTimestamp(System.currentTimeMillis());
+ if (body != null) {
+ message.getBodyBuffer().writeBytes(Base64.decode(body));
+ }
+ message.setAddress(addressInfo.getName());
+ postOffice.route(message, null, true);
+ return "" + message.getMessageID();
+ }
+
@Override
protected MBeanOperationInfo[] fillMBeanOperationInfo() {
return MBeanInfoHelper.getMBeanOperationsInfo(AddressControl.class);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 85bad25..7a1bb26 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -22,6 +22,7 @@ import javax.json.JsonObjectBuilder;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import javax.management.openmbean.CompositeData;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
@@ -38,6 +39,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport;
+import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -59,7 +61,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.LinkedListIterator;
-import org.apache.activemq.artemis.utils.UUID;
public class QueueControlImpl extends AbstractControl implements QueueControl {
@@ -694,7 +695,6 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
public String sendMessage(final Map<String, String> headers,
final int type,
final String body,
- final String userID,
boolean durable,
final String user,
final String password) throws Exception {
@@ -721,11 +721,13 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
message.setType((byte) type);
message.setDurable(durable);
message.setTimestamp(System.currentTimeMillis());
- message.setUserID(new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(userID)));
if (body != null) {
message.getBodyBuffer().writeBytes(Base64.decode(body));
}
message.setAddress(queue.getAddress());
+ ByteBuffer buffer = ByteBuffer.allocate(8);
+ buffer.putLong(queue.getID());
+ message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
postOffice.route(message, null, true);
return "" + message.getMessageID();
}
@@ -885,6 +887,10 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
@Override
+ public CompositeData[] browse() throws Exception {
+ return browse(null);
+ }
+ @Override
public CompositeData[] browse(String filterStr) throws Exception {
checkStarted();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 4c51373..1b6dc42 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -422,7 +422,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public AddressInfo addAddressInfo(AddressInfo addressInfo) {
try {
- getServer().getManagementService().registerAddress(addressInfo.getName());
+ managementService.registerAddress(addressInfo);
} catch (Exception e) {
e.printStackTrace();
}
@@ -432,7 +432,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
try {
- getServer().getManagementService().registerAddress(addressInfo.getName());
+ managementService.registerAddress(addressInfo);
} catch (Exception e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index d62598e..58d8ff2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2424,9 +2424,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw e;
}
- if (!addressAlreadyExists) {
- managementService.registerAddress(queue.getAddress());
- }
managementService.registerQueue(queue, queue.getAddress(), storageManager);
callPostQueueCreationCallbacks(queue.getName());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 488c4b2..708aeda 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -33,6 +33,13 @@ public class AddressInfo {
this.name = name;
}
+ public AddressInfo(SimpleString name, RoutingType routingType, boolean defaultDeleteOnNoConsumers, int defaultMaxConsumers) {
+ this(name);
+ this.routingType = routingType;
+ this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers;
+ this.defaultMaxQueueConsumers = defaultMaxConsumers;
+ }
+
public RoutingType getRoutingType() {
return routingType;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index dc64ddd..76fc69b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -166,7 +166,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
queues.put(queue.getID(), queue);
postOffice.addBinding(binding);
- managementService.registerAddress(queue.getAddress());
+ //managementService.registerAddress(queue.getAddress());
managementService.registerQueue(queue, queue.getAddress(), storageManager);
}
@@ -184,7 +184,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
.setDefaultMaxQueueConsumers(addressBindingInfo.getDefaultMaxConsumers());
postOffice.addAddressInfo(addressInfo);
- managementService.registerAddress(addressInfo.getName());
+ managementService.registerAddress(addressInfo);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
index 5f40c53..7da756c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
@@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -89,7 +90,7 @@ public interface ManagementService extends NotificationService, ActiveMQComponen
void unregisterFromRegistry(final String resourceName);
- void registerAddress(SimpleString address) throws Exception;
+ void registerAddress(AddressInfo addressInfo) throws Exception;
void unregisterAddress(SimpleString address) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index ac1ab1a..6490b0f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -75,6 +75,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
@@ -210,13 +211,13 @@ public class ManagementServiceImpl implements ManagementService {
}
@Override
- public synchronized void registerAddress(final SimpleString address) throws Exception {
- ObjectName objectName = objectNameBuilder.getAddressObjectName(address);
- AddressControlImpl addressControl = new AddressControlImpl(address, postOffice, pagingManager, storageManager, securityRepository);
+ public void registerAddress(AddressInfo addressInfo) throws Exception {
+ ObjectName objectName = objectNameBuilder.getAddressObjectName(addressInfo.getName());
+ AddressControlImpl addressControl = new AddressControlImpl(addressInfo, postOffice, pagingManager, storageManager, securityRepository, securityStore);
registerInJMX(objectName, addressControl);
- registerInRegistry(ResourceNames.CORE_ADDRESS + address, addressControl);
+ registerInRegistry(ResourceNames.ADDRESS + addressInfo.getName(), addressControl);
if (logger.isDebugEnabled()) {
logger.debug("registered address " + objectName);
@@ -230,7 +231,6 @@ public class ManagementServiceImpl implements ManagementService {
unregisterFromJMX(objectName);
unregisterFromRegistry(ResourceNames.CORE_ADDRESS + address);
}
-
@Override
public synchronized void registerQueue(final Queue queue,
final SimpleString address,
@@ -260,7 +260,7 @@ public class ManagementServiceImpl implements ManagementService {
@Override
public synchronized void registerDivert(final Divert divert, final DivertConfiguration config) throws Exception {
- ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName().toString());
+ ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName().toString(), config.getAddress());
DivertControl divertControl = new DivertControlImpl(divert, storageManager, config);
registerInJMX(objectName, divertControl);
registerInRegistry(ResourceNames.CORE_DIVERT + config.getName(), divertControl);
@@ -272,7 +272,7 @@ public class ManagementServiceImpl implements ManagementService {
@Override
public synchronized void unregisterDivert(final SimpleString name) throws Exception {
- ObjectName objectName = objectNameBuilder.getDivertObjectName(name.toString());
+ ObjectName objectName = objectNameBuilder.getDivertObjectName(name.toString(), null);
unregisterFromJMX(objectName);
unregisterFromRegistry(ResourceNames.CORE_DIVERT + name);
}
@@ -470,7 +470,6 @@ public class ManagementServiceImpl implements ManagementService {
@Override
public synchronized void unregisterFromRegistry(final String resourceName) {
- ActiveMQServerLogger.LOGGER.info("Unregistering: " + resourceName, new Exception());
registry.remove(resourceName);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
index 1211dee..e79a3c7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
@@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
@@ -245,7 +246,7 @@ public class ClusteredResetMockTest extends ActiveMQTestBase {
}
@Override
- public void registerAddress(SimpleString address) throws Exception {
+ public void registerAddress(AddressInfo addressInfo) throws Exception {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/BridgeTestBase.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/BridgeTestBase.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/BridgeTestBase.java
index a89edb8..e9815f7 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/BridgeTestBase.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/BridgeTestBase.java
@@ -38,10 +38,10 @@ import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
import com.arjuna.ats.arjuna.coordinator.TxControl;
import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
-import org.apache.activemq.artemis.api.jms.management.TopicControl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
@@ -504,8 +504,8 @@ public abstract class BridgeTestBase extends ActiveMQTestBase {
if (index == 1) {
managementService = server1.getManagementService();
}
- TopicControl topicControl = (TopicControl) managementService.getResource(topic.getTopicName());
- Assert.assertEquals(0, topicControl.getSubscriptionCount());
+ AddressControl topicControl = (AddressControl) managementService.getResource("address." + topic.getTopicName());
+ Assert.assertEquals(0, topicControl.getQueueNames().length);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java
index 0bc3e28..1a0a997 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java
@@ -35,7 +35,6 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
-import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3858b1cf/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index 3d0f00c..bac9671 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -594,12 +594,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
- checkNoResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name));
+ checkNoResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name, address));
assertEquals(0, serverControl.getDivertNames().length);
serverControl.createDivert(name.toString(), null, address, forwardingAddress, true, null, null);
- checkResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name));
+ checkResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name, address));
}
@Test
@@ -611,13 +611,13 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
- checkNoResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name));
+ checkNoResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name, address));
assertEquals(0, serverControl.getDivertNames().length);
serverControl.createDivert(name.toString(), routingName, address, forwardingAddress, true, null, null);
- checkResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name));
- DivertControl divertControl = ManagementControlHelper.createDivertControl(name.toString(), mbeanServer);
+ checkResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name, address));
+ DivertControl divertControl = ManagementControlHelper.createDivertControl(name.toString(), address, mbeanServer);
assertEquals(name.toString(), divertControl.getUniqueName());
assertEquals(address, divertControl.getAddress());
assertEquals(forwardingAddress, divertControl.getForwardingAddress());
@@ -658,7 +658,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
serverControl.destroyDivert(name.toString());
- checkNoResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name));
+ checkNoResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name, address));
assertEquals(0, serverControl.getDivertNames().length);
// check that a message is no longer diverted