You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/05/18 13:44:25 UTC

activemq git commit: AMQ-6707 - fix destination filter delegate param, refactor-auto-gen method; jees

Repository: activemq
Updated Branches:
  refs/heads/master 2eff835ee -> 01384c714


AMQ-6707 - fix destination filter delegate param, refactor-auto-gen method; jees


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/01384c71
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/01384c71
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/01384c71

Branch: refs/heads/master
Commit: 01384c714dbe0405d876b93849e6fff5ec048bff
Parents: 2eff835
Author: gtully <ga...@gmail.com>
Authored: Fri May 18 14:44:05 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri May 18 14:44:05 2018 +0100

----------------------------------------------------------------------
 .../broker/region/DestinationFilter.java        |  2 +-
 .../activemq/store/jdbc/XACompletionTest.java   | 62 +++++++++++++++-----
 2 files changed, 48 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/01384c71/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
index dad8501..1897c23 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
@@ -380,7 +380,7 @@ public class DestinationFilter implements Destination {
 
     @Override
     public void clearPendingMessages(int pendingAdditionsCount) {
-        next.clearPendingMessages(0);
+        next.clearPendingMessages(pendingAdditionsCount);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/01384c71/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
index 50cb1c9..6aef533 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
@@ -21,14 +21,22 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQXAConnection;
 import org.apache.activemq.ActiveMQXAConnectionFactory;
 import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.filter.AnyDestination;
+import org.apache.activemq.filter.DestinationMap;
+import org.apache.activemq.jaas.GroupPrincipal;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.security.AuthorizationPlugin;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
+import org.apache.activemq.security.SimpleAuthorizationMap;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.Wait;
 import org.apache.activemq.wireformat.WireFormat;
@@ -73,12 +81,12 @@ public class XACompletionTest extends TestSupport {
     @Parameterized.Parameter
     public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;
 
-    @Parameterized.Parameters(name="store={0}")
+    @Parameterized.Parameters(name = "store={0}")
     public static Iterable<Object[]> getTestParameters() {
-        return Arrays.asList(new Object[][]{ {TestSupport.PersistenceAdapterChoice.KahaDB},{PersistenceAdapterChoice.JDBC} });
+        return Arrays.asList(new Object[][]{{TestSupport.PersistenceAdapterChoice.KahaDB}, {PersistenceAdapterChoice.JDBC}});
     }
 
-        @Before
+    @Before
     public void setUp() throws Exception {
         broker = createBroker();
     }
@@ -293,7 +301,7 @@ public class XACompletionTest extends TestSupport {
         assertNotNull("message gone", browsed);
 
         LOG.info("Try receive... after");
-        for (int i=0; i<10; i++) {
+        for (int i = 0; i < 10; i++) {
             Message message = regularReceive("TEST");
             assertNotNull("message gone", message);
         }
@@ -383,7 +391,7 @@ public class XACompletionTest extends TestSupport {
 
 
         LOG.info("Try receive... after rollback");
-        for (int i=0;i<10; i++) {
+        for (int i = 0; i < 10; i++) {
             Message message = regularReceive("TEST");
             assertNotNull("message gone: " + i, message);
         }
@@ -464,6 +472,7 @@ public class XACompletionTest extends TestSupport {
 
         // set maxBatchSize=1
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + 1);
+        factory.setWatchTopicAdvisories(false);
         javax.jms.Connection connection = factory.createConnection();
         connection.start();
         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -472,6 +481,7 @@ public class XACompletionTest extends TestSupport {
         consumer.close();
 
         ActiveMQConnectionFactory receiveFactory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
+        receiveFactory.setWatchTopicAdvisories(false);
 
         // recover/rollback the second tx
         ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
@@ -483,7 +493,7 @@ public class XACompletionTest extends TestSupport {
         xids = xaResource.recover(XAResource.TMSTARTRSCAN);
         xaResource.recover(XAResource.TMNOFLAGS);
 
-        for (int i=0; i< xids.length; i++) {
+        for (int i = 0; i < xids.length; i++) {
             xaResource.rollback(xids[i]);
         }
 
@@ -523,7 +533,7 @@ public class XACompletionTest extends TestSupport {
 
         final Xid tid = createXid();
         byte[] branch = tid.getBranchQualifier();
-        final byte[] branch2  = Arrays.copyOf(branch, branch.length);
+        final byte[] branch2 = Arrays.copyOf(branch, branch.length);
         branch2[0] = '!';
 
         Xid branchTid = new Xid() {
@@ -639,7 +649,7 @@ public class XACompletionTest extends TestSupport {
 
         final Xid tid = createXid();
         byte[] branch = tid.getBranchQualifier();
-        final byte[] branch2  = Arrays.copyOf(branch, branch.length);
+        final byte[] branch2 = Arrays.copyOf(branch, branch.length);
         branch2[0] = '!';
 
         Xid branchTid = new Xid() {
@@ -771,7 +781,7 @@ public class XACompletionTest extends TestSupport {
 
         final Xid tid = createXid();
         byte[] branch = tid.getBranchQualifier();
-        final byte[] branch2  = Arrays.copyOf(branch, branch.length);
+        final byte[] branch2 = Arrays.copyOf(branch, branch.length);
         branch2[0] = '!';
 
         Xid branchTid = new Xid() {
@@ -870,6 +880,7 @@ public class XACompletionTest extends TestSupport {
 
     private Message regularReceive(String qName) throws Exception {
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        factory.setWatchTopicAdvisories(false);
         return regularReceiveWith(factory, qName);
     }
 
@@ -889,6 +900,7 @@ public class XACompletionTest extends TestSupport {
     private int drainUnack(int limit, String qName) throws Exception {
         int drained = 0;
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + limit);
+        factory.setWatchTopicAdvisories(false);
         javax.jms.Connection connection = factory.createConnection();
         try {
             connection.start();
@@ -897,7 +909,8 @@ public class XACompletionTest extends TestSupport {
             MessageConsumer consumer = session.createConsumer(destination);
             while (drained < limit && consumer.receive(2000) != null) {
                 drained++;
-            };
+            }
+            ;
             consumer.close();
         } finally {
             connection.close();
@@ -921,6 +934,7 @@ public class XACompletionTest extends TestSupport {
             connection.close();
         }
     }
+
     protected void sendMessages(int messagesExpected) throws Exception {
         sendMessagesWith(factory, messagesExpected);
     }
@@ -933,9 +947,9 @@ public class XACompletionTest extends TestSupport {
         MessageProducer producer = session.createProducer(destination);
         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
 
-        for (int i=0; i<messagesExpected; i++) {
-            LOG.debug("Sending message " + (i+1) + " of " + messagesExpected);
-            producer.send(session.createTextMessage("test message " + (i+1)));
+        for (int i = 0; i < messagesExpected; i++) {
+            LOG.debug("Sending message " + (i + 1) + " of " + messagesExpected);
+            producer.send(session.createTextMessage("test message " + (i + 1)));
         }
         connection.close();
     }
@@ -950,9 +964,9 @@ public class XACompletionTest extends TestSupport {
         PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG, XID FROM ACTIVEMQ_MSGS");
         ResultSet result = statement.executeQuery();
         LOG.info("Messages in broker db...");
-        while(result.next()) {
+        while (result.next()) {
             long id = result.getLong(1);
-            org.apache.activemq.command.Message message = (org.apache.activemq.command.Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
+            org.apache.activemq.command.Message message = (org.apache.activemq.command.Message) wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
             String xid = result.getString(3);
             LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", XID:" + xid + ", MSG: " + message);
         }
@@ -985,6 +999,24 @@ public class XACompletionTest extends TestSupport {
         setPersistenceAdapter(broker, persistenceAdapterChoice);
         broker.setPersistent(true);
         connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+
+        // ensure we run through a destination filter
+        final String id = "a";
+        AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin();
+        SimpleAuthorizationMap map = new SimpleAuthorizationMap();
+        DestinationMap destinationMap = new DestinationMap();
+        GroupPrincipal anaGroup = new GroupPrincipal(id);
+        destinationMap.put(new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">")}), anaGroup);
+        map.setWriteACLs(destinationMap);
+        map.setAdminACLs(destinationMap);
+        map.setReadACLs(destinationMap);
+        authorizationPlugin.setMap(map);
+        SimpleAuthenticationPlugin simpleAuthenticationPlugin = new SimpleAuthenticationPlugin();
+        simpleAuthenticationPlugin.setAnonymousAccessAllowed(true);
+        simpleAuthenticationPlugin.setAnonymousGroup(id);
+        simpleAuthenticationPlugin.setAnonymousUser(id);
+
+        broker.setPlugins(new BrokerPlugin[]{simpleAuthenticationPlugin, authorizationPlugin});
         broker.start();
         return broker;
     }