You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/05/18 13:44:25 UTC
activemq git commit: AMQ-6707 - fix destination filter delegate param, refactor-auto-gen method; jees
Repository: activemq
Updated Branches:
refs/heads/master 2eff835ee -> 01384c714
AMQ-6707 - fix destination filter delegate param, refactor-auto-gen method; jees
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/01384c71
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/01384c71
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/01384c71
Branch: refs/heads/master
Commit: 01384c714dbe0405d876b93849e6fff5ec048bff
Parents: 2eff835
Author: gtully <ga...@gmail.com>
Authored: Fri May 18 14:44:05 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri May 18 14:44:05 2018 +0100
----------------------------------------------------------------------
.../broker/region/DestinationFilter.java | 2 +-
.../activemq/store/jdbc/XACompletionTest.java | 62 +++++++++++++++-----
2 files changed, 48 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/01384c71/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
index dad8501..1897c23 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
@@ -380,7 +380,7 @@ public class DestinationFilter implements Destination {
@Override
public void clearPendingMessages(int pendingAdditionsCount) {
- next.clearPendingMessages(0);
+ next.clearPendingMessages(pendingAdditionsCount);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/01384c71/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
index 50cb1c9..6aef533 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
@@ -21,14 +21,22 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnection;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.filter.AnyDestination;
+import org.apache.activemq.filter.DestinationMap;
+import org.apache.activemq.jaas.GroupPrincipal;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.security.AuthorizationPlugin;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
+import org.apache.activemq.security.SimpleAuthorizationMap;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait;
import org.apache.activemq.wireformat.WireFormat;
@@ -73,12 +81,12 @@ public class XACompletionTest extends TestSupport {
@Parameterized.Parameter
public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;
- @Parameterized.Parameters(name="store={0}")
+ @Parameterized.Parameters(name = "store={0}")
public static Iterable<Object[]> getTestParameters() {
- return Arrays.asList(new Object[][]{ {TestSupport.PersistenceAdapterChoice.KahaDB},{PersistenceAdapterChoice.JDBC} });
+ return Arrays.asList(new Object[][]{{TestSupport.PersistenceAdapterChoice.KahaDB}, {PersistenceAdapterChoice.JDBC}});
}
- @Before
+ @Before
public void setUp() throws Exception {
broker = createBroker();
}
@@ -293,7 +301,7 @@ public class XACompletionTest extends TestSupport {
assertNotNull("message gone", browsed);
LOG.info("Try receive... after");
- for (int i=0; i<10; i++) {
+ for (int i = 0; i < 10; i++) {
Message message = regularReceive("TEST");
assertNotNull("message gone", message);
}
@@ -383,7 +391,7 @@ public class XACompletionTest extends TestSupport {
LOG.info("Try receive... after rollback");
- for (int i=0;i<10; i++) {
+ for (int i = 0; i < 10; i++) {
Message message = regularReceive("TEST");
assertNotNull("message gone: " + i, message);
}
@@ -464,6 +472,7 @@ public class XACompletionTest extends TestSupport {
// set maxBatchSize=1
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + 1);
+ factory.setWatchTopicAdvisories(false);
javax.jms.Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -472,6 +481,7 @@ public class XACompletionTest extends TestSupport {
consumer.close();
ActiveMQConnectionFactory receiveFactory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
+ receiveFactory.setWatchTopicAdvisories(false);
// recover/rollback the second tx
ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
@@ -483,7 +493,7 @@ public class XACompletionTest extends TestSupport {
xids = xaResource.recover(XAResource.TMSTARTRSCAN);
xaResource.recover(XAResource.TMNOFLAGS);
- for (int i=0; i< xids.length; i++) {
+ for (int i = 0; i < xids.length; i++) {
xaResource.rollback(xids[i]);
}
@@ -523,7 +533,7 @@ public class XACompletionTest extends TestSupport {
final Xid tid = createXid();
byte[] branch = tid.getBranchQualifier();
- final byte[] branch2 = Arrays.copyOf(branch, branch.length);
+ final byte[] branch2 = Arrays.copyOf(branch, branch.length);
branch2[0] = '!';
Xid branchTid = new Xid() {
@@ -639,7 +649,7 @@ public class XACompletionTest extends TestSupport {
final Xid tid = createXid();
byte[] branch = tid.getBranchQualifier();
- final byte[] branch2 = Arrays.copyOf(branch, branch.length);
+ final byte[] branch2 = Arrays.copyOf(branch, branch.length);
branch2[0] = '!';
Xid branchTid = new Xid() {
@@ -771,7 +781,7 @@ public class XACompletionTest extends TestSupport {
final Xid tid = createXid();
byte[] branch = tid.getBranchQualifier();
- final byte[] branch2 = Arrays.copyOf(branch, branch.length);
+ final byte[] branch2 = Arrays.copyOf(branch, branch.length);
branch2[0] = '!';
Xid branchTid = new Xid() {
@@ -870,6 +880,7 @@ public class XACompletionTest extends TestSupport {
private Message regularReceive(String qName) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+ factory.setWatchTopicAdvisories(false);
return regularReceiveWith(factory, qName);
}
@@ -889,6 +900,7 @@ public class XACompletionTest extends TestSupport {
private int drainUnack(int limit, String qName) throws Exception {
int drained = 0;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + limit);
+ factory.setWatchTopicAdvisories(false);
javax.jms.Connection connection = factory.createConnection();
try {
connection.start();
@@ -897,7 +909,8 @@ public class XACompletionTest extends TestSupport {
MessageConsumer consumer = session.createConsumer(destination);
while (drained < limit && consumer.receive(2000) != null) {
drained++;
- };
+ }
+ ;
consumer.close();
} finally {
connection.close();
@@ -921,6 +934,7 @@ public class XACompletionTest extends TestSupport {
connection.close();
}
}
+
protected void sendMessages(int messagesExpected) throws Exception {
sendMessagesWith(factory, messagesExpected);
}
@@ -933,9 +947,9 @@ public class XACompletionTest extends TestSupport {
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- for (int i=0; i<messagesExpected; i++) {
- LOG.debug("Sending message " + (i+1) + " of " + messagesExpected);
- producer.send(session.createTextMessage("test message " + (i+1)));
+ for (int i = 0; i < messagesExpected; i++) {
+ LOG.debug("Sending message " + (i + 1) + " of " + messagesExpected);
+ producer.send(session.createTextMessage("test message " + (i + 1)));
}
connection.close();
}
@@ -950,9 +964,9 @@ public class XACompletionTest extends TestSupport {
PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG, XID FROM ACTIVEMQ_MSGS");
ResultSet result = statement.executeQuery();
LOG.info("Messages in broker db...");
- while(result.next()) {
+ while (result.next()) {
long id = result.getLong(1);
- org.apache.activemq.command.Message message = (org.apache.activemq.command.Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
+ org.apache.activemq.command.Message message = (org.apache.activemq.command.Message) wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
String xid = result.getString(3);
LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", XID:" + xid + ", MSG: " + message);
}
@@ -985,6 +999,24 @@ public class XACompletionTest extends TestSupport {
setPersistenceAdapter(broker, persistenceAdapterChoice);
broker.setPersistent(true);
connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+
+ // ensure we run through a destination filter
+ final String id = "a";
+ AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin();
+ SimpleAuthorizationMap map = new SimpleAuthorizationMap();
+ DestinationMap destinationMap = new DestinationMap();
+ GroupPrincipal anaGroup = new GroupPrincipal(id);
+ destinationMap.put(new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">")}), anaGroup);
+ map.setWriteACLs(destinationMap);
+ map.setAdminACLs(destinationMap);
+ map.setReadACLs(destinationMap);
+ authorizationPlugin.setMap(map);
+ SimpleAuthenticationPlugin simpleAuthenticationPlugin = new SimpleAuthenticationPlugin();
+ simpleAuthenticationPlugin.setAnonymousAccessAllowed(true);
+ simpleAuthenticationPlugin.setAnonymousGroup(id);
+ simpleAuthenticationPlugin.setAnonymousUser(id);
+
+ broker.setPlugins(new BrokerPlugin[]{simpleAuthenticationPlugin, authorizationPlugin});
broker.start();
return broker;
}