You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/21 23:56:18 UTC
[62/68] [abbrv] activemq-artemis git commit: fix AMQ1924Test
fix AMQ1924Test
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8791d389
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8791d389
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8791d389
Branch: refs/heads/refactor-openwire
Commit: 8791d3899bd5328ae882854caf171abc3acf570d
Parents: 14a77e3
Author: Howard Gao <ho...@gmail.com>
Authored: Tue Mar 15 23:11:39 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Mar 21 18:54:50 2016 -0400
----------------------------------------------------------------------
.../openwire/OpenWireProtocolManager.java | 5 ++
.../core/server/impl/ServerConsumerImpl.java | 64 +++++++++++++++-----
.../transport/failover/AMQ1925Test.java | 54 +++++++++--------
3 files changed, 82 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8791d389/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index abfcca5..3cb1215 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -488,6 +488,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
if (txSession != null) {
txSession.rollback(info);
}
+ else if (info.getTransactionId().isLocalTransaction()) {
+ //during a broker restart, recovered local transaction may not be registered
+ //in that case we ignore and let the tx removed silently by connection.
+ //see AMQ1925Test.testAMQ1925_TXBegin
+ }
else {
throw newXAException("Transaction '" + info.getTransactionId() + "' has not been started.", XAException.XAER_NOTA);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8791d389/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index b2ca0df..cb2cd38 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -89,6 +89,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private Object protocolContext;
+ private final ActiveMQServer server;
+
private SlowConsumerDetectionListener slowConsumerListener;
/**
@@ -153,8 +155,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
final SessionCallback callback,
final boolean preAcknowledge,
final boolean strictUpdateDeliveryCount,
- final ManagementService managementService) throws Exception {
- this(id, session, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, true, null);
+ final ManagementService managementService,
+ final ActiveMQServer server) throws Exception {
+ this(id, session, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, true, null, server);
}
public ServerConsumerImpl(final long id,
@@ -169,7 +172,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
final boolean strictUpdateDeliveryCount,
final ManagementService managementService,
final boolean supportLargeMessage,
- final Integer credits) throws Exception {
+ final Integer credits,
+ final ActiveMQServer server) throws Exception {
this.id = id;
this.filter = filter;
@@ -214,6 +218,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
availableCredits.set(credits);
}
}
+
+ this.server = server;
}
@Override
@@ -398,7 +404,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
finally {
lockDelivery.readLock().unlock();
+ callback.afterDelivery();
}
+
}
@Override
@@ -583,12 +591,19 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override
public void setStarted(final boolean started) {
synchronized (lock) {
- lockDelivery.writeLock().lock();
+ boolean locked = lockDelivery();
+
+ // This is to make sure nothing would sneak to the client while started = false
+ // the client will stop the session and perform a rollback in certain cases.
+ // in case something sneaks to the client you could get to messaging delivering forever until
+ // you restart the server
try {
this.started = browseOnly || started;
}
finally {
- lockDelivery.writeLock().unlock();
+ if (locked) {
+ lockDelivery.writeLock().unlock();
+ }
}
}
@@ -598,21 +613,38 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
}
- @Override
- public void setTransferring(final boolean transferring) {
- synchronized (lock) {
- this.transferring = transferring;
- }
-
- // This is to make sure that the delivery process has finished any pending delivery
- // otherwise a message may sneak in on the client while we are trying to stop the consumer
+ private boolean lockDelivery() {
try {
- lockDelivery.writeLock().lock();
+ if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
+ ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
+ if (server != null) {
+ server.threadDump();
+ }
+ return false;
+ }
+ return true;
}
- finally {
- lockDelivery.writeLock().unlock();
+ catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ return false;
}
+ }
+ @Override
+ public void setTransferring(final boolean transferring) {
+ synchronized (lock) {
+ // This is to make sure that the delivery process has finished any pending delivery
+ // otherwise a message may sneak in on the client while we are trying to stop the consumer
+ boolean locked = lockDelivery();
+ try {
+ this.transferring = transferring;
+ }
+ finally {
+ if (locked) {
+ lockDelivery.writeLock().unlock();
+ }
+ }
+ }
// Outside the lock
if (transferring) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8791d389/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
index 3d75905..564fd86 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
@@ -33,8 +33,6 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
-import javax.management.MBeanServer;
-import javax.management.MBeanServerFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -235,32 +233,39 @@ public class AMQ1925Test extends OpenwireArtemisBaseTest implements ExceptionLis
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
boolean restartDone = false;
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message message = consumer.receive(5000);
- Assert.assertNotNull(message);
+ try {
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message message = consumer.receive(5000);
+ Assert.assertNotNull(message);
- if (i == 222 && !restartDone) {
- // Simulate broker failure & restart
- bs.stop();
- bs = createNewServer();
- bs.start();
- restartDone = true;
- }
+ if (i == 222 && !restartDone) {
+ // Simulate broker failure & restart
+ bs.stop();
+ bs = createNewServer();
+ bs.start();
+ restartDone = true;
+ }
- Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
- try {
- session.commit();
- }
- catch (TransactionRolledBackException expectedOnOccasion) {
- log.info("got rollback: " + expectedOnOccasion);
- i--;
+ Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
+ try {
+ session.commit();
+ }
+ catch (TransactionRolledBackException expectedOnOccasion) {
+ log.info("got rollback: " + expectedOnOccasion);
+ i--;
+ }
}
+ Assert.assertNull(consumer.receive(500));
+ }
+ catch (Exception eee) {
+ log.error("got exception", eee);
+ throw eee;
+ }
+ finally {
+ consumer.close();
+ session.close();
+ connection.close();
}
- Assert.assertNull(consumer.receive(500));
-
- consumer.close();
- session.close();
- connection.close();
assertQueueEmpty();
Assert.assertNull("no exception on connection listener: " + exception, exception);
@@ -368,7 +373,6 @@ public class AMQ1925Test extends OpenwireArtemisBaseTest implements ExceptionLis
} catch (Exception e) {
log.error(e);
}
-
}
public void onException(JMSException exception) {