You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/04/12 16:35:33 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1111 Avoid deadlock on AMQP delivery during close
Repository: activemq-artemis
Updated Branches:
refs/heads/master 3ff9057ac -> 851803daa
ARTEMIS-1111 Avoid deadlock on AMQP delivery during close
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/930df5b6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/930df5b6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/930df5b6
Branch: refs/heads/master
Commit: 930df5b6639e4cd4e7459e5daff1fd6469e80a22
Parents: 3ff9057
Author: Martyn Taylor <mt...@redhat.com>
Authored: Wed Apr 12 14:38:06 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Apr 12 12:35:17 2017 -0400
----------------------------------------------------------------------
.../amqp/broker/AMQPSessionCallback.java | 73 ++++++++++++++------
.../amqp/proton/AMQPConnectionContext.java | 44 +++++++++---
.../amqp/proton/AMQPSessionContext.java | 26 +++++--
.../proton/ProtonServerReceiverContext.java | 17 +++--
.../amqp/proton/ProtonServerSenderContext.java | 65 ++++++++++++-----
.../amqp/proton/handler/ProtonHandler.java | 66 +++++++++++-------
.../transaction/ProtonTransactionHandler.java | 37 +++++++---
.../tests/integration/amqp/ProtonTest.java | 39 +++++++++++
8 files changed, 275 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 58d51db..2682e0f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -92,10 +92,6 @@ public class AMQPSessionCallback implements SessionCallback {
private final AtomicBoolean draining = new AtomicBoolean(false);
- public Object getProtonLock() {
- return connection.getLock();
- }
-
public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
ProtonProtocolManager manager,
AMQPConnectionContext connection,
@@ -203,19 +199,31 @@ public class AMQPSessionCallback implements SessionCallback {
serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false);
}
- public void createTemporaryQueue(String address, String queueName, RoutingType routingType, String filter) throws Exception {
- serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false);
+ public void createTemporaryQueue(String address,
+ String queueName,
+ RoutingType routingType,
+ String filter) throws Exception {
+ serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false);
}
- public void createUnsharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
+ public void createUnsharedDurableQueue(String address,
+ RoutingType routingType,
+ String queueName,
+ String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false);
}
- public void createSharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
+ public void createSharedDurableQueue(String address,
+ RoutingType routingType,
+ String queueName,
+ String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false);
}
- public void createSharedVolatileQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
+ public void createSharedVolatileQueue(String address,
+ RoutingType routingType,
+ String queueName,
+ String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true);
}
@@ -250,7 +258,9 @@ public class AMQPSessionCallback implements SessionCallback {
return bindingQueryResult.isExists();
}
- public AddressQueryResult addressQuery(String addressName, RoutingType routingType, boolean autoCreate) throws Exception {
+ public AddressQueryResult addressQuery(String addressName,
+ RoutingType routingType,
+ boolean autoCreate) throws Exception {
AddressQueryResult addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
@@ -395,9 +405,13 @@ public class AMQPSessionCallback implements SessionCallback {
condition.setDescription(errorMessage);
Rejected rejected = new Rejected();
rejected.setError(condition);
- synchronized (connection.getLock()) {
+
+ connection.lock();
+ try {
delivery.disposition(rejected);
delivery.settle();
+ } finally {
+ connection.unlock();
}
connection.flush();
}
@@ -415,7 +429,8 @@ public class AMQPSessionCallback implements SessionCallback {
manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
@Override
public void done() {
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance());
@@ -426,15 +441,20 @@ public class AMQPSessionCallback implements SessionCallback {
delivery.disposition(Accepted.getInstance());
}
delivery.settle();
+ } finally {
+ connection.unlock();
}
connection.flush();
}
@Override
public void onError(int errorCode, String errorMessage) {
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
connection.flush();
+ } finally {
+ connection.unlock();
}
}
});
@@ -449,9 +469,12 @@ public class AMQPSessionCallback implements SessionCallback {
final Receiver receiver) {
try {
if (address == null) {
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
receiver.flow(credits);
connection.flush();
+ } finally {
+ connection.unlock();
}
return;
}
@@ -505,9 +528,12 @@ public class AMQPSessionCallback implements SessionCallback {
try {
return plugSender.deliverMessage(ref, deliveryCount);
} catch (Exception e) {
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
connection.flush();
+ } finally {
+ connection.unlock();
}
throw new IllegalStateException("Can't deliver message " + e, e);
}
@@ -538,13 +564,14 @@ public class AMQPSessionCallback implements SessionCallback {
@Override
public void disconnect(ServerConsumer consumer, String queueName) {
ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
+ connection.lock();
try {
- synchronized (connection.getLock()) {
- ((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
- connection.flush();
- }
+ ((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
+ connection.flush();
} catch (ActiveMQAMQPException e) {
logger.error("Error closing link for " + consumer.getQueue().getAddress());
+ } finally {
+ connection.unlock();
}
}
@@ -567,18 +594,18 @@ public class AMQPSessionCallback implements SessionCallback {
return protonSPI.newTransaction();
}
-
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
return serverSession.getMatchingQueue(address, routingType);
}
-
- public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
+ public SimpleString getMatchingQueue(SimpleString address,
+ SimpleString queueName,
+ RoutingType routingType) throws Exception {
return serverSession.getMatchingQueue(address, queueName, routingType);
}
public AddressInfo getAddress(SimpleString address) {
- return serverSession.getAddress(address);
+ return serverSession.getAddress(address);
}
public void removeTemporaryQueue(String address) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index a884f0d..2c968c7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
@@ -128,10 +129,18 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
return false;
}
- public Object getLock() {
+ public ReentrantLock getLock() {
return handler.getLock();
}
+ public void lock() {
+ handler.getLock().lock();
+ }
+
+ public void unlock() {
+ handler.getLock().unlock();
+ }
+
public int capacity() {
return handler.capacity();
}
@@ -319,7 +328,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
handler.flushBytes();
}
-
@Override
public void pushBytes(ByteBuf bytes) {
connectionCallback.onTransport(bytes, this);
@@ -327,7 +335,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onRemoteOpen(Connection connection) throws Exception {
- synchronized (getLock()) {
+ lock();
+ try {
try {
initInternal();
} catch (Exception e) {
@@ -342,6 +351,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
connection.open();
}
+ } finally {
+ unlock();
}
initialise();
@@ -367,9 +378,12 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onRemoteClose(Connection connection) {
- synchronized (getLock()) {
+ lock();
+ try {
connection.close();
connection.free();
+ } finally {
+ unlock();
}
for (AMQPSessionContext protonSession : sessions.values()) {
@@ -390,8 +404,11 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onRemoteOpen(Session session) throws Exception {
getSessionExtension(session).initialise();
- synchronized (getLock()) {
+ lock();
+ try {
session.open();
+ } finally {
+ unlock();
}
}
@@ -401,9 +418,12 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onRemoteClose(Session session) throws Exception {
- synchronized (getLock()) {
+ lock();
+ try {
session.close();
session.free();
+ } finally {
+ unlock();
}
AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
@@ -428,10 +448,14 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onRemoteClose(Link link) throws Exception {
- synchronized (getLock()) {
+ lock();
+ try {
link.close();
link.free();
+ } finally {
+ unlock();
}
+
ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
if (linkContext != null) {
linkContext.close(true);
@@ -440,11 +464,13 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onRemoteDetach(Link link) throws Exception {
- synchronized (getLock()) {
+ lock();
+ try {
link.detach();
link.free();
+ } finally {
+ unlock();
}
-
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index c2c1f2d..72833e3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -147,9 +147,12 @@ public class AMQPSessionContext extends ProtonInitializable {
coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));
receiver.setContext(transactionHandler);
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
receiver.open();
receiver.flow(connection.getAmqpCredits());
+ } finally {
+ connection.unlock();
}
}
@@ -163,16 +166,23 @@ public class AMQPSessionContext extends ProtonInitializable {
senders.put(sender, protonSender);
serverSenders.put(protonSender.getBrokerConsumer(), protonSender);
sender.setContext(protonSender);
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
sender.open();
+ } finally {
+ connection.unlock();
}
+
protonSender.start();
} catch (ActiveMQAMQPException e) {
senders.remove(sender);
sender.setSource(null);
sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
sender.close();
+ } finally {
+ connection.unlock();
}
}
}
@@ -191,15 +201,21 @@ public class AMQPSessionContext extends ProtonInitializable {
protonReceiver.initialise();
receivers.put(receiver, protonReceiver);
receiver.setContext(protonReceiver);
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
receiver.open();
+ } finally {
+ connection.unlock();
}
} catch (ActiveMQAMQPException e) {
receivers.remove(receiver);
receiver.setTarget(null);
receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
receiver.close();
+ } finally {
+ connection.unlock();
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 20ef1df..2606482 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -117,7 +117,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (remoteDesiredCapabilities != null) {
List<Symbol> list = Arrays.asList(remoteDesiredCapabilities);
if (list.contains(AmqpSupport.DELAYED_DELIVERY)) {
- receiver.setOfferedCapabilities(new Symbol[] {AmqpSupport.DELAYED_DELIVERY});
+ receiver.setOfferedCapabilities(new Symbol[]{AmqpSupport.DELAYED_DELIVERY});
}
}
}
@@ -179,9 +179,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
condition.setCondition(Symbol.valueOf("failed"));
condition.setDescription(e.getMessage());
rejected.setError(condition);
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
delivery.disposition(rejected);
delivery.settle();
+ } finally {
+ connection.unlock();
}
}
}
@@ -210,16 +213,22 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (sessionSPI != null) {
sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
} else {
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
receiver.flow(credits);
+ } finally {
+ connection.unlock();
}
connection.flush();
}
}
public void drain(int credits) {
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
receiver.drain(credits);
+ } finally {
+ connection.unlock();
}
connection.flush();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index ca14f97..756a3d9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -20,6 +20,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
@@ -95,7 +96,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private boolean isVolatile = false;
private String tempQueueName;
- public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
+ public ProtonServerSenderContext(AMQPConnectionContext connection,
+ Sender sender,
+ AMQPSessionContext protonSession,
+ AMQPSessionCallback server) {
super();
this.connection = connection;
this.sender = sender;
@@ -246,7 +250,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
//check to see if the client has defined how we act
boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source);
- if (clientDefined) {
+ if (clientDefined) {
multicast = hasCapabilities(TOPIC, source);
AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
if (!addressQueryResult.isExists()) {
@@ -293,9 +297,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
supportedFilters.put(filter.getKey(), filter.getValue());
}
-
if (queueNameToUse != null) {
- SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST );
+ SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST);
queue = matchingAnycastQueue.toString();
}
//if the address specifies a broker configured queue then we always use this, treat it as a queue
@@ -313,8 +316,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (result.isExists()) {
// If a client reattaches to a durable subscription with a different no-local
// filter value, selector or address then we must recreate the queue (JMS semantics).
- if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) ||
- (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
+ if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
if (result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue);
@@ -392,7 +394,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
try {
- brokerConsumer = (Consumer)sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
+ brokerConsumer = (Consumer) sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
} catch (ActiveMQAMQPResourceLimitExceededException e1) {
throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
} catch (Exception e) {
@@ -404,7 +406,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return connection.getRemoteContainer();
}
-
/*
* close the session
*/
@@ -415,8 +416,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
sender.setCondition(condition);
}
protonSession.removeSender(sender);
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
sender.close();
+ } finally {
+ connection.unlock();
}
connection.flush();
@@ -442,7 +446,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
Source source = (Source) sender.getSource();
if (source != null && source.getAddress() != null && multicast) {
String queueName = source.getAddress();
- QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false);
+ QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false);
if (result.isExists() && source.getDynamic()) {
sessionSPI.deleteQueue(queueName);
} else {
@@ -489,8 +493,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
DeliveryState remoteState;
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
remoteState = delivery.getRemoteState();
+ } finally {
+ connection.unlock();
}
boolean settleImmediate = true;
@@ -509,8 +516,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance());
txAccepted.setTxnId(txState.getTxnId());
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
delivery.disposition(txAccepted);
+ } finally {
+ connection.unlock();
}
}
// we have to individual ack as we can't guarantee we will get the delivery
@@ -556,7 +566,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
Modified modification = (Modified) remoteState;
if (Boolean.TRUE.equals(modification.getUndeliverableHere())) {
- message.rejectConsumer(((Consumer)brokerConsumer).sequentialID());
+ message.rejectConsumer(((Consumer) brokerConsumer).sequentialID());
}
if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
@@ -585,8 +595,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
public void settle(Delivery delivery) {
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
delivery.settle();
+ } finally {
+ connection.unlock();
}
}
@@ -617,10 +630,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
int size = nettyBuffer.writerIndex();
- synchronized (connection.getLock()) {
- if (sender.getLocalState() == EndpointState.CLOSED) {
+ while (!connection.getLock().tryLock(1, TimeUnit.SECONDS)) {
+ if (closed || sender.getLocalState() == EndpointState.CLOSED) {
+ // If we're waiting on the connection lock, the link might be in the process of closing. If this happens
+ // we return.
return 0;
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Couldn't get lock on deliverMessage " + this);
+ }
}
+ }
+
+ try {
final Delivery delivery;
delivery = sender.delivery(tag, 0, tag.length);
delivery.setMessageFormat((int) message.getMessageFormat());
@@ -636,10 +658,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else {
sender.advance();
}
+ connection.flush();
+ } finally {
+ connection.unlock();
}
- connection.flush();
-
return size;
} finally {
nettyBuffer.release();
@@ -659,7 +682,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return false;
}
- private static String createQueueName(String clientId, String pubId, boolean shared, boolean global, boolean isVolatile) {
+ private static String createQueueName(String clientId,
+ String pubId,
+ boolean shared,
+ boolean global,
+ boolean isVolatile) {
String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId;
if (shared) {
if (queue.contains("|")) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 91b252b..fc6cbf6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
@@ -58,7 +59,7 @@ public class ProtonHandler extends ProtonInitializable {
private Sasl serverSasl;
- private final Object lock = new Object();
+ private final ReentrantLock lock = new ReentrantLock();
private final long creationTime;
@@ -79,38 +80,41 @@ public class ProtonHandler extends ProtonInitializable {
}
public long tick(boolean firstTick) {
+ lock.lock();
try {
- synchronized (lock) {
- if (!firstTick) {
- try {
- if (connection.getLocalState() != EndpointState.CLOSED) {
- long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
- if (transport.isClosed()) {
- throw new IllegalStateException("Channel was inactive for to long");
- }
- return rescheduleAt;
+ if (!firstTick) {
+ try {
+ if (connection.getLocalState() != EndpointState.CLOSED) {
+ long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+ if (transport.isClosed()) {
+ throw new IllegalStateException("Channel was inactive for to long");
}
- } catch (Exception e) {
- log.warn(e.getMessage(), e);
- transport.close();
- connection.setCondition(new ErrorCondition());
+ return rescheduleAt;
}
- return 0;
+ } catch (Exception e) {
+ log.warn(e.getMessage(), e);
+ transport.close();
+ connection.setCondition(new ErrorCondition());
}
- return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+ return 0;
}
+ return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
} finally {
+ lock.unlock();
flushBytes();
}
}
public int capacity() {
- synchronized (lock) {
+ lock.lock();
+ try {
return transport.capacity();
+ } finally {
+ lock.unlock();
}
}
- public Object getLock() {
+ public ReentrantLock getLock() {
return lock;
}
@@ -142,7 +146,8 @@ public class ProtonHandler extends ProtonInitializable {
}
public void flushBytes() {
- synchronized (lock) {
+ lock.lock();
+ try {
while (true) {
int pending = transport.pending();
@@ -161,17 +166,19 @@ public class ProtonHandler extends ProtonInitializable {
transport.pop(pending);
}
+ } finally {
+ lock.unlock();
}
}
-
public SASLResult getSASLResult() {
return saslResult;
}
public void inputBuffer(ByteBuf buffer) {
dataReceived = true;
- synchronized (lock) {
+ lock.lock();
+ try {
while (buffer.readableBytes() > 0) {
int capacity = transport.capacity();
@@ -208,6 +215,8 @@ public class ProtonHandler extends ProtonInitializable {
break;
}
}
+ } finally {
+ lock.unlock();
}
}
@@ -224,20 +233,26 @@ public class ProtonHandler extends ProtonInitializable {
}
public void flush() {
- synchronized (lock) {
+ lock.lock();
+ try {
transport.process();
checkServerSASL();
+ } finally {
+ lock.unlock();
}
dispatch();
}
public void close(ErrorCondition errorCondition) {
- synchronized (lock) {
+ lock.lock();
+ try {
if (errorCondition != null) {
connection.setCondition(errorCondition);
}
connection.close();
+ } finally {
+ lock.unlock();
}
flush();
@@ -283,7 +298,8 @@ public class ProtonHandler extends ProtonInitializable {
private void dispatch() {
Event ev;
- synchronized (lock) {
+ lock.lock();
+ try {
if (inDispatch) {
// Avoid recursion from events
return;
@@ -309,6 +325,8 @@ public class ProtonHandler extends ProtonInitializable {
} finally {
inDispatch = false;
}
+ } finally {
+ lock.unlock();
}
flushBytes();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index f817ed4..4579f1c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -72,7 +72,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
ByteBuffer buffer;
MessageImpl msg;
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
// Replenish coordinator receiver credit on exhaustion so sender can continue
// transaction declare and discahrge operations.
if (receiver.getCredit() < amqpLowMark) {
@@ -94,6 +95,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
receiver.advance();
msg = decodeMessage(buffer);
+ } finally {
+ connection.unlock();
}
Object action = ((AmqpValue) msg.getBody()).getValue();
@@ -102,45 +105,63 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
Binary txID = sessionSPI.newTransaction();
Declared declared = new Declared();
declared.setTxnId(txID);
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
delivery.disposition(declared);
+ } finally {
+ connection.unlock();
}
} else if (action instanceof Discharge) {
Discharge discharge = (Discharge) action;
Binary txID = discharge.getTxnId();
- ProtonTransactionImpl tx = (ProtonTransactionImpl)sessionSPI.getTransaction(txID, true);
+ ProtonTransactionImpl tx = (ProtonTransactionImpl) sessionSPI.getTransaction(txID, true);
tx.discharge();
if (discharge.getFail()) {
tx.rollback();
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
delivery.disposition(new Accepted());
+ } finally {
+ connection.unlock();
}
connection.flush();
} else {
tx.commit();
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
delivery.disposition(new Accepted());
+ } finally {
+ connection.unlock();
}
connection.flush();
}
}
} catch (ActiveMQAMQPException amqpE) {
log.warn(amqpE.getMessage(), amqpE);
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
+ } finally {
+ connection.unlock();
}
connection.flush();
} catch (Throwable e) {
log.warn(e.getMessage(), e);
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
+ } finally {
+ connection.unlock();
}
connection.flush();
} finally {
- synchronized (connection.getLock()) {
+ connection.lock();
+ try {
delivery.settle();
+ } finally {
+ connection.unlock();
}
connection.flush();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index 09a44dd..4ee94c2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -1585,6 +1585,45 @@ public class ProtonTest extends ProtonTestBase {
}
@Test
+ public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable { // added with ARTEMIS-1111 (avoid deadlock on AMQP delivery during close)
+ String name = "exampleQueue1";
+
+ int numMessages = 50;
+
+ System.out.println("1. Send messages into queue");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue queue = session.createQueue(name);
+ MessageProducer p = session.createProducer(queue);
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage message = session.createTextMessage();
+ message.setText("Message temporary");
+ p.send(message);
+ }
+ p.close();
+ session.close();
+
+ System.out.println("2. Receive one by one, each in its own session");
+ for (int i = 0; i < numMessages; i++) {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = session.createQueue(name);
+ MessageConsumer c = session.createConsumer(queue);
+ Message m = c.receive(1000); // waits up to 1000 ms; the result is intentionally not asserted
+ c.close(); // close this iteration's consumer (was p.close(), but the producer is already closed after step 1)
+ session.close();
+ }
+
+ System.out.println("3. Try to receive 10 in the same session");
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = session.createQueue(name);
+ MessageConsumer c = session.createConsumer(queue);
+ for (int i = 0; i < numMessages; i++) { // note: attempts numMessages receives, not the 10 mentioned in the log line
+ Message m = c.receive(1000);
+ }
+ c.close(); // close the shared consumer before its session (was p.close() on the already-closed producer)
+ session.close();
+ }
+
+ @Test
public void testSimpleObject() throws Throwable {
final int numMessages = 1;
long time = System.currentTimeMillis();
[2/2] activemq-artemis git commit: This closes #1202
Posted by cl...@apache.org.
This closes #1202
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/851803da
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/851803da
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/851803da
Branch: refs/heads/master
Commit: 851803daa144b42951721bb1965fa30b74d15992
Parents: 3ff9057 930df5b
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Apr 12 12:35:26 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Apr 12 12:35:26 2017 -0400
----------------------------------------------------------------------
.../amqp/broker/AMQPSessionCallback.java | 73 ++++++++++++++------
.../amqp/proton/AMQPConnectionContext.java | 44 +++++++++---
.../amqp/proton/AMQPSessionContext.java | 26 +++++--
.../proton/ProtonServerReceiverContext.java | 17 +++--
.../amqp/proton/ProtonServerSenderContext.java | 65 ++++++++++++-----
.../amqp/proton/handler/ProtonHandler.java | 66 +++++++++++-------
.../transaction/ProtonTransactionHandler.java | 37 +++++++---
.../tests/integration/amqp/ProtonTest.java | 39 +++++++++++
8 files changed, 275 insertions(+), 92 deletions(-)
----------------------------------------------------------------------