You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/18 00:36:16 UTC
[01/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5277 - jdbc store make use of
entryLocator on ack
Repository: activemq
Updated Branches:
refs/heads/activemq-5.10.x fc244f48e -> b41f92127
https://issues.apache.org/jira/browse/AMQ-5277 - jdbc store make use of entryLocator on ack
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7f797046
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7f797046
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7f797046
Branch: refs/heads/activemq-5.10.x
Commit: 7f797046951b9cf2ef114582ac8e72ba88989110
Parents: fc244f4
Author: gtully <ga...@gmail.com>
Authored: Mon Jul 14 16:58:47 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Tue Dec 16 21:50:54 2014 -0500
----------------------------------------------------------------------
.../java/org/apache/activemq/broker/region/BaseDestination.java | 5 +++--
.../java/org/apache/activemq/store/jdbc/JDBCMessageStore.java | 5 ++++-
2 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/7f797046/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index c3841c8..03513aa 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -782,10 +782,11 @@ public abstract class BaseDestination implements Destination {
ack.copy(a);
ack = a;
// Convert to non-ranged.
- ack.setFirstMessageId(node.getMessageId());
- ack.setLastMessageId(node.getMessageId());
ack.setMessageCount(1);
}
+ // always use node messageId so we can access entry/data Location
+ ack.setFirstMessageId(node.getMessageId());
+ ack.setLastMessageId(node.getMessageId());
return ack;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7f797046/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 968b928..76ecced 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -207,7 +207,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
- long seq = persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0];
+ long seq = ack.getLastMessageId().getEntryLocator() != null ?
+ (Long) ack.getLastMessageId().getEntryLocator() :
+ persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0];
// Get a connection and remove the message from the DB
TransactionContext c = persistenceAdapter.getTransactionContext(context);
@@ -309,6 +311,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
+ msg.getMessageId().setEntryLocator(sequenceId);
listener.recoverMessage(msg);
lastRecoveredSequenceId.set(sequenceId);
lastRecoveredPriority.set(msg.getPriority());
[04/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5198
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5198
member variable scheduler assignment escapes the synchronization block
before the scheduler instance is fully initialized.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/060e8aee
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/060e8aee
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/060e8aee
Branch: refs/heads/activemq-5.10.x
Commit: 060e8aee78385bf61400284c609b417dfb4975bb
Parents: 9f61fd5
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jul 18 10:11:25 2014 -0400
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 14:47:52 2014 -0500
----------------------------------------------------------------------
.../src/main/java/org/apache/activemq/ActiveMQConnection.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/060e8aee/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index 2df8607..9252310 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -2556,8 +2556,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
if (result == null) {
checkClosed();
try {
- result = scheduler = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
- scheduler.start();
+ result = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
+ result.start();
+ scheduler = result;
} catch(Exception e) {
throw JMSExceptionSupport.create(e);
}
[09/17] activemq git commit: Disable JMX on the test brokers as it's
not needed in these tests,
prevents some failures and speeds things up a little.
Posted by ha...@apache.org.
Disable JMX on the test brokers as it's not needed in these tests,
prevents some failures and speeds things up a little.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/31015094
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/31015094
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/31015094
Branch: refs/heads/activemq-5.10.x
Commit: 31015094aae9945deed406ebf9bbe65aee85fe6a
Parents: d8b126d
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jul 30 09:40:09 2014 -0400
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 15:07:36 2014 -0500
----------------------------------------------------------------------
.../activemq/jms/pool/PooledConnectionFactoryTest.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/31015094/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java
index e20a605..2d3404f 100644
--- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java
@@ -58,7 +58,8 @@ public class PooledConnectionFactoryTest {
@Test
public void testClearAllConnections() throws Exception {
- ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+ ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
+ "vm://broker1?marshal=false&broker.persistent=false&broker.useJmx=false");
PooledConnectionFactory cf = new PooledConnectionFactory();
cf.setConnectionFactory(amq);
cf.setMaxConnections(3);
@@ -89,7 +90,8 @@ public class PooledConnectionFactoryTest {
@Test
public void testMaxConnectionsAreCreated() throws Exception {
- ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+ ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
+ "vm://broker1?marshal=false&broker.persistent=false&broker.useJmx=false");
PooledConnectionFactory cf = new PooledConnectionFactory();
cf.setConnectionFactory(amq);
cf.setMaxConnections(3);
@@ -108,7 +110,8 @@ public class PooledConnectionFactoryTest {
@Test
public void testConnectionsAreRotated() throws Exception {
- ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+ ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
+ "vm://broker1?marshal=false&broker.persistent=false&broker.useJmx=false");
PooledConnectionFactory cf = new PooledConnectionFactory();
cf.setConnectionFactory(amq);
cf.setMaxConnections(10);
@@ -149,7 +152,8 @@ public class PooledConnectionFactoryTest {
@Test
public void testConnectionsArePooledAsyncCreate() throws Exception {
- final ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+ final ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
+ "vm://broker1?marshal=false&broker.persistent=false&broker.useJmx=false");
final PooledConnectionFactory cf = new PooledConnectionFactory();
cf.setConnectionFactory(amq);
cf.setMaxConnections(1);
@@ -205,6 +209,7 @@ public class PooledConnectionFactoryTest {
BrokerService brokerService = new BrokerService();
brokerService.setPersistent(false);
+ brokerService.setUseJmx(false);
brokerService.addConnector("tcp://localhost:0");
brokerService.start();
brokerService.waitUntilStarted();
[08/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5080 - revert short circuit of
recover when failover transport is not connected - using
startupMaxReconnectAttempts=x provides a better alternative. tm recovery
need not start a man
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5080 - revert short circuit of recover when failover transport is not connected - using startupMaxReconnectAttempts=x provides a better alternative. tm recovery need not start a managed connection or call getConnection before a call to xaresource.recover
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d8b126db
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d8b126db
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d8b126db
Branch: refs/heads/activemq-5.10.x
Commit: d8b126dbed7e34f7bbd7fefa43127f2938f5f56f
Parents: e4183ec
Author: gtully <ga...@gmail.com>
Authored: Mon Jul 28 15:54:12 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 14:57:55 2014 -0500
----------------------------------------------------------------------
.../java/org/apache/activemq/TransactionContext.java | 13 ++-----------
1 file changed, 2 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8b126db/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
index 8e36ed2..c86f448 100755
--- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
@@ -643,20 +643,11 @@ public class TransactionContext implements XAResource {
}
public Xid[] recover(int flag) throws XAException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Recover: " + flag);
- }
+ LOG.debug("recover({})", flag);
TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
try {
this.connection.checkClosedOrFailed();
- final FailoverTransport failoverTransport = this.connection.getTransport().narrow(FailoverTransport.class);
- if (failoverTransport != null && !failoverTransport.isConnected()) {
- // otherwise call will block on reconnect forfeting any app level periodic check
- XAException xaException = new XAException("Failover transport not connected: " + this.getConnection());
- xaException.errorCode = XAException.XAER_RMERR;
- throw xaException;
- }
this.connection.ensureConnectionInfoSent();
DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
@@ -668,7 +659,7 @@ public class TransactionContext implements XAResource {
answer = new XATransactionId[data.length];
System.arraycopy(data, 0, answer, 0, data.length);
}
- LOG.trace("recover({})={}", flag, answer);
+ LOG.debug("recover({})={}", flag, answer);
return answer;
} catch (JMSException e) {
throw toXAException(e);
[11/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5290
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5290
Add some utility methods that are useful for restoring past
subscriptions.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/07bfc1ef
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/07bfc1ef
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/07bfc1ef
Branch: refs/heads/activemq-5.10.x
Commit: 07bfc1ef0e5887e7d4ec7c1e8768d2e0cca4e254
Parents: 9b4f6ac
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jul 30 11:31:57 2014 -0400
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 15:09:45 2014 -0500
----------------------------------------------------------------------
.../store/PersistenceAdapterSupport.java | 134 +++++++++++++++++--
1 file changed, 125 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/07bfc1ef/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterSupport.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterSupport.java
index aca4574..491c5ed 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterSupport.java
@@ -16,26 +16,61 @@
*/
package org.apache.activemq.store;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.SubscriptionInfo;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.SubscriptionInfo;
+
/**
* Used to implement common PersistenceAdapter methods.
*/
public class PersistenceAdapterSupport {
- static public List<SubscriptionInfo> listSubscriptions(PersistenceAdapter pa, String clientId) throws IOException {
+ private static final DestinationMatcher MATCH_ALL = new AlwaysMatches();
+
+ /**
+ * Provides an interface for a Destination matching object that can be used to
+ * search for specific destinations from a persistence adapter.
+ */
+ public interface DestinationMatcher {
+
+ /**
+ * Given a Destination object, return true if the destination matches some defined
+ * search criteria, false otherwise.
+ *
+ * @param destination
+ * the destination to inspect.
+ *
+ * @return true if the destination matches the target criteria, false otherwise.
+ */
+ boolean matches(ActiveMQDestination destination);
+
+ }
+
+ /**
+ * Searches the set of subscriptions from the given persistence adapter and returns all those
+ * that belong to the given ClientId value.
+ *
+ * @param adapter
+ * the persistence adapter instance to search within.
+ * @param clientId
+ * the client ID value used to filter the subscription set.
+ *
+ * @return a list of all subscriptions belonging to the given client.
+ *
+ * @throws IOException if an error occurs while listing the stored subscriptions.
+ */
+ static public List<SubscriptionInfo> listSubscriptions(PersistenceAdapter adapter, String clientId) throws IOException {
ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
- for (ActiveMQDestination destination : pa.getDestinations()) {
- if( destination.isTopic() ) {
- TopicMessageStore store = pa.createTopicMessageStore((ActiveMQTopic) destination);
+ for (ActiveMQDestination destination : adapter.getDestinations()) {
+ if (destination.isTopic()) {
+ TopicMessageStore store = adapter.createTopicMessageStore((ActiveMQTopic) destination);
for (SubscriptionInfo sub : store.getAllSubscriptions()) {
- if(clientId==sub.getClientId() || clientId.equals(sub.getClientId()) ) {
+ if (clientId == sub.getClientId() || clientId.equals(sub.getClientId())) {
rc.add(sub);
}
}
@@ -44,4 +79,85 @@ public class PersistenceAdapterSupport {
return rc;
}
+ /**
+ * Provides a means of querying the persistence adapter for a list of ActiveMQQueue instances.
+ *
+ * @param adapter
+ * the persistence adapter instance to query.
+ *
+ * @return a List<ActiveMQQueue> with all the queue destinations.
+ *
+ * @throws IOException if an error occurs while reading the destinations.
+ */
+ static public List<ActiveMQQueue> listQueues(PersistenceAdapter adapter) throws IOException {
+ return listQueues(adapter, MATCH_ALL);
+ }
+
+ /**
+ * Provides a means of querying the persistence adapter for a list of ActiveMQQueue instances
+ * that match some given search criteria.
+ *
+ * @param adapter
+ * the persistence adapter instance to query.
+ * @param matcher
+ * the DestinationMatcher instance used to find the target destinations.
+ *
+ * @return a List<ActiveMQQueue> with all the matching destinations.
+ *
+ * @throws IOException if an error occurs while reading the destinations.
+ */
+ static public List<ActiveMQQueue> listQueues(PersistenceAdapter adapter, DestinationMatcher matcher) throws IOException {
+ ArrayList<ActiveMQQueue> rc = new ArrayList<ActiveMQQueue>();
+ for (ActiveMQDestination destination : adapter.getDestinations()) {
+ if (destination.isQueue() && matcher.matches(destination)) {
+ rc.add((ActiveMQQueue) destination);
+ }
+ }
+ return rc;
+ }
+
+ /**
+ * Provides a means of querying the persistence adapter for a list of ActiveMQTopic instances.
+ *
+ * @param adapter
+ * the persistence adapter instance to query.
+ *
+ * @return a List<ActiveMQTopic> with all the topic destinations.
+ *
+ * @throws IOException if an error occurs while reading the destinations.
+ */
+ static public List<ActiveMQTopic> listTopics(PersistenceAdapter adapter) throws IOException {
+ return listTopics(adapter, MATCH_ALL);
+ }
+
+ /**
+ * Provides a means of querying the persistence adapter for a list of ActiveMQTopic instances
+ * that match some given search criteria.
+ *
+ * @param adapter
+ * the persistence adapter instance to query.
+ * @param matcher
+ * the DestinationMatcher instance used to find the target destinations.
+ *
+ * @return a List<ActiveMQTopic> with all the matching destinations.
+ *
+ * @throws IOException if an error occurs while reading the destinations.
+ */
+ static public List<ActiveMQTopic> listTopics(PersistenceAdapter adapter, DestinationMatcher matcher) throws IOException {
+ ArrayList<ActiveMQTopic> rc = new ArrayList<ActiveMQTopic>();
+ for (ActiveMQDestination destination : adapter.getDestinations()) {
+ if (destination.isTopic() && matcher.matches(destination)) {
+ rc.add((ActiveMQTopic) destination);
+ }
+ }
+ return rc;
+ }
+
+ private static class AlwaysMatches implements DestinationMatcher {
+
+ @Override
+ public boolean matches(ActiveMQDestination destination) {
+ return true;
+ }
+ }
}
[15/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5304 - still missing unit test case
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5304 - still missing unit test case
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d8e30267
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d8e30267
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d8e30267
Branch: refs/heads/activemq-5.10.x
Commit: d8e30267341cba342f3c624650bbc6338f612711
Parents: 902692e
Author: Torsten Mielke <tm...@redhat.com>
Authored: Fri Aug 1 15:18:46 2014 +0200
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 16:47:19 2014 -0500
----------------------------------------------------------------------
.../security/TempDestinationAuthorizationEntry.java | 13 +++++++++++++
.../activemq/security/XBeanAuthorizationMap.java | 5 +++++
2 files changed, 18 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8e30267/activemq-broker/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java b/activemq-broker/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java
index 8f6a68a..fe19e56 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java
@@ -30,6 +30,19 @@ public class TempDestinationAuthorizationEntry extends AuthorizationEntry {
// we don't need to check if destination is specified since
// the TempDestinationAuthorizationEntry should map to all temp
// destinations
+
+
+ if (adminRoles != null) {
+ setAdminACLs(parseACLs(adminRoles));
+ }
+
+ if (writeRoles != null) {
+ setWriteACLs(parseACLs(writeRoles));
+ }
+
+ if (readRoles != null) {
+ setReadACLs(parseACLs(readRoles));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8e30267/activemq-spring/src/main/java/org/apache/activemq/security/XBeanAuthorizationMap.java
----------------------------------------------------------------------
diff --git a/activemq-spring/src/main/java/org/apache/activemq/security/XBeanAuthorizationMap.java b/activemq-spring/src/main/java/org/apache/activemq/security/XBeanAuthorizationMap.java
index c8b71e5..8d43efb 100644
--- a/activemq-spring/src/main/java/org/apache/activemq/security/XBeanAuthorizationMap.java
+++ b/activemq-spring/src/main/java/org/apache/activemq/security/XBeanAuthorizationMap.java
@@ -57,6 +57,11 @@ public class XBeanAuthorizationMap extends DefaultAuthorizationMap implements In
}
((XBeanAuthorizationEntry)entry).afterPropertiesSet();
}
+
+ // also check group class of temp destination ACL
+ if (getTempDestinationAuthorizationEntry() != null && getTempDestinationAuthorizationEntry().getGroupClass() != null) {
+ getTempDestinationAuthorizationEntry().afterPropertiesSet();
+ }
super.setEntries(authorizationEntries);
}
[10/17] activemq git commit: Web-console does not initialize under
Spring-4.x because dispatcher-servlet.xml has very old bean definition. This
closes #37 Signed-off-by: Daniel Kulp
Posted by ha...@apache.org.
Web-console does not initialize under Spring-4.x because dispatcher-servlet.xml has very old bean definition.
This closes #37
Signed-off-by: Daniel Kulp <dk...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9b4f6ac9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9b4f6ac9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9b4f6ac9
Branch: refs/heads/activemq-5.10.x
Commit: 9b4f6ac90e647045e8504ac716f58c4da4a20bcb
Parents: 3101509
Author: Andreas Kuhtz <an...@gmail.com>
Authored: Wed Jul 30 14:35:11 2014 +0200
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 15:08:56 2014 -0500
----------------------------------------------------------------------
.../main/webapp/WEB-INF/dispatcher-servlet.xml | 29 +++++++++++---------
1 file changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/9b4f6ac9/activemq-web-console/src/main/webapp/WEB-INF/dispatcher-servlet.xml
----------------------------------------------------------------------
diff --git a/activemq-web-console/src/main/webapp/WEB-INF/dispatcher-servlet.xml b/activemq-web-console/src/main/webapp/WEB-INF/dispatcher-servlet.xml
index da02abe..5aa4951 100644
--- a/activemq-web-console/src/main/webapp/WEB-INF/dispatcher-servlet.xml
+++ b/activemq-web-console/src/main/webapp/WEB-INF/dispatcher-servlet.xml
@@ -15,10 +15,13 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
-<beans>
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" >
- <bean id="handlerMapping" class="org.apache.activemq.web.handler.BindingBeanNameUrlHandlerMapping" singleton="false">
+ <bean id="handlerMapping" class="org.apache.activemq.web.handler.BindingBeanNameUrlHandlerMapping" scope="prototype">
<!--
<property name="uriToClassNames">
<props><prop key="/foo.action">org.apache.activemq.web.controller.Foo</prop></props>
@@ -26,16 +29,16 @@
-->
</bean>
- <bean name="/createDestination.action" class="org.apache.activemq.web.controller.CreateDestination" autowire="constructor" singleton="false"/>
- <bean name="/deleteDestination.action" class="org.apache.activemq.web.controller.DeleteDestination" autowire="constructor" singleton="false"/>
- <bean name="/createSubscriber.action" class="org.apache.activemq.web.controller.CreateSubscriber" autowire="constructor" singleton="false"/>
- <bean name="/deleteSubscriber.action" class="org.apache.activemq.web.controller.DeleteSubscriber" autowire="constructor" singleton="false"/>
- <bean name="/sendMessage.action" class="org.apache.activemq.web.controller.SendMessage" autowire="constructor" singleton="false"/>
- <bean name="/purgeDestination.action" class="org.apache.activemq.web.controller.PurgeDestination" autowire="constructor" singleton="false"/>
- <bean name="/deleteMessage.action" class="org.apache.activemq.web.controller.DeleteMessage" autowire="constructor" singleton="false"/>
- <bean name="/copyMessage.action" class="org.apache.activemq.web.controller.CopyMessage" autowire="constructor" singleton="false"/>
- <bean name="/moveMessage.action" class="org.apache.activemq.web.controller.MoveMessage" autowire="constructor" singleton="false"/>
- <bean name="/deleteJob.action" class="org.apache.activemq.web.controller.DeleteJob" autowire="constructor" singleton="false"/>
+ <bean name="/createDestination.action" class="org.apache.activemq.web.controller.CreateDestination" autowire="constructor" scope="prototype"/>
+ <bean name="/deleteDestination.action" class="org.apache.activemq.web.controller.DeleteDestination" autowire="constructor" scope="prototype"/>
+ <bean name="/createSubscriber.action" class="org.apache.activemq.web.controller.CreateSubscriber" autowire="constructor" scope="prototype"/>
+ <bean name="/deleteSubscriber.action" class="org.apache.activemq.web.controller.DeleteSubscriber" autowire="constructor" scope="prototype"/>
+ <bean name="/sendMessage.action" class="org.apache.activemq.web.controller.SendMessage" autowire="constructor" scope="prototype"/>
+ <bean name="/purgeDestination.action" class="org.apache.activemq.web.controller.PurgeDestination" autowire="constructor" scope="prototype"/>
+ <bean name="/deleteMessage.action" class="org.apache.activemq.web.controller.DeleteMessage" autowire="constructor" scope="prototype"/>
+ <bean name="/copyMessage.action" class="org.apache.activemq.web.controller.CopyMessage" autowire="constructor" scope="prototype"/>
+ <bean name="/moveMessage.action" class="org.apache.activemq.web.controller.MoveMessage" autowire="constructor" scope="prototype"/>
+ <bean name="/deleteJob.action" class="org.apache.activemq.web.controller.DeleteJob" autowire="constructor" scope="prototype"/>
<!--
- This bean resolves specific types of exception to corresponding error views.
[07/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5289 - add forwardCount to
destinationStatistics - allow local consumption to be accounted with
dequeueCount - forwardCount so forwarded messages are not accounted for num
hops times
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5289 - add forwardCount to destinationStatistics - allow local consumption to be accounted with dequeueCount - forwardCount so forwarded messages are not accounted for num hops times
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e4183ec4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e4183ec4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e4183ec4
Branch: refs/heads/activemq-5.10.x
Commit: e4183ec48d51efb086ca4d9ce1cbc458931f98c6
Parents: 9146785
Author: gtully <ga...@gmail.com>
Authored: Fri Jul 25 11:46:36 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 14:56:21 2014 -0500
----------------------------------------------------------------------
.../apache/activemq/broker/jmx/DestinationView.java | 5 +++++
.../activemq/broker/jmx/DestinationViewMBean.java | 10 ++++++++++
.../broker/region/DestinationStatistics.java | 10 ++++++++++
.../org/apache/activemq/broker/region/Queue.java | 3 +++
.../activemq/broker/region/TopicSubscription.java | 3 +++
.../activemq/console/command/DstatCommand.java | 9 ++++++---
.../org/apache/activemq/broker/jmx/MBeanTest.java | 1 +
.../activemq/network/DemandForwardingBridgeTest.java | 15 +++++++++++++++
.../usecases/ThreeBrokerTopicNetworkTest.java | 4 ++++
9 files changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index 3f62943..ec6fe7c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -90,6 +90,11 @@ public class DestinationView implements DestinationViewMBean {
}
@Override
+ public long getForwardCount() {
+ return destination.getDestinationStatistics().getForwards().getCount();
+ }
+
+ @Override
public long getDispatchCount() {
return destination.getDestinationStatistics().getDispatched().getCount();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
index f83d47e..a42bcfa 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
@@ -70,6 +70,16 @@ public interface DestinationViewMBean {
long getDequeueCount();
/**
+ * Returns the number of messages that have been acknowledged by network subscriptions from the
+ * destination.
+ *
+ * @return The number of messages that have been acknowledged by network subscriptions from the
+ * destination.
+ */
+ @MBeanInfo("Number of messages that have been forwarded (to a networked broker) from the destination.")
+ long getForwardCount();
+
+ /**
* Returns the number of messages that have been dispatched but not
* acknowledged
*
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
index ee2b478..0a9176e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
@@ -31,6 +31,7 @@ public class DestinationStatistics extends StatsImpl {
protected CountStatisticImpl enqueues;
protected CountStatisticImpl dequeues;
+ protected CountStatisticImpl forwards;
protected CountStatisticImpl consumers;
protected CountStatisticImpl producers;
protected CountStatisticImpl messages;
@@ -49,6 +50,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination");
+ forwards = new CountStatisticImpl("forwards", "The number of messages that have been forwarded to a networked broker from the destination");
inflight = new CountStatisticImpl("inflight", "The number of messages dispatched but awaiting acknowledgement");
expired = new CountStatisticImpl("expired", "The number of messages that have expired");
@@ -86,6 +88,10 @@ public class DestinationStatistics extends StatsImpl {
return dequeues;
}
+ public CountStatisticImpl getForwards() {
+ return forwards;
+ }
+
public CountStatisticImpl getInflight() {
return inflight;
}
@@ -137,6 +143,7 @@ public class DestinationStatistics extends StatsImpl {
super.reset();
enqueues.reset();
dequeues.reset();
+ forwards.reset();
dispatched.reset();
inflight.reset();
expired.reset();
@@ -151,6 +158,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues.setEnabled(enabled);
dispatched.setEnabled(enabled);
dequeues.setEnabled(enabled);
+ forwards.setEnabled(enabled);
inflight.setEnabled(enabled);
expired.setEnabled(true);
consumers.setEnabled(enabled);
@@ -169,6 +177,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues.setParent(parent.enqueues);
dispatched.setParent(parent.dispatched);
dequeues.setParent(parent.dequeues);
+ forwards.setParent(parent.forwards);
inflight.setParent(parent.inflight);
expired.setParent(parent.expired);
consumers.setParent(parent.consumers);
@@ -183,6 +192,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues.setParent(null);
dispatched.setParent(null);
dequeues.setParent(null);
+ forwards.setParent(null);
inflight.setParent(null);
expired.setParent(null);
consumers.setParent(null);
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 06c74db..647ba68 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1810,6 +1810,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} finally {
messagesLock.writeLock().unlock();
}
+ if (sub != null && sub.getConsumerInfo().isNetworkSubscription()) {
+ getDestinationStatistics().getForwards().increment();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index d17fb2f..6b61379 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -285,6 +285,9 @@ public class TopicSubscription extends AbstractSubscription {
if (singleDestination && destination != null) {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
+ if (info.isNetworkSubscription()) {
+ destination.getDestinationStatistics().getForwards().add(ack.getMessageCount());
+ }
}
dequeueCounter.addAndGet(ack.getMessageCount());
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
index a6d6356..8a41d6a 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
@@ -95,7 +95,7 @@ public class DstatCommand extends AbstractJmxCommand {
// sort list so the names is A..Z
Collections.sort(queueList, new ObjectInstanceComparator());
- context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Memory %"));
+ context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %"));
// Iterate through the queue result
for (Object view : queueList) {
@@ -115,6 +115,7 @@ public class DstatCommand extends AbstractJmxCommand {
queueView.getConsumerCount(),
queueView.getEnqueueCount(),
queueView.getDequeueCount(),
+ queueView.getForwardCount(),
queueView.getMemoryPercentUsage()));
}
}
@@ -128,7 +129,7 @@ public class DstatCommand extends AbstractJmxCommand {
final String header = "%-50s %10s %10s %10s %10s %10s %10s";
final String tableRow = "%-50s %10d %10d %10d %10d %10d %10d";
- context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Memory %"));
+ context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %"));
Collections.sort(queueList, new ObjectInstanceComparator());
@@ -150,6 +151,7 @@ public class DstatCommand extends AbstractJmxCommand {
queueView.getConsumerCount(),
queueView.getEnqueueCount(),
queueView.getDequeueCount(),
+ queueView.getForwardCount(),
queueView.getMemoryPercentUsage()));
}
}
@@ -166,7 +168,7 @@ public class DstatCommand extends AbstractJmxCommand {
// sort list so the names is A..Z
Collections.sort(topicsList, new ObjectInstanceComparator());
- context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Memory %"));
+ context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %"));
// Iterate through the topics result
for (Object view : topicsList) {
@@ -186,6 +188,7 @@ public class DstatCommand extends AbstractJmxCommand {
topicView.getConsumerCount(),
topicView.getEnqueueCount(),
topicView.getDequeueCount(),
+ topicView.getForwardCount(),
topicView.getMemoryPercentUsage()));
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index a853b3e..30487c8 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -169,6 +169,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
assertTrue("use cache", queueNew.isUseCache());
assertTrue("cache enabled", queueNew.isCacheEnabled());
+ assertEquals("no forwards", 0, queueNew.getForwardCount());
}
public void testRemoveMessages() throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
index 1491ba2..020a511 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
@@ -21,6 +21,7 @@ import javax.jms.DeliveryMode;
import junit.framework.Test;
import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
@@ -72,6 +73,11 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
// Close consumer to cause the message to rollback.
connection1.send(consumerInfo1.createRemoveCommand());
+ final DestinationStatistics destinationStatistics = broker.getDestination(destination).getDestinationStatistics();
+ assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount());
+ assertEquals("broker dest stat dequeues", 0, destinationStatistics.getDequeues().getCount());
+ assertEquals("broker dest stat forwards", 0, destinationStatistics.getForwards().getCount());
+
// Now create remote consumer that should cause message to move to this
// remote consumer.
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
@@ -84,6 +90,15 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
return receiveMessage(connection2) != null;
}
}));
+
+ assertTrue("broker dest stat forwards", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 1 == destinationStatistics.getForwards().getCount();
+ }
+ }));
+ assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount());
+ assertEquals("remote broker dest stat dequeues", 1, remoteBroker.getDestination(destination).getDestinationStatistics().getDequeues().getCount());
}
public void initCombosForTestAddConsumerThenSend() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
index 33963b7..99deb28 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.MessageIdList;
/**
@@ -77,6 +78,9 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount());
assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount());
+
+ assertEquals("Correct forwards from A", MESSAGE_COUNT,
+ brokers.get("BrokerA").broker.getDestination(ActiveMQDestination.transform(dest)).getDestinationStatistics().getForwards().getCount());
}
public void initCombosForTestABandBCbrokerNetworkWithSelectors() {
[17/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5311 - encode xaErrorCode in
xaexception message
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5311 - encode xaErrorCode in xaexception message
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b41f9212
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b41f9212
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b41f9212
Branch: refs/heads/activemq-5.10.x
Commit: b41f92127c30f0d185a8c0dde03788b32b270b5e
Parents: 38b3140
Author: gtully <ga...@gmail.com>
Authored: Thu Aug 7 13:42:03 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 16:49:06 2014 -0500
----------------------------------------------------------------------
.../activemq/broker/TransactionBroker.java | 3 +--
.../activemq/transaction/XATransaction.java | 24 +++++++++++---------
.../org/apache/activemq/TransactionContext.java | 15 ++++++++++--
.../ActiveMQXAConnectionFactoryTest.java | 2 +-
4 files changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/b41f9212/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
index 26ddfab..4b5ff8f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
@@ -365,8 +365,7 @@ public class TransactionBroker extends BrokerFilter {
return transaction;
}
if (xid.isXATransaction()) {
- XAException e = new XAException("Transaction '" + xid + "' has not been started.");
- e.errorCode = XAException.XAER_NOTA;
+ XAException e = XATransaction.newXAException("Transaction '" + xid + "' has not been started.", XAException.XAER_NOTA);
throw e;
} else {
throw new JMSException("Transaction '" + xid + "' has not been started.");
http://git-wip-us.apache.org/repos/asf/activemq/blob/b41f9212/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java b/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java
index 3062125..8e31ec7 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transaction;
import java.io.IOException;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
+import org.apache.activemq.TransactionContext;
import org.apache.activemq.broker.TransactionBroker;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.TransactionId;
@@ -89,23 +90,20 @@ public class XATransaction extends Transaction {
} catch (Throwable t) {
LOG.warn("Store COMMIT FAILED: ", t);
rollback();
- XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back");
- xae.errorCode = XAException.XA_RBOTHER;
+ XAException xae = newXAException("STORE COMMIT FAILED: Transaction rolled back", XAException.XA_RBOTHER);
xae.initCause(t);
throw xae;
}
}
private void illegalStateTransition(String callName) throws XAException {
- XAException xae = new XAException("Cannot call " + callName + " now.");
- xae.errorCode = XAException.XAER_PROTO;
+ XAException xae = newXAException("Cannot call " + callName + " now.", XAException.XAER_PROTO);
throw xae;
}
private void checkForPreparedState(boolean onePhase) throws XAException {
if (!onePhase) {
- XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared");
- xae.errorCode = XAException.XAER_PROTO;
+ XAException xae = newXAException("Cannot do 2 phase commit if the transaction has not been prepared", XAException.XAER_PROTO);
throw xae;
}
}
@@ -118,8 +116,7 @@ public class XATransaction extends Transaction {
} catch (Throwable e) {
LOG.warn("PRE-PREPARE FAILED: ", e);
rollback();
- XAException xae = new XAException("PRE-PREPARE FAILED: Transaction rolled back");
- xae.errorCode = XAException.XA_RBOTHER;
+ XAException xae = newXAException("PRE-PREPARE FAILED: Transaction rolled back", XAException.XA_RBOTHER);
xae.initCause(e);
throw xae;
}
@@ -155,7 +152,7 @@ public class XATransaction extends Transaction {
doPostRollback();
break;
default:
- throw new XAException("Invalid state");
+ throw newXAException("Invalid state: " + getState(), XAException.XA_RBPROTO);
}
}
@@ -167,13 +164,18 @@ public class XATransaction extends Transaction {
// I guess this could happen. Post commit task failed
// to execute properly.
LOG.warn("POST ROLLBACK FAILED: ", e);
- XAException xae = new XAException("POST ROLLBACK FAILED");
- xae.errorCode = XAException.XAER_RMERR;
+ XAException xae = newXAException("POST ROLLBACK FAILED", XAException.XAER_RMERR);
xae.initCause(e);
throw xae;
}
}
+ public static XAException newXAException(String s, int errorCode) {
+ XAException xaException = new XAException(s + " " + TransactionContext.xaErrorCodeMarker + errorCode);
+ xaException.errorCode = errorCode;
+ return xaException;
+ }
+
@Override
public int prepare() throws XAException, IOException {
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/b41f9212/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
index c86f448..a444d61 100755
--- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
@@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
*/
public class TransactionContext implements XAResource {
+ public static final String xaErrorCodeMarker = "xaErrorCode:";
private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
// XATransactionId -> ArrayList of TransactionContext objects
@@ -786,8 +787,7 @@ public class TransactionContext implements XAResource {
xae.errorCode = original.errorCode;
if (xae.errorCode == XA_OK) {
// detail not unmarshalled see: org.apache.activemq.openwire.v1.BaseDataStreamMarshaller.createThrowable
- // so use a valid generic error code in place of ok
- xae.errorCode = XAException.XAER_RMERR;
+ xae.errorCode = parseFromMessageOr(original.getMessage(), XAException.XAER_RMERR);
}
xae.initCause(original);
return xae;
@@ -799,6 +799,17 @@ public class TransactionContext implements XAResource {
return xae;
}
+ private int parseFromMessageOr(String message, int fallbackCode) {
+ final String marker = "xaErrorCode:";
+ final int index = message.lastIndexOf(marker);
+ if (index > -1) {
+ try {
+ return Integer.parseInt(message.substring(index + marker.length()));
+ } catch (Exception ignored) {}
+ }
+ return fallbackCode;
+ }
+
public ActiveMQConnection getConnection() {
return connection;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/b41f9212/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
index 5fd6e5c..4b89851 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
@@ -468,7 +468,7 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport {
fail("Expected xa exception on no tx");
} catch (XAException expected) {
LOG.info("got expected xa", expected);
- assertTrue("not zero", expected.errorCode != XAResource.XA_OK);
+ assertEquals("no tx", XAException.XAER_NOTA, expected.errorCode);
}
connection.close();
broker.stop();
[14/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5016 - evolve the class in a
serialization compat way so that existing kahadb stores can be read,
long field addition
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5016 - evolve the class in a serialization compat way so that existing kahadb stores can be read, long field addition
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/902692e3
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/902692e3
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/902692e3
Branch: refs/heads/activemq-5.10.x
Commit: 902692e3ac5bb2d425f1fbd3e44648015f72a8f3
Parents: 283e3ce
Author: gtully <ga...@gmail.com>
Authored: Wed Aug 6 15:19:50 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 16:46:08 2014 -0500
----------------------------------------------------------------------
.../org/apache/activemq/util/BitArrayBin.java | 24 ++++++++++----------
1 file changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/902692e3/activemq-client/src/main/java/org/apache/activemq/util/BitArrayBin.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/BitArrayBin.java b/activemq-client/src/main/java/org/apache/activemq/util/BitArrayBin.java
index 528144e..d988ae1 100755
--- a/activemq-client/src/main/java/org/apache/activemq/util/BitArrayBin.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/BitArrayBin.java
@@ -29,9 +29,9 @@ public class BitArrayBin implements Serializable {
private static final long serialVersionUID = 1L;
private final LinkedList<BitArray> list;
private int maxNumberOfArrays;
- private long firstIndex = -1;
+ private int firstIndex = -1; // leave 'int' for old serialization compatibility and introduce new 'long' field
private long lastInOrderBit=-1;
-
+ private long longFirstIndex=-1;
/**
* Create a BitArrayBin to a certain window size (number of messages to
* keep)
@@ -90,7 +90,7 @@ public class BitArrayBin implements Serializable {
* @return true/false
*/
public boolean getBit(long index) {
- boolean answer = index >= firstIndex;
+ boolean answer = index >= longFirstIndex;
BitArray ba = getBitArray(index);
if (ba != null) {
int offset = getOffset(index);
@@ -119,7 +119,7 @@ public class BitArrayBin implements Serializable {
int overShoot = bin - maxNumberOfArrays + 1;
while (overShoot > 0) {
list.removeFirst();
- firstIndex += BitArray.LONG_SIZE;
+ longFirstIndex += BitArray.LONG_SIZE;
list.add(new BitArray());
overShoot--;
}
@@ -143,10 +143,10 @@ public class BitArrayBin implements Serializable {
*/
private int getBin(long index) {
int answer = 0;
- if (firstIndex < 0) {
- firstIndex = (int) (index - (index % BitArray.LONG_SIZE));
- } else if (firstIndex >= 0) {
- answer = (int)((index - firstIndex) / BitArray.LONG_SIZE);
+ if (longFirstIndex < 0) {
+ longFirstIndex = (int) (index - (index % BitArray.LONG_SIZE));
+ } else if (longFirstIndex >= 0) {
+ answer = (int)((index - longFirstIndex) / BitArray.LONG_SIZE);
}
return answer;
}
@@ -159,8 +159,8 @@ public class BitArrayBin implements Serializable {
*/
private int getOffset(long index) {
int answer = 0;
- if (firstIndex >= 0) {
- answer = (int)((index - firstIndex) - (BitArray.LONG_SIZE * getBin(index)));
+ if (longFirstIndex >= 0) {
+ answer = (int)((index - longFirstIndex) - (BitArray.LONG_SIZE * getBin(index)));
}
return answer;
}
@@ -168,8 +168,8 @@ public class BitArrayBin implements Serializable {
public long getLastSetIndex() {
long result = -1;
- if (firstIndex >=0) {
- result = firstIndex;
+ if (longFirstIndex >=0) {
+ result = longFirstIndex;
BitArray last = null;
for (int lastBitArrayIndex = maxNumberOfArrays -1; lastBitArrayIndex >= 0; lastBitArrayIndex--) {
last = list.get(lastBitArrayIndex);
[02/17] activemq git commit: Fix small typo in the docs
Posted by ha...@apache.org.
Fix small typo in the docs
This closes #35
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/119f3e95
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/119f3e95
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/119f3e95
Branch: refs/heads/activemq-5.10.x
Commit: 119f3e959f0329b8912af5afcf68930af107444c
Parents: 7f79704
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jul 16 10:35:23 2014 -0400
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Tue Dec 16 21:54:07 2014 -0500
----------------------------------------------------------------------
.../java/org/apache/activemq/security/AuthenticationUser.java | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/119f3e95/activemq-broker/src/main/java/org/apache/activemq/security/AuthenticationUser.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/security/AuthenticationUser.java b/activemq-broker/src/main/java/org/apache/activemq/security/AuthenticationUser.java
index 65d34d5..7f51d06 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/security/AuthenticationUser.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/security/AuthenticationUser.java
@@ -17,10 +17,10 @@
package org.apache.activemq.security;
/**
- * A helper object used to configure simple authentiaction plugin
- *
+ * A helper object used to configure simple authentication plugin
+ *
* @org.apache.xbean.XBean
- *
+ *
*/
public class AuthenticationUser {
@@ -60,5 +60,4 @@ public class AuthenticationUser {
public void setUsername(String username) {
this.username = username;
}
-
}
[03/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5279 - ensure poison on failover
redelivery only when delivery is not pending elsewhere
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5279 - ensure poison on failover redelivery only when delivery is not pending elsewhere
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9f61fd51
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9f61fd51
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9f61fd51
Branch: refs/heads/activemq-5.10.x
Commit: 9f61fd51a39fe97b9e0421953974ef165f07e7e7
Parents: 119f3e9
Author: gtully <ga...@gmail.com>
Authored: Thu Jul 17 17:07:19 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 14:46:28 2014 -0500
----------------------------------------------------------------------
.../org/apache/activemq/ActiveMQConnection.java | 6 +-
.../activemq/ActiveMQMessageConsumer.java | 67 ++++++++++++--------
.../failover/FailoverTransactionTest.java | 29 ++++-----
3 files changed, 60 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/9f61fd51/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index 326310c..2df8607 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -2215,7 +2215,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
@Override
@Deprecated
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
- return createInputStream(dest, messageSelector, noLocal, -1);
+ return createInputStream(dest, messageSelector, noLocal, -1);
}
@Override
@@ -2571,6 +2571,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
return this.executor;
}
+ protected CopyOnWriteArrayList<ActiveMQSession> getSessions() {
+ return sessions;
+ }
+
/**
* @return the checkForDuplicates
*/
http://git-wip-us.apache.org/repos/asf/activemq/blob/9f61fd51/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index e17a1bb..a60a7ac 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -1413,36 +1413,24 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
} else {
- if (!session.isTransacted()) {
- LOG.warn("Duplicate non transacted dispatch to consumer: " + getConsumerId() + ", poison acking: " + md);
- posionAck(md, "Duplicate non transacted delivery to " + getConsumerId());
- } else {
+ // deal with duplicate delivery
+ ConsumerId consumerWithPendingTransaction;
+ if (redeliveryExpectedInCurrentTransaction(md, true)) {
if (LOG.isDebugEnabled()) {
- LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
- }
- boolean needsPoisonAck = false;
- synchronized (deliveredMessages) {
- if (previouslyDeliveredMessages != null) {
- previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
- } else {
- // delivery while pending redelivery to another consumer on the same connection
- // not waiting for redelivery will help here
- needsPoisonAck = true;
- }
+ LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage());
}
- if (needsPoisonAck) {
- LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
- + " consumer on this connection, failoverRedeliveryWaitPeriod="
- + failoverRedeliveryWaitPeriod + ". Message: " + md);
- posionAck(md, "Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
- + session.getConnection().getConnectionInfo().getConnectionId());
+ if (transactedIndividualAck) {
+ immediateIndividualTransactedAck(md);
} else {
- if (transactedIndividualAck) {
- immediateIndividualTransactedAck(md);
- } else {
- session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
- }
+ session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
}
+ } else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) {
+ LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(), md.getMessage(), consumerWithPendingTransaction);
+ session.getConnection().rollbackDuplicate(this, md.getMessage());
+ dispatch(md);
+ } else {
+ LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
+ posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
}
}
}
@@ -1456,6 +1444,33 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
+ private boolean redeliveryExpectedInCurrentTransaction(MessageDispatch md, boolean markReceipt) {
+ if (session.isTransacted()) {
+ synchronized (deliveredMessages) {
+ if (previouslyDeliveredMessages != null) {
+ if (previouslyDeliveredMessages.containsKey(md.getMessage().getMessageId())) {
+ if (markReceipt) {
+ previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
+ }
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ private ConsumerId redeliveryPendingInCompetingTransaction(MessageDispatch md) {
+ for (ActiveMQSession activeMQSession: session.connection.getSessions()) {
+ for (ActiveMQMessageConsumer activeMQMessageConsumer : activeMQSession.consumers) {
+ if (activeMQMessageConsumer.redeliveryExpectedInCurrentTransaction(md, false)) {
+ return activeMQMessageConsumer.getConsumerId();
+ }
+ }
+ }
+ return null;
+ }
+
// async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
private void clearDeliveredList() {
if (clearDeliveredList) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/9f61fd51/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index c365d37..9b41713 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -831,7 +831,7 @@ public class FailoverTransactionTest extends TestSupport {
setDefaultPersistenceAdapter(broker);
broker.start();
- assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+ assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
LOG.info("received message count: " + receivedMessages.size());
@@ -841,7 +841,7 @@ public class FailoverTransactionTest extends TestSupport {
if (gotTransactionRolledBackException.get()) {
assertNotNull("should be available again after commit rollback ex", msg);
} else {
- assertNull("should be nothing left for consumer as recieve should have committed", msg);
+ assertNull("should be nothing left for consumer as receive should have committed", msg);
}
consumerSession1.commit();
@@ -1103,8 +1103,8 @@ public class FailoverTransactionTest extends TestSupport {
connection.close();
}
- public void testPoisonOnDeliveryWhilePending() throws Exception {
- LOG.info("testPoisonOnDeliveryWhilePending()");
+ public void testReDeliveryWhilePending() throws Exception {
+ LOG.info("testReDeliveryWhilePending()");
broker = createBroker(true);
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
@@ -1134,8 +1134,7 @@ public class FailoverTransactionTest extends TestSupport {
final Vector<Exception> exceptions = new Vector<Exception>();
- // commit may fail if other consumer gets the message on restart, it will be seen as a duplicate on the connection
- // but with no transaction and it pending on another consumer it will be poison
+ // commit may fail if other consumer gets the message on restart
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async commit...");
@@ -1149,24 +1148,24 @@ public class FailoverTransactionTest extends TestSupport {
}
});
- assertNull("consumer2 not get a message while pending to 1 or consumed by 1", consumer2.receive(2000));
assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
- // either message consumed or sent to dlq via poison on redelivery to wrong consumer
- // message should not be available again in any event
+ // either message redelivered in existing tx or consumed by consumer2
+ // should not be available again in any event
assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000));
// consumer replay is hashmap order dependent on a failover connection state recover so need to deal with both cases
if (exceptions.isEmpty()) {
- // commit succeeded, message was redelivered to the correct consumer after restart so commit was fine
+ LOG.info("commit succeeded, message was redelivered to the correct consumer after restart so commit was fine");
+ assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000));
} else {
- // message should be in dlq
- MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
- TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000);
- assertNotNull("found message in dlq", dlqMessage);
- assertEquals("text matches", "Test message", dlqMessage.getText());
+ LOG.info("commit failed, consumer2 should get it", exceptions.get(0));
+ assertNotNull("consumer2 got message", consumer2.receive(2000));
consumerSession.commit();
+ // no message should be in dlq
+ MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
+ assertNull("nothing in the dlq", dlqConsumer.receive(5000));
}
connection.close();
}
[13/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5304 - ignore the test for now
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5304 - ignore the test for now
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/283e3ced
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/283e3ced
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/283e3ced
Branch: refs/heads/activemq-5.10.x
Commit: 283e3ced1e6ac72e064b5f25ebaa50e207f27c32
Parents: 6bb5abf
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Wed Aug 6 15:23:58 2014 +0200
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 16:45:23 2014 -0500
----------------------------------------------------------------------
.../apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/283e3ced/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
----------------------------------------------------------------------
diff --git a/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java b/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
index 0f84c1b..4c6e2a9 100644
--- a/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
+++ b/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
@@ -17,6 +17,8 @@
package org.apache.activemq.karaf.itest;
import java.util.concurrent.Callable;
+
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.Option;
@@ -70,6 +72,7 @@ public class ActiveMQBrokerFeatureTest extends AbstractJmsFeatureTest {
}
@Test
+ @Ignore
public void testTemporaryDestinations() throws Throwable {
Connection connection = getConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
[16/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5304 - applying groupClass to
<tempDestinationAuthorizationEntry>
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5304 - applying groupClass to <tempDestinationAuthorizationEntry>
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/38b3140c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/38b3140c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/38b3140c
Branch: refs/heads/activemq-5.10.x
Commit: 38b3140cc0ff9fc9026ef76cc4a75fbb0bb98286
Parents: d8e3026
Author: Torsten Mielke <tm...@redhat.com>
Authored: Thu Aug 7 11:08:38 2014 +0200
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 16:47:35 2014 -0500
----------------------------------------------------------------------
.../activemq/security/TempDestinationAuthorizationEntry.java | 6 ------
.../org/apache/activemq/security/XBeanAuthorizationMap.java | 8 +++++++-
2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/38b3140c/activemq-broker/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java b/activemq-broker/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java
index fe19e56..71f46f6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java
@@ -27,11 +27,6 @@ package org.apache.activemq.security;
public class TempDestinationAuthorizationEntry extends AuthorizationEntry {
public void afterPropertiesSet() throws Exception {
- // we don't need to check if destination is specified since
- // the TempDestinationAuthorizationEntry should map to all temp
- // destinations
-
-
if (adminRoles != null) {
setAdminACLs(parseACLs(adminRoles));
}
@@ -44,5 +39,4 @@ public class TempDestinationAuthorizationEntry extends AuthorizationEntry {
setReadACLs(parseACLs(readRoles));
}
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/38b3140c/activemq-spring/src/main/java/org/apache/activemq/security/XBeanAuthorizationMap.java
----------------------------------------------------------------------
diff --git a/activemq-spring/src/main/java/org/apache/activemq/security/XBeanAuthorizationMap.java b/activemq-spring/src/main/java/org/apache/activemq/security/XBeanAuthorizationMap.java
index 8d43efb..08ca5b3 100644
--- a/activemq-spring/src/main/java/org/apache/activemq/security/XBeanAuthorizationMap.java
+++ b/activemq-spring/src/main/java/org/apache/activemq/security/XBeanAuthorizationMap.java
@@ -59,9 +59,15 @@ public class XBeanAuthorizationMap extends DefaultAuthorizationMap implements In
}
// also check group class of temp destination ACL
- if (getTempDestinationAuthorizationEntry() != null && getTempDestinationAuthorizationEntry().getGroupClass() != null) {
+ // use the group class of the <authorizationMap> entry if this temp
+ // destination entry has no group class specified.
+ if (getTempDestinationAuthorizationEntry() != null) {
+ if (getTempDestinationAuthorizationEntry().getGroupClass() == null) {
+ getTempDestinationAuthorizationEntry().setGroupClass(groupClass);
+ }
getTempDestinationAuthorizationEntry().afterPropertiesSet();
}
+
super.setEntries(authorizationEntries);
}
[05/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-4000 - fix is good enable test so
we can close the issue
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-4000 - fix is good enable test so we can close the issue
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/903733b8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/903733b8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/903733b8
Branch: refs/heads/activemq-5.10.x
Commit: 903733b824e0583aa92ee2aefe9c10a9f20b3fb1
Parents: 060e8ae
Author: gtully <ga...@gmail.com>
Authored: Mon Jul 21 13:43:08 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 14:49:28 2014 -0500
----------------------------------------------------------------------
activemq-unit-tests/pom.xml | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/903733b8/activemq-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index 94fa637..b4eb36b 100755
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -617,8 +617,6 @@
<exclude>**/StoreQueueCursorJournalNoDuplicateTest.*</exclude>
<exclude>**/ThreeBrokerVirtualTopicNetworkAMQPATest.*</exclude>
<exclude>**/LevelDBXARecoveryBrokerTest.*</exclude>
- <!-- https://issues.apache.org/jira/browse/AMQ-4000 -->
- <exclude>**/DurableSubInBrokerNetworkTest.*</exclude>
</excludes>
</configuration>
</plugin>
[12/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5304 - providing test case
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5304 - providing test case
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6bb5abfc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6bb5abfc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6bb5abfc
Branch: refs/heads/activemq-5.10.x
Commit: 6bb5abfcf4706629090f6b0c39f7a523a14866e0
Parents: 07bfc1e
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Wed Aug 6 15:22:16 2014 +0200
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 16:45:03 2014 -0500
----------------------------------------------------------------------
.../karaf/itest/ActiveMQBrokerFeatureTest.java | 17 +++++++++++++++++
.../org/apache/activemq/karaf/itest/activemq.xml | 4 ++++
2 files changed, 21 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/6bb5abfc/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
----------------------------------------------------------------------
diff --git a/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java b/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
index bdc0cd7..0f84c1b 100644
--- a/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
+++ b/activemq-karaf-itest/src/test/java/org/apache/activemq/karaf/itest/ActiveMQBrokerFeatureTest.java
@@ -23,7 +23,13 @@ import org.ops4j.pax.exam.Option;
import org.ops4j.pax.exam.junit.Configuration;
import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@RunWith(JUnit4TestRunner.class)
@@ -63,4 +69,15 @@ public class ActiveMQBrokerFeatureTest extends AbstractJmsFeatureTest {
assertEquals("got our message", nameAndPayload, consumeMessage(nameAndPayload));
}
+ @Test
+ public void testTemporaryDestinations() throws Throwable {
+ Connection connection = getConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue temporaryQueue = session.createTemporaryQueue();
+ session.createProducer(temporaryQueue).send(session.createTextMessage("TEST"));
+ Message msg = session.createConsumer(temporaryQueue).receive(3000);
+ assertNotNull("Didn't receive the message", msg);
+ connection.close();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/6bb5abfc/activemq-karaf-itest/src/test/resources/org/apache/activemq/karaf/itest/activemq.xml
----------------------------------------------------------------------
diff --git a/activemq-karaf-itest/src/test/resources/org/apache/activemq/karaf/itest/activemq.xml b/activemq-karaf-itest/src/test/resources/org/apache/activemq/karaf/itest/activemq.xml
index 8932dfc..714b9fc 100644
--- a/activemq-karaf-itest/src/test/resources/org/apache/activemq/karaf/itest/activemq.xml
+++ b/activemq-karaf-itest/src/test/resources/org/apache/activemq/karaf/itest/activemq.xml
@@ -62,6 +62,10 @@
<authorizationEntry topic=">" read="admin" write="admin" admin="admin"/>
<authorizationEntry topic="ActiveMQ.Advisory.>" read="admin" write="admin" admin="admin"/>
</authorizationEntries>
+
+ <tempDestinationAuthorizationEntry>
+ <tempDestinationAuthorizationEntry read="admin" write="admin" admin="admin"/>
+ </tempDestinationAuthorizationEntry>
</authorizationMap>
</map>
</authorizationPlugin>
[06/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5080 - some additional trace
logging
Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5080 - some additional trace logging
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9146785b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9146785b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9146785b
Branch: refs/heads/activemq-5.10.x
Commit: 9146785b28d618379ba4f8f98c18ff430e96234a
Parents: 903733b
Author: gtully <ga...@gmail.com>
Authored: Thu Jul 24 15:18:42 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 14:54:37 2014 -0500
----------------------------------------------------------------------
.../src/main/java/org/apache/activemq/TransactionContext.java | 3 ++-
.../org/apache/activemq/ra/ActiveMQManagedConnection.java | 5 +++++
.../java/org/apache/activemq/ra/ActiveMQResourceAdapter.java | 7 ++-----
.../java/org/apache/activemq/ra/LocalAndXATransaction.java | 6 ++++++
4 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/9146785b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
index e780783..8e36ed2 100755
--- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
@@ -653,7 +653,7 @@ public class TransactionContext implements XAResource {
final FailoverTransport failoverTransport = this.connection.getTransport().narrow(FailoverTransport.class);
if (failoverTransport != null && !failoverTransport.isConnected()) {
// otherwise call will block on reconnect forfeting any app level periodic check
- XAException xaException = new XAException("Failover transport not connected: " + this.getConnection().getTransport());
+ XAException xaException = new XAException("Failover transport not connected: " + this.getConnection());
xaException.errorCode = XAException.XAER_RMERR;
throw xaException;
}
@@ -829,6 +829,7 @@ public class TransactionContext implements XAResource {
public String toString() {
return "TransactionContext{" +
"transactionId=" + transactionId +
+ ",connection=" + connection +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/9146785b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnection.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnection.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnection.java
index a694c12..8d53be1 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnection.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnection.java
@@ -399,4 +399,9 @@ public class ActiveMQManagedConnection implements ManagedConnection, ExceptionLi
return transactionContext;
}
+ @Override
+ public String toString() {
+ return "[" + super.toString() + "," + physicalConnection +"]";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/9146785b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
index b772beb..855ca43 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
@@ -352,12 +352,9 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
if (connection != null) {
try {
connection.close();
- } catch (JMSException ignored) {
-
- } finally {
- setConnection(original);
- }
+ } catch (JMSException ignored) {}
}
+ setConnection(original);
}
}};
http://git-wip-us.apache.org/repos/asf/activemq/blob/9146785b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
index f93ee0f..1dd7423 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
@@ -132,6 +132,7 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction {
public Xid[] recover(int arg0) throws XAException {
Xid[] answer = null;
+ LOG.trace("{} recover({})", new Object[]{this, arg0});
answer = transactionContext.recover(arg0);
LOG.trace("{} recover({}) = {}", new Object[]{this, arg0, answer});
return answer;
@@ -163,4 +164,9 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction {
transactionContext.cleanup();
inManagedTx = false;
}
+
+ @Override
+ public String toString() {
+ return "[" + super.toString() + "," + transactionContext + "]";
+ }
}