You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2019/07/31 18:09:48 UTC

[activemq] branch activemq-5.15.x updated (bdf2afc -> e652e18)

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a change to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git.


    from bdf2afc  AMQ-7247 - Update maven plugin API + Pax URL versions
     new 509c781  AMQ-3405 - rework fix to use original destination strategy which may be different from the default shared strategy. new test
     new 70b2a43  AMQ-7094 - track the objectName with an annotated mbean such that the jmx audit log event can extract that target of an mbean operation, fix and test
     new 9efb1fe  AMQ-7102 - don't track objectNames that have not been registered due to suppressMBean filter, fix and test
     new e652e18  AMQ-7102 - Improve ManagedRegionBroker performance by replacing registeredMBeans CopyOnWriteSet with a Concurrent hashmap backed set

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/activemq/broker/jmx/AnnotatedMBean.java |  26 ++-
 .../activemq/broker/jmx/AsyncAnnotatedMBean.java   |  18 +-
 .../activemq/broker/jmx/ManagedRegionBroker.java   |  36 ++--
 .../org/apache/activemq/broker/region/Queue.java   |   9 +-
 .../activemq/broker/util/JMXAuditLogEntry.java     |  13 +-
 .../broker/jmx/SelectiveMBeanRegistrationTest.java |  22 +++
 .../java/org/apache/activemq/jmx/DLQRetryTest.java | 203 +++++++++++++++++++++
 .../org/apache/activemq/jmx/JmxAuditLogTest.java   |  75 ++++++++
 .../org/apache/activemq/jmx/JmxCreateNCTest.java   |   3 +
 9 files changed, 377 insertions(+), 28 deletions(-)
 create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/jmx/DLQRetryTest.java


[activemq] 01/04: AMQ-3405 - rework fix to use original destination strategy which may be different from the default shared strategy. new test

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit 509c781669fa681eded9d11681e1cb92e5f5d553
Author: gtully <ga...@gmail.com>
AuthorDate: Thu Jun 7 11:38:42 2018 +0100

    AMQ-3405 - rework fix to use original destination strategy which may be different from the default shared strategy. new test
    
    (cherry picked from commit 1ebf1862795286505dc2ce2d36e91008029061a3)
---
 .../org/apache/activemq/broker/region/Queue.java   |   9 +-
 .../java/org/apache/activemq/jmx/DLQRetryTest.java | 202 +++++++++++++++++++++
 2 files changed, 209 insertions(+), 2 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 71bf5bb..d668c14 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1478,8 +1478,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             try {
                 messages.rollback(m.getMessageId());
                 if (isDLQ()) {
-                    DeadLetterStrategy strategy = getDeadLetterStrategy();
-                    strategy.rollback(m.getMessage());
+                    ActiveMQDestination originalDestination = m.getMessage().getOriginalDestination();
+                    if (originalDestination != null) {
+                        for (Destination destination : regionBroker.getDestinations(originalDestination)) {
+                            DeadLetterStrategy strategy = destination.getDeadLetterStrategy();
+                            strategy.rollback(m.getMessage());
+                        }
+                    }
                 }
             } finally {
                 messagesLock.writeLock().unlock();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/DLQRetryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/DLQRetryTest.java
new file mode 100644
index 0000000..e2875fb
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/DLQRetryTest.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jmx;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DLQRetryTest extends EmbeddedBrokerTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DLQRetryTest.class);
+
+    protected MBeanServer mbeanServer;
+    protected String domain = "org.apache.activemq";
+    protected String bindAddress;
+
+    protected Connection connection;
+
+    /**
+     * Runs the retry scenario on a queue that createBroker() gives no policy entry,
+     * expecting the dead-lettered message on the "ActiveMQ.DLQ" queue MBean.
+     */
+    public void testDefaultDLQ() throws Exception {
+        final String queueName = "retry.test.default";
+        final String dlqMBeanName = broker.getBrokerObjectName().toString()
+                + ",destinationType=Queue,destinationName=ActiveMQ.DLQ";
+        invokeRetryDLQ(queueName, dlqMBeanName);
+    }
+
+
+    /**
+     * Runs the retry scenario on the queue that createBroker() configures with an
+     * IndividualDeadLetterStrategy, expecting the message on the "DLQ." + queue name MBean.
+     */
+    public void testIndividualDLQ() throws Exception {
+        final String queueName = "retry.test.individual";
+        final String dlqMBeanName = broker.getBrokerObjectName().toString()
+                + ",destinationType=Queue,destinationName=DLQ." + queueName;
+
+        invokeRetryDLQ(queueName, dlqMBeanName);
+    }
+
+
+    /**
+     * Sends one message to destinationName, rolls it back until no redelivery arrives, retries it
+     * via the QueueViewMBean registered as mbeanName and verifies it returns to that queue after
+     * a second round of rollbacks.
+     *
+     * @param destinationName queue the test message is produced to and consumed from
+     * @param mbeanName       JMX object name of the dead letter queue expected to hold the message
+     */
+    private void invokeRetryDLQ(String destinationName, String mbeanName) throws Exception {
+        connection = connectionFactory.createConnection();
+        try {
+            connection.start();
+
+            // publish a single message to the queue under test
+            Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer sender = producerSession.createProducer(new ActiveMQQueue(destinationName));
+            sender.send(producerSession.createTextMessage("Message testing default DLQ"));
+            sender.close();
+
+            // roll the message back until no further redelivery arrives
+            final String firstMessageId = consumeRollbackMessage(destinationName);
+
+            final ObjectName dlqName = assertRegisteredObjectName(mbeanName);
+            final QueueViewMBean dlqView = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqName, QueueViewMBean.class, true);
+            assertEquals("Check message is on DLQ", 1, dlqView.getQueueSize());
+
+            final boolean moveSuccess = dlqView.retryMessage(firstMessageId);
+            assertEquals("moveSuccess", true, moveSuccess);
+            assertEquals("Check message is off DLQ (after retry invoked)", 0, dlqView.getQueueSize());
+
+            // roll back again so the retried message lands on the DLQ a second time
+            final String secondMessageId = consumeRollbackMessage(destinationName);
+            assertEquals("Ensure messageID is the same for first and second attempt", firstMessageId, secondMessageId);
+
+            // the DLQ should hold the message once more
+            assertEquals("Check message is on DLQ for second time", 1, dlqView.getQueueSize());
+        } finally {
+            connection.close();
+        }
+    }
+
+    /** Receives and rolls back every delivery from destination until receive(3000) returns null; returns the last JMS message id seen, or null. */
+    private String consumeRollbackMessage(String destination) throws JMSException {
+        Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        try {
+            MessageConsumer messageConsumer = consumerSession.createConsumer(new ActiveMQQueue(destination));
+            String messageID = null;
+            Message message;
+            // every rollback makes the message eligible for redelivery; stop once nothing more arrives
+            while ((message = messageConsumer.receive(3000)) != null) {
+                messageID = message.getJMSMessageID();
+                LOG.info("rolling back " + messageID);
+                consumerSession.rollback();
+            }
+            return messageID;
+        } finally {
+            // per the JMS API, closing the session also closes its consumer; runs even if a call above throws
+            consumerSession.close();
+        }
+    }
+
+    /** Builds an ObjectName from name and fails the test unless the MBean server has it registered. */
+    protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
+        final ObjectName registeredName = new ObjectName(name);
+        if (!mbeanServer.isRegistered(registeredName)) {
+            fail("Could not find MBean!: " + registeredName);
+        }
+        LOG.info("Bean Registered: " + registeredName);
+        return registeredName;
+    }
+
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://localhost:0";
+        useTopic = false;
+        super.setUp();
+        mbeanServer = broker.getManagementContext().getMBeanServer();
+    }
+
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close(); // safety net: invokeRetryDLQ also closes the connection in its finally block
+            connection = null;
+        }
+        super.tearDown(); // TODO(review): not reached if close() above throws — consider try/finally
+    }
+
+    /** Creates a JMX-enabled broker in which the "retry.test.individual" queue gets an IndividualDeadLetterStrategy with queue prefix "DLQ.". */
+    protected BrokerService createBroker() throws Exception {
+        final BrokerService service = new BrokerService();
+        service.setUseJmx(true);
+        service.setEnableStatistics(true);
+        service.addConnector(bindAddress);
+
+        // policy entry for the "retry.test.individual" queue
+        final IndividualDeadLetterStrategy dlqStrategy = new IndividualDeadLetterStrategy();
+        dlqStrategy.setQueuePrefix("DLQ.");
+        final PolicyEntry individualDlqEntry = new PolicyEntry();
+        individualDlqEntry.setDeadLetterStrategy(dlqStrategy);
+        individualDlqEntry.setQueue("retry.test.individual");
+
+        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
+        policyEntries.add(individualDlqEntry);
+        final PolicyMap destinationPolicy = new PolicyMap();
+        destinationPolicy.setPolicyEntries(policyEntries);
+        service.setDestinationPolicy(destinationPolicy);
+
+        ((KahaDBPersistenceAdapter) service.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
+        service.deleteAllMessages();
+        return service;
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        final RedeliveryPolicy singleRedelivery = new RedeliveryPolicy();
+        singleRedelivery.setMaximumRedeliveries(1); // redelivery limit used by every connection of this test
+        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
+        factory.setRedeliveryPolicy(singleRedelivery);
+        return factory;
+    }
+
+}
\ No newline at end of file


[activemq] 03/04: AMQ-7102 - don't track objectNames that have not been registered due to suppressMBean filter, fix and test

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit 9efb1fe2a66169f379ecd1a9cbd8d8e7a1e6e611
Author: gtully <ga...@gmail.com>
AuthorDate: Thu Nov 15 17:45:18 2018 +0000

    AMQ-7102 - don't track objectNames that have not been registered due to suppressMBean filter, fix and test
    
    (cherry picked from commit 9cb680c0bad9fb3f92807d0f49e02505c544e3e9)
---
 .../activemq/broker/jmx/AsyncAnnotatedMBean.java   | 10 +++----
 .../activemq/broker/jmx/ManagedRegionBroker.java   | 34 ++++++++++++++--------
 .../broker/jmx/SelectiveMBeanRegistrationTest.java | 22 ++++++++++++++
 3 files changed, 49 insertions(+), 17 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java
index 7871a21..bc7a826 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.management.MBeanException;
 import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectInstance;
 import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
@@ -52,7 +53,7 @@ public class AsyncAnnotatedMBean extends AnnotatedMBean {
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    public static void registerMBean(ExecutorService executor, long timeout, ManagementContext context, Object object, ObjectName objectName) throws Exception {
+    public static ObjectInstance registerMBean(ExecutorService executor, long timeout, ManagementContext context, Object object, ObjectName objectName) throws Exception {
 
         if (timeout < 0 && executor != null) {
             throw new IllegalArgumentException("async timeout cannot be negative.");
@@ -67,15 +68,14 @@ public class AsyncAnnotatedMBean extends AnnotatedMBean {
         for (Class c : object.getClass().getInterfaces()) {
             if (mbeanName.equals(c.getName())) {
                 if (timeout == 0) {
-                    context.registerMBean(new AnnotatedMBean(object, c, objectName), objectName);
+                    return context.registerMBean(new AnnotatedMBean(object, c, objectName), objectName);
                 } else {
-                    context.registerMBean(new AsyncAnnotatedMBean(executor, timeout, object, c, objectName), objectName);
+                    return context.registerMBean(new AsyncAnnotatedMBean(executor, timeout, object, c, objectName), objectName);
                 }
-                return;
             }
         }
 
-        context.registerMBean(object, objectName);
+        return context.registerMBean(object, objectName);
     }
 
     @Override
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
index fa9584d..3eaf28b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
@@ -324,8 +324,9 @@ public class ManagedRegionBroker extends RegionBroker {
             }
         }
         try {
-            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
-            registeredMBeans.add(key);
+            if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key) != null) {
+                registeredMBeans.add(key);
+            }
         } catch (Throwable e) {
             LOG.warn("Failed to register MBean {}", key);
             LOG.debug("Failure reason: ", e);
@@ -380,8 +381,9 @@ public class ManagedRegionBroker extends RegionBroker {
         }
 
         try {
-            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
-            registeredMBeans.add(key);
+            if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key) != null) {
+                registeredMBeans.add(key);
+            }
         } catch (Throwable e) {
             LOG.warn("Failed to register MBean {}", key);
             LOG.debug("Failure reason: ", e);
@@ -444,8 +446,9 @@ public class ManagedRegionBroker extends RegionBroker {
         }
 
         try {
-            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
-            registeredMBeans.add(key);
+            if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key) != null) {
+                registeredMBeans.add(key);
+            }
         } catch (Throwable e) {
             LOG.warn("Failed to register MBean {}", key);
             LOG.debug("Failure reason: ", e);
@@ -520,8 +523,9 @@ public class ManagedRegionBroker extends RegionBroker {
             SubscriptionView view = new InactiveDurableSubscriptionView(this, brokerService, key.getClientId(), info, subscription);
 
             try {
-                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
-                registeredMBeans.add(objectName);
+                if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName) != null) {
+                    registeredMBeans.add(objectName);
+                }
             } catch (Throwable e) {
                 LOG.warn("Failed to register MBean {}", key);
                 LOG.debug("Failure reason: ", e);
@@ -770,8 +774,9 @@ public class ManagedRegionBroker extends RegionBroker {
                     view = new AbortSlowConsumerStrategyView(this, strategy);
                 }
 
-                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
-                registeredMBeans.add(objectName);
+                if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName) != null) {
+                    registeredMBeans.add(objectName);
+                }
             }
         } catch (Exception e) {
             LOG.warn("Failed to register MBean {}", strategy);
@@ -785,8 +790,9 @@ public class ManagedRegionBroker extends RegionBroker {
             ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
             if (!registeredMBeans.contains(objectName))  {
                 RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
-                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
-                registeredMBeans.add(objectName);
+                if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName) != null) {
+                    registeredMBeans.add(objectName);
+                }
             }
         } catch (Exception e) {
             LOG.warn("Failed to register prepared transaction MBean {}", transaction);
@@ -837,4 +843,8 @@ public class ManagedRegionBroker extends RegionBroker {
         ObjectName objName = BrokerMBeanSupport.createDestinationName(brokerObjectName.toString(), "Queue", queueName);
         return queues.get(objName);
     }
+
+    public Set<ObjectName> getRegisteredMbeans() {
+        return registeredMBeans; // NOTE(review): hands out the live internal set, not a copy — treat as read-only
+    }
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/SelectiveMBeanRegistrationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/SelectiveMBeanRegistrationTest.java
index a1c6aec..b6ea4a0 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/SelectiveMBeanRegistrationTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/SelectiveMBeanRegistrationTest.java
@@ -74,6 +74,11 @@ public class SelectiveMBeanRegistrationTest  {
 
         session.createConsumer(queue);
 
+        // create a plain topic
+        Destination topic = session.createTopic("ATopic");
+        session.createConsumer(topic);
+
+
         final ManagedRegionBroker managedRegionBroker = (ManagedRegionBroker) brokerService.getBroker().getAdaptor(ManagedRegionBroker.class);
 
         // mbean exists
@@ -87,6 +92,10 @@ public class SelectiveMBeanRegistrationTest  {
         // but it is not registered
         assertFalse(mbeanServer.isRegistered(managedRegionBroker.getQueueSubscribers()[0]));
 
+        // and is not tracked
+        assertFalse("not tracked as registered", managedRegionBroker.getRegisteredMbeans().contains(managedRegionBroker.getQueueSubscribers()[0]));
+
+
         // verify dynamicProducer suppressed
         session.createProducer(null);
 
@@ -105,9 +114,22 @@ public class SelectiveMBeanRegistrationTest  {
         Set<ObjectInstance> mbeans = mbeanServer.queryMBeans(query, null);
         assertEquals(0, mbeans.size());
 
+        assertFalse("producer not tracked as registered", managedRegionBroker.getRegisteredMbeans().contains(managedRegionBroker.getDynamicDestinationProducers()[0]));
+
+
         query = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationName=ActiveMQ.Advisory.*,*");
         mbeans = mbeanServer.queryMBeans(query, null);
         assertEquals(0, mbeans.size());
+
+        ObjectName[] topicNames = managedRegionBroker.getTopics();
+        assertTrue("Some topics registered", topicNames.length > 0);
+        for (ObjectName objectName : topicNames) {
+            if (objectName.getKeyProperty("destinationName").contains("Advisory")) {
+                assertFalse("advisory topic not tracked as registered: " + objectName, managedRegionBroker.getRegisteredMbeans().contains(objectName));
+            } else {
+                assertTrue("topic tracked as registered: " + objectName, managedRegionBroker.getRegisteredMbeans().contains(objectName));
+            }
+        }
     }
 
 


[activemq] 04/04: AMQ-7102 - Improve ManagedRegionBroker performance by replacing registeredMBeans CopyOnWriteSet with a Concurrent hashmap backed set

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit e652e18b5de54ecb4686b2a69e0e43067954a0f2
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
AuthorDate: Wed Jul 31 12:55:24 2019 -0400

    AMQ-7102 - Improve ManagedRegionBroker performance by replacing
    registeredMBeans CopyOnWriteSet with a Concurrent hashmap backed set
    
    (cherry picked from commit 09003e8fef27f51b83a5ee4242709c2b20821a83)
---
 .../main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
index 3eaf28b..e856c5d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
@@ -96,7 +96,7 @@ public class ManagedRegionBroker extends RegionBroker {
     private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
     private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
     private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
-    private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
+    private final Set<ObjectName> registeredMBeans = ConcurrentHashMap.newKeySet(); // concurrent set backed by a ConcurrentHashMap
     /* This is the first broker in the broker interceptor chain. */
     private Broker contextBroker;
 


[activemq] 02/04: AMQ-7094 - track the objectName with an annotated mbean such that the jmx audit log event can extract that target of an mbean operation, fix and test

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit 70b2a4318375a573bdbda61303f5cfc19d33ff7d
Author: gtully <ga...@gmail.com>
AuthorDate: Thu Nov 8 12:10:01 2018 +0000

    AMQ-7094 - track the objectName with an annotated mbean such that the jmx audit log event can extract that target of an mbean operation, fix and test
    
    (cherry picked from commit d2b0affedb38c5439bce2fb5a8e321bc5d0ec713)
---
 .../apache/activemq/broker/jmx/AnnotatedMBean.java | 26 +++++++-
 .../activemq/broker/jmx/AsyncAnnotatedMBean.java   | 12 ++--
 .../activemq/broker/util/JMXAuditLogEntry.java     | 13 +++-
 .../java/org/apache/activemq/jmx/DLQRetryTest.java |  1 +
 .../org/apache/activemq/jmx/JmxAuditLogTest.java   | 75 ++++++++++++++++++++++
 .../org/apache/activemq/jmx/JmxCreateNCTest.java   |  3 +
 6 files changed, 120 insertions(+), 10 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AnnotatedMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AnnotatedMBean.java
index c75d8a8..dc772c2 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AnnotatedMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AnnotatedMBean.java
@@ -68,6 +68,8 @@ public class AnnotatedMBean extends StandardMBean {
         }
     }
 
+    private final ObjectName objectName;
+
     private static byte byteFromProperty(String s) {
         byte val = OFF;
         String config = System.getProperty(s, "").toLowerCase(Locale.ENGLISH);
@@ -88,7 +90,7 @@ public class AnnotatedMBean extends StandardMBean {
 
         for (Class c : object.getClass().getInterfaces()) {
             if (mbeanName.equals(c.getName())) {
-                context.registerMBean(new AnnotatedMBean(object, c), objectName);
+                context.registerMBean(new AnnotatedMBean(object, c, objectName), objectName);
                 return;
             }
         }
@@ -97,13 +99,15 @@ public class AnnotatedMBean extends StandardMBean {
     }
 
     /** Instance where the MBean interface is implemented by another object. */
-    public <T> AnnotatedMBean(T impl, Class<T> mbeanInterface) throws NotCompliantMBeanException {
+    public <T> AnnotatedMBean(T impl, Class<T> mbeanInterface, ObjectName objectName) throws NotCompliantMBeanException {
         super(impl, mbeanInterface);
+        this.objectName = objectName;
     }
 
     /** Instance where the MBean interface is implemented by this object. */
-    protected AnnotatedMBean(Class<?> mbeanInterface) throws NotCompliantMBeanException {
+    protected AnnotatedMBean(Class<?> mbeanInterface, ObjectName objectName) throws NotCompliantMBeanException {
         super(mbeanInterface);
+        this.objectName = objectName;
     }
 
     /** {@inheritDoc} */
@@ -212,6 +216,7 @@ public class AnnotatedMBean extends StandardMBean {
             entry = new JMXAuditLogEntry();
             entry.setUser(caller);
             entry.setTimestamp(System.currentTimeMillis());
+            entry.setTarget(extractTargetTypeProperty(objectName));
             entry.setOperation(this.getMBeanInfo().getClassName() + "." + s);
 
             try
@@ -245,6 +250,21 @@ public class AnnotatedMBean extends StandardMBean {
         return result;
     }
 
+    // keep brokerName last b/c objectNames include the brokerName
+    final static String[] targetPropertiesCandidates = new String[] {"destinationName", "networkConnectorName", "connectorName", "connectionName", "brokerName"};
+    /** Returns the first targetPropertiesCandidates key property that objectName defines; null if objectName is null or defines none. */
+    private String extractTargetTypeProperty(ObjectName objectName) {
+        if (objectName != null) {
+            for (String attr : targetPropertiesCandidates) {
+                String result = objectName.getKeyProperty(attr);
+                if (result != null) {
+                    return result;
+                }
+            }
+        }
+        return null;
+    }
+
     private Method getMBeanMethod(Class clazz, String methodName, String[] signature) throws ReflectiveOperationException {
         Class[] parameterTypes = new Class[signature.length];
         for (int i = 0; i < signature.length; i++) {
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java
index 7460e16..7871a21 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java
@@ -36,15 +36,15 @@ public class AsyncAnnotatedMBean extends AnnotatedMBean {
     private ExecutorService executor;
     private long timeout = 0;
 
-    public <T> AsyncAnnotatedMBean(ExecutorService executor, long timeout, T impl, Class<T> mbeanInterface) throws NotCompliantMBeanException {
-        super(impl, mbeanInterface);
+    public <T> AsyncAnnotatedMBean(ExecutorService executor, long timeout, T impl, Class<T> mbeanInterface, ObjectName objectName) throws NotCompliantMBeanException {
+        super(impl, mbeanInterface, objectName);
 
         this.executor = executor;
         this.timeout = timeout;
     }
 
-    protected AsyncAnnotatedMBean(Class<?> mbeanInterface) throws NotCompliantMBeanException {
-        super(mbeanInterface);
+    protected AsyncAnnotatedMBean(Class<?> mbeanInterface, ObjectName objectName) throws NotCompliantMBeanException {
+        super(mbeanInterface, objectName);
     }
 
     protected Object asyncInvole(String s, Object[] objects, String[] strings) throws MBeanException, ReflectionException {
@@ -67,9 +67,9 @@ public class AsyncAnnotatedMBean extends AnnotatedMBean {
         for (Class c : object.getClass().getInterfaces()) {
             if (mbeanName.equals(c.getName())) {
                 if (timeout == 0) {
-                    context.registerMBean(new AnnotatedMBean(object, c), objectName);
+                    context.registerMBean(new AnnotatedMBean(object, c, objectName), objectName);
                 } else {
-                    context.registerMBean(new AsyncAnnotatedMBean(executor, timeout, object, c), objectName);
+                    context.registerMBean(new AsyncAnnotatedMBean(executor, timeout, object, c, objectName), objectName);
                 }
                 return;
             }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/util/JMXAuditLogEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/util/JMXAuditLogEntry.java
index 7e0e0e3..3ced281 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/util/JMXAuditLogEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/util/JMXAuditLogEntry.java
@@ -21,14 +21,25 @@ import java.util.Arrays;
 public class JMXAuditLogEntry extends AuditLogEntry {
     public static final String[] VERBS = new String[] {" called ", " ended "};
     private int state = 0;
+    protected String target;
 
     public void complete() {
         setTimestamp(System.currentTimeMillis());
         state = 1;
     }
 
+    public String getTarget() {
+        return target;
+    }
+
+    public void setTarget(String target) {
+        this.target = target;
+    }
+
     @Override
     public String toString() {
-        return user.trim() + VERBS[state] + operation + Arrays.toString((Object[])parameters.get("arguments")) + " at " + getFormattedTime();
+        return user.trim() + VERBS[state] + operation + Arrays.toString((Object[])parameters.get("arguments"))
+                + (target != null ? " on " + target : "")
+                + " at " + getFormattedTime();
     }
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/DLQRetryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/DLQRetryTest.java
index e2875fb..e4f3c16 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/DLQRetryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/DLQRetryTest.java
@@ -151,6 +151,7 @@ public class DLQRetryTest extends EmbeddedBrokerTestSupport {
     }
 
     protected void setUp() throws Exception {
+        System.setProperty("org.apache.activemq.audit", "all");
         bindAddress = "tcp://localhost:0";
         useTopic = false;
         super.setUp();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxAuditLogTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxAuditLogTest.java
index e6f1083..5256b96 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxAuditLogTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxAuditLogTest.java
@@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
 import org.apache.activemq.broker.jmx.ManagementContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -62,10 +64,13 @@ public class JmxAuditLogTest extends TestSupport
 
       broker = new BrokerService();
       broker.setUseJmx(true);
+      broker.setDeleteAllMessagesOnStartup(true);
       portToUse = findOpenPort();
       broker.setManagementContext(createManagementContext("broker", portToUse));
       broker.setPopulateUserNameInMBeans(true);
       broker.setDestinations(createDestinations());
+      TransportConnector transportConnector = broker.addConnector("tcp://0.0.0.0:0");
+      transportConnector.setName("TCP");
       broker.start();
    }
 
@@ -154,4 +159,74 @@ public class JmxAuditLogTest extends TestSupport
       assertEquals("got two messages", 2, logCount.get());
 
    }
+
+   @Test
+   public void testNameTargetVisible() throws Exception
+   {
+      Logger log4jLogger = Logger.getLogger("org.apache.activemq.audit");
+      log4jLogger.setLevel(Level.INFO);
+      final AtomicInteger logCount = new AtomicInteger(0);
+      final AtomicBoolean gotEnded = new AtomicBoolean(false);
+      final AtomicBoolean gotQueueName = new AtomicBoolean(false);
+      final AtomicBoolean gotBrokerName = new AtomicBoolean(false);
+      final AtomicBoolean gotConnectorName = new AtomicBoolean(false);
+
+      final String queueName = queue.getQueueName();
+      Appender appender = new DefaultTestAppender()
+      {
+         @Override
+         public void doAppend(LoggingEvent event)
+         {
+            if (event.getMessage() instanceof String)
+            {
+               String message = (String) event.getMessage();
+               System.out.println(message);
+               if (message.contains(VERBS[0])) {
+                  if (message.contains(queueName)) {
+                     gotQueueName.set(true);
+                  }
+                  if (message.contains(broker.getBrokerName())) {
+                     gotBrokerName.set(true);
+                  }
+
+                  if (message.contains("TCP")) {
+                     gotConnectorName.set(true);
+                  }
+               }
+
+               if (message.contains(VERBS[1])) {
+                  gotEnded.set(true);
+               }
+            }
+            logCount.incrementAndGet();
+         }
+      };
+      log4jLogger.addAppender(appender);
+
+      MBeanServerConnection conn = createJMXConnector(portToUse);
+      ObjectName queueObjName = new ObjectName(broker.getBrokerObjectName() + ",destinationType=Queue,destinationName=" + queueName);
+
+      Object[] params = {};
+      String[] signature = {};
+
+      conn.invoke(queueObjName, "purge", params, signature);
+
+      assertTrue("got ended statement", gotEnded.get());
+      assertEquals("got two messages", 2, logCount.get());
+      assertTrue("got queueName in called statement", gotQueueName.get());
+
+      // call broker to verify brokerName
+      conn.invoke(broker.getBrokerObjectName(), "resetStatistics", params, signature);
+      assertEquals("got 4 messages", 4, logCount.get());
+      assertTrue("got brokerName in called statement", gotBrokerName.get());
+
+
+      ObjectName transportConnectorON = BrokerMBeanSupport.createConnectorName(broker.getBrokerObjectName(), "clientConnectors", "TCP");
+      conn.invoke(transportConnectorON, "stop", params, signature);
+      assertEquals("got messages", 6, logCount.get());
+      assertTrue("got connectorName in called statement", gotConnectorName.get());
+
+      log4jLogger.removeAppender(appender);
+
+   }
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxCreateNCTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxCreateNCTest.java
index e96c596..1d878b4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxCreateNCTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxCreateNCTest.java
@@ -38,6 +38,9 @@ public class JmxCreateNCTest {
 
     @Test
     public void testBridgeRegistration() throws Exception {
+
+        System.setProperty("org.apache.activemq.audit", "all");
+
         BrokerService broker = new BrokerService();
         broker.setBrokerName(BROKER_NAME);
         broker.setUseJmx(true); // explicitly set this so no funny issues