You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/04/12 16:35:33 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1111 Avoid deadlock on AMQP delivery during close

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 3ff9057ac -> 851803daa


ARTEMIS-1111 Avoid deadlock on AMQP delivery during close


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/930df5b6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/930df5b6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/930df5b6

Branch: refs/heads/master
Commit: 930df5b6639e4cd4e7459e5daff1fd6469e80a22
Parents: 3ff9057
Author: Martyn Taylor <mt...@redhat.com>
Authored: Wed Apr 12 14:38:06 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Apr 12 12:35:17 2017 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        | 73 ++++++++++++++------
 .../amqp/proton/AMQPConnectionContext.java      | 44 +++++++++---
 .../amqp/proton/AMQPSessionContext.java         | 26 +++++--
 .../proton/ProtonServerReceiverContext.java     | 17 +++--
 .../amqp/proton/ProtonServerSenderContext.java  | 65 ++++++++++++-----
 .../amqp/proton/handler/ProtonHandler.java      | 66 +++++++++++-------
 .../transaction/ProtonTransactionHandler.java   | 37 +++++++---
 .../tests/integration/amqp/ProtonTest.java      | 39 +++++++++++
 8 files changed, 275 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 58d51db..2682e0f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -92,10 +92,6 @@ public class AMQPSessionCallback implements SessionCallback {
 
    private final AtomicBoolean draining = new AtomicBoolean(false);
 
-   public Object getProtonLock() {
-      return connection.getLock();
-   }
-
    public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
                               ProtonProtocolManager manager,
                               AMQPConnectionContext connection,
@@ -203,19 +199,31 @@ public class AMQPSessionCallback implements SessionCallback {
       serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false);
    }
 
-   public void createTemporaryQueue(String address, String queueName, RoutingType routingType, String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName),  routingType, SimpleString.toSimpleString(filter), true, false);
+   public void createTemporaryQueue(String address,
+                                    String queueName,
+                                    RoutingType routingType,
+                                    String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false);
    }
 
-   public void createUnsharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
+   public void createUnsharedDurableQueue(String address,
+                                          RoutingType routingType,
+                                          String queueName,
+                                          String filter) throws Exception {
       serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false);
    }
 
-   public void createSharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
+   public void createSharedDurableQueue(String address,
+                                        RoutingType routingType,
+                                        String queueName,
+                                        String filter) throws Exception {
       serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false);
    }
 
-   public void createSharedVolatileQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
+   public void createSharedVolatileQueue(String address,
+                                         RoutingType routingType,
+                                         String queueName,
+                                         String filter) throws Exception {
       serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true);
    }
 
@@ -250,7 +258,9 @@ public class AMQPSessionCallback implements SessionCallback {
       return bindingQueryResult.isExists();
    }
 
-   public AddressQueryResult addressQuery(String addressName, RoutingType routingType, boolean autoCreate) throws Exception {
+   public AddressQueryResult addressQuery(String addressName,
+                                          RoutingType routingType,
+                                          boolean autoCreate) throws Exception {
       AddressQueryResult addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
 
       if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
@@ -395,9 +405,13 @@ public class AMQPSessionCallback implements SessionCallback {
       condition.setDescription(errorMessage);
       Rejected rejected = new Rejected();
       rejected.setError(condition);
-      synchronized (connection.getLock()) {
+
+      connection.lock();
+      try {
          delivery.disposition(rejected);
          delivery.settle();
+      } finally {
+         connection.unlock();
       }
       connection.flush();
    }
@@ -415,7 +429,8 @@ public class AMQPSessionCallback implements SessionCallback {
          manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
             @Override
             public void done() {
-               synchronized (connection.getLock()) {
+               connection.lock();
+               try {
                   if (delivery.getRemoteState() instanceof TransactionalState) {
                      TransactionalState txAccepted = new TransactionalState();
                      txAccepted.setOutcome(Accepted.getInstance());
@@ -426,15 +441,20 @@ public class AMQPSessionCallback implements SessionCallback {
                      delivery.disposition(Accepted.getInstance());
                   }
                   delivery.settle();
+               } finally {
+                  connection.unlock();
                }
                connection.flush();
             }
 
             @Override
             public void onError(int errorCode, String errorMessage) {
-               synchronized (connection.getLock()) {
+               connection.lock();
+               try {
                   receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
                   connection.flush();
+               } finally {
+                  connection.unlock();
                }
             }
          });
@@ -449,9 +469,12 @@ public class AMQPSessionCallback implements SessionCallback {
                                    final Receiver receiver) {
       try {
          if (address == null) {
-            synchronized (connection.getLock()) {
+            connection.lock();
+            try {
                receiver.flow(credits);
                connection.flush();
+            } finally {
+               connection.unlock();
             }
             return;
          }
@@ -505,9 +528,12 @@ public class AMQPSessionCallback implements SessionCallback {
       try {
          return plugSender.deliverMessage(ref, deliveryCount);
       } catch (Exception e) {
-         synchronized (connection.getLock()) {
+         connection.lock();
+         try {
             plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
             connection.flush();
+         } finally {
+            connection.unlock();
          }
          throw new IllegalStateException("Can't deliver message " + e, e);
       }
@@ -538,13 +564,14 @@ public class AMQPSessionCallback implements SessionCallback {
    @Override
    public void disconnect(ServerConsumer consumer, String queueName) {
       ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
+      connection.lock();
       try {
-         synchronized (connection.getLock()) {
-            ((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
-            connection.flush();
-         }
+         ((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
+         connection.flush();
       } catch (ActiveMQAMQPException e) {
          logger.error("Error closing link for " + consumer.getQueue().getAddress());
+      } finally {
+         connection.unlock();
       }
    }
 
@@ -567,18 +594,18 @@ public class AMQPSessionCallback implements SessionCallback {
       return protonSPI.newTransaction();
    }
 
-
    public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
       return serverSession.getMatchingQueue(address, routingType);
    }
 
-
-   public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
+   public SimpleString getMatchingQueue(SimpleString address,
+                                        SimpleString queueName,
+                                        RoutingType routingType) throws Exception {
       return serverSession.getMatchingQueue(address, queueName, routingType);
    }
 
    public AddressInfo getAddress(SimpleString address) {
-      return  serverSession.getAddress(address);
+      return serverSession.getAddress(address);
    }
 
    public void removeTemporaryQueue(String address) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index a884f0d..2c968c7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
@@ -128,10 +129,18 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
       return false;
    }
 
-   public Object getLock() {
+   public ReentrantLock getLock() {
       return handler.getLock();
    }
 
+   public void lock() {
+      handler.getLock().lock();
+   }
+
+   public void unlock() {
+      handler.getLock().unlock();
+   }
+
    public int capacity() {
       return handler.capacity();
    }
@@ -319,7 +328,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
       handler.flushBytes();
    }
 
-
    @Override
    public void pushBytes(ByteBuf bytes) {
       connectionCallback.onTransport(bytes, this);
@@ -327,7 +335,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
    @Override
    public void onRemoteOpen(Connection connection) throws Exception {
-      synchronized (getLock()) {
+      lock();
+      try {
          try {
             initInternal();
          } catch (Exception e) {
@@ -342,6 +351,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
             connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
             connection.open();
          }
+      } finally {
+         unlock();
       }
       initialise();
 
@@ -367,9 +378,12 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
    @Override
    public void onRemoteClose(Connection connection) {
-      synchronized (getLock()) {
+      lock();
+      try {
          connection.close();
          connection.free();
+      } finally {
+         unlock();
       }
 
       for (AMQPSessionContext protonSession : sessions.values()) {
@@ -390,8 +404,11 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
    @Override
    public void onRemoteOpen(Session session) throws Exception {
       getSessionExtension(session).initialise();
-      synchronized (getLock()) {
+      lock();
+      try {
          session.open();
+      } finally {
+         unlock();
       }
    }
 
@@ -401,9 +418,12 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
    @Override
    public void onRemoteClose(Session session) throws Exception {
-      synchronized (getLock()) {
+      lock();
+      try {
          session.close();
          session.free();
+      } finally {
+         unlock();
       }
 
       AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
@@ -428,10 +448,14 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
    @Override
    public void onRemoteClose(Link link) throws Exception {
-      synchronized (getLock()) {
+      lock();
+      try {
          link.close();
          link.free();
+      } finally {
+         unlock();
       }
+
       ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
       if (linkContext != null) {
          linkContext.close(true);
@@ -440,11 +464,13 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
    @Override
    public void onRemoteDetach(Link link) throws Exception {
-      synchronized (getLock()) {
+      lock();
+      try {
          link.detach();
          link.free();
+      } finally {
+         unlock();
       }
-
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index c2c1f2d..72833e3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -147,9 +147,12 @@ public class AMQPSessionContext extends ProtonInitializable {
       coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));
 
       receiver.setContext(transactionHandler);
-      synchronized (connection.getLock()) {
+      connection.lock();
+      try {
          receiver.open();
          receiver.flow(connection.getAmqpCredits());
+      } finally {
+         connection.unlock();
       }
    }
 
@@ -163,16 +166,23 @@ public class AMQPSessionContext extends ProtonInitializable {
          senders.put(sender, protonSender);
          serverSenders.put(protonSender.getBrokerConsumer(), protonSender);
          sender.setContext(protonSender);
-         synchronized (connection.getLock()) {
+         connection.lock();
+         try {
             sender.open();
+         } finally {
+            connection.unlock();
          }
+
          protonSender.start();
       } catch (ActiveMQAMQPException e) {
          senders.remove(sender);
          sender.setSource(null);
          sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-         synchronized (connection.getLock()) {
+         connection.lock();
+         try {
             sender.close();
+         } finally {
+            connection.unlock();
          }
       }
    }
@@ -191,15 +201,21 @@ public class AMQPSessionContext extends ProtonInitializable {
          protonReceiver.initialise();
          receivers.put(receiver, protonReceiver);
          receiver.setContext(protonReceiver);
-         synchronized (connection.getLock()) {
+         connection.lock();
+         try {
             receiver.open();
+         } finally {
+            connection.unlock();
          }
       } catch (ActiveMQAMQPException e) {
          receivers.remove(receiver);
          receiver.setTarget(null);
          receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-         synchronized (connection.getLock()) {
+         connection.lock();
+         try {
             receiver.close();
+         } finally {
+            connection.unlock();
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 20ef1df..2606482 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -117,7 +117,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          if (remoteDesiredCapabilities != null) {
             List<Symbol> list = Arrays.asList(remoteDesiredCapabilities);
             if (list.contains(AmqpSupport.DELAYED_DELIVERY)) {
-               receiver.setOfferedCapabilities(new Symbol[] {AmqpSupport.DELAYED_DELIVERY});
+               receiver.setOfferedCapabilities(new Symbol[]{AmqpSupport.DELAYED_DELIVERY});
             }
          }
       }
@@ -179,9 +179,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          condition.setCondition(Symbol.valueOf("failed"));
          condition.setDescription(e.getMessage());
          rejected.setError(condition);
-         synchronized (connection.getLock()) {
+         connection.lock();
+         try {
             delivery.disposition(rejected);
             delivery.settle();
+         } finally {
+            connection.unlock();
          }
       }
    }
@@ -210,16 +213,22 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       if (sessionSPI != null) {
          sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
       } else {
-         synchronized (connection.getLock()) {
+         connection.lock();
+         try {
             receiver.flow(credits);
+         } finally {
+            connection.unlock();
          }
          connection.flush();
       }
    }
 
    public void drain(int credits) {
-      synchronized (connection.getLock()) {
+      connection.lock();
+      try {
          receiver.drain(credits);
+      } finally {
+         connection.unlock();
       }
       connection.flush();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index ca14f97..756a3d9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -20,6 +20,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -95,7 +96,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    private boolean isVolatile = false;
    private String tempQueueName;
 
-   public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
+   public ProtonServerSenderContext(AMQPConnectionContext connection,
+                                    Sender sender,
+                                    AMQPSessionContext protonSession,
+                                    AMQPSessionCallback server) {
       super();
       this.connection = connection;
       this.sender = sender;
@@ -246,7 +250,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          }
          //check to see if the client has defined how we act
          boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source);
-         if (clientDefined)  {
+         if (clientDefined) {
             multicast = hasCapabilities(TOPIC, source);
             AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
             if (!addressQueryResult.isExists()) {
@@ -293,9 +297,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                supportedFilters.put(filter.getKey(), filter.getValue());
             }
 
-
             if (queueNameToUse != null) {
-               SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST  );
+               SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST);
                queue = matchingAnycastQueue.toString();
             }
             //if the address specifies a broker configured queue then we always use this, treat it as a queue
@@ -313,8 +316,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                if (result.isExists()) {
                   // If a client reattaches to a durable subscription with a different no-local
                   // filter value, selector or address then we must recreate the queue (JMS semantics).
-                  if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) ||
-                     (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
+                  if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
 
                      if (result.getConsumerCount() == 0) {
                         sessionSPI.deleteQueue(queue);
@@ -392,7 +394,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
       boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
       try {
-         brokerConsumer = (Consumer)sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
+         brokerConsumer = (Consumer) sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
       } catch (ActiveMQAMQPResourceLimitExceededException e1) {
          throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
       } catch (Exception e) {
@@ -404,7 +406,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       return connection.getRemoteContainer();
    }
 
-
    /*
     * close the session
     */
@@ -415,8 +416,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          sender.setCondition(condition);
       }
       protonSession.removeSender(sender);
-      synchronized (connection.getLock()) {
+      connection.lock();
+      try {
          sender.close();
+      } finally {
+         connection.unlock();
       }
       connection.flush();
 
@@ -442,7 +446,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             Source source = (Source) sender.getSource();
             if (source != null && source.getAddress() != null && multicast) {
                String queueName = source.getAddress();
-               QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse,  false);
+               QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false);
                if (result.isExists() && source.getDynamic()) {
                   sessionSPI.deleteQueue(queueName);
                } else {
@@ -489,8 +493,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
          DeliveryState remoteState;
 
-         synchronized (connection.getLock()) {
+         connection.lock();
+         try {
             remoteState = delivery.getRemoteState();
+         } finally {
+            connection.unlock();
          }
 
          boolean settleImmediate = true;
@@ -509,8 +516,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                         TransactionalState txAccepted = new TransactionalState();
                         txAccepted.setOutcome(Accepted.getInstance());
                         txAccepted.setTxnId(txState.getTxnId());
-                        synchronized (connection.getLock()) {
+                        connection.lock();
+                        try {
                            delivery.disposition(txAccepted);
+                        } finally {
+                           connection.unlock();
                         }
                      }
                      // we have to individual ack as we can't guarantee we will get the delivery
@@ -556,7 +566,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                   Modified modification = (Modified) remoteState;
 
                   if (Boolean.TRUE.equals(modification.getUndeliverableHere())) {
-                     message.rejectConsumer(((Consumer)brokerConsumer).sequentialID());
+                     message.rejectConsumer(((Consumer) brokerConsumer).sequentialID());
                   }
 
                   if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
@@ -585,8 +595,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    }
 
    public void settle(Delivery delivery) {
-      synchronized (connection.getLock()) {
+      connection.lock();
+      try {
          delivery.settle();
+      } finally {
+         connection.unlock();
       }
    }
 
@@ -617,10 +630,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
          int size = nettyBuffer.writerIndex();
 
-         synchronized (connection.getLock()) {
-            if (sender.getLocalState() == EndpointState.CLOSED) {
+         while (!connection.getLock().tryLock(1, TimeUnit.SECONDS)) {
+            if (closed || sender.getLocalState() == EndpointState.CLOSED) {
+               // If we're waiting on the connection lock, the link might be in the process of closing.  If this happens
+               // we return.
                return 0;
+            } else {
+               if (log.isDebugEnabled()) {
+                  log.debug("Couldn't get lock on deliverMessage " + this);
+               }
             }
+         }
+
+         try {
             final Delivery delivery;
             delivery = sender.delivery(tag, 0, tag.length);
             delivery.setMessageFormat((int) message.getMessageFormat());
@@ -636,10 +658,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             } else {
                sender.advance();
             }
+            connection.flush();
+         } finally {
+            connection.unlock();
          }
 
-         connection.flush();
-
          return size;
       } finally {
          nettyBuffer.release();
@@ -659,7 +682,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       return false;
    }
 
-   private static String createQueueName(String clientId, String pubId, boolean shared, boolean global, boolean isVolatile) {
+   private static String createQueueName(String clientId,
+                                         String pubId,
+                                         boolean shared,
+                                         boolean global,
+                                         boolean isVolatile) {
       String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId;
       if (shared) {
          if (queue.contains("|")) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 91b252b..fc6cbf6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -58,7 +59,7 @@ public class ProtonHandler extends ProtonInitializable {
 
    private Sasl serverSasl;
 
-   private final Object lock = new Object();
+   private final ReentrantLock lock = new ReentrantLock();
 
    private final long creationTime;
 
@@ -79,38 +80,41 @@ public class ProtonHandler extends ProtonInitializable {
    }
 
    public long tick(boolean firstTick) {
+      lock.lock();
       try {
-         synchronized (lock) {
-            if (!firstTick) {
-               try {
-                  if (connection.getLocalState() != EndpointState.CLOSED) {
-                     long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
-                     if (transport.isClosed()) {
-                        throw new IllegalStateException("Channel was inactive for to long");
-                     }
-                     return rescheduleAt;
+         if (!firstTick) {
+            try {
+               if (connection.getLocalState() != EndpointState.CLOSED) {
+                  long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+                  if (transport.isClosed()) {
+                     throw new IllegalStateException("Channel was inactive for to long");
                   }
-               } catch (Exception e) {
-                  log.warn(e.getMessage(), e);
-                  transport.close();
-                  connection.setCondition(new ErrorCondition());
+                  return rescheduleAt;
                }
-               return 0;
+            } catch (Exception e) {
+               log.warn(e.getMessage(), e);
+               transport.close();
+               connection.setCondition(new ErrorCondition());
             }
-            return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+            return 0;
          }
+         return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
       } finally {
+         lock.unlock();
          flushBytes();
       }
    }
 
    public int capacity() {
-      synchronized (lock) {
+      lock.lock();
+      try {
          return transport.capacity();
+      } finally {
+         lock.unlock();
       }
    }
 
-   public Object getLock() {
+   public ReentrantLock getLock() {
       return lock;
    }
 
@@ -142,7 +146,8 @@ public class ProtonHandler extends ProtonInitializable {
    }
 
    public void flushBytes() {
-      synchronized (lock) {
+      lock.lock();
+      try {
          while (true) {
             int pending = transport.pending();
 
@@ -161,17 +166,19 @@ public class ProtonHandler extends ProtonInitializable {
 
             transport.pop(pending);
          }
+      } finally {
+         lock.unlock();
       }
    }
 
-
    public SASLResult getSASLResult() {
       return saslResult;
    }
 
    public void inputBuffer(ByteBuf buffer) {
       dataReceived = true;
-      synchronized (lock) {
+      lock.lock();
+      try {
          while (buffer.readableBytes() > 0) {
             int capacity = transport.capacity();
 
@@ -208,6 +215,8 @@ public class ProtonHandler extends ProtonInitializable {
                break;
             }
          }
+      } finally {
+         lock.unlock();
       }
    }
 
@@ -224,20 +233,26 @@ public class ProtonHandler extends ProtonInitializable {
    }
 
    public void flush() {
-      synchronized (lock) {
+      lock.lock();
+      try {
          transport.process();
          checkServerSASL();
+      } finally {
+         lock.unlock();
       }
 
       dispatch();
    }
 
    public void close(ErrorCondition errorCondition) {
-      synchronized (lock) {
+      lock.lock();
+      try {
          if (errorCondition != null) {
             connection.setCondition(errorCondition);
          }
          connection.close();
+      } finally {
+         lock.unlock();
       }
 
       flush();
@@ -283,7 +298,8 @@ public class ProtonHandler extends ProtonInitializable {
    private void dispatch() {
       Event ev;
 
-      synchronized (lock) {
+      lock.lock();
+      try {
          if (inDispatch) {
             // Avoid recursion from events
             return;
@@ -309,6 +325,8 @@ public class ProtonHandler extends ProtonInitializable {
          } finally {
             inDispatch = false;
          }
+      } finally {
+         lock.unlock();
       }
 
       flushBytes();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index f817ed4..4579f1c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -72,7 +72,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
          ByteBuffer buffer;
          MessageImpl msg;
 
-         synchronized (connection.getLock()) {
+         connection.lock();
+         try {
             // Replenish coordinator receiver credit on exhaustion so sender can continue
             // transaction declare and discahrge operations.
             if (receiver.getCredit() < amqpLowMark) {
@@ -94,6 +95,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
             receiver.advance();
 
             msg = decodeMessage(buffer);
+         } finally {
+            connection.unlock();
          }
 
          Object action = ((AmqpValue) msg.getBody()).getValue();
@@ -102,45 +105,63 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
             Binary txID = sessionSPI.newTransaction();
             Declared declared = new Declared();
             declared.setTxnId(txID);
-            synchronized (connection.getLock()) {
+            connection.lock();
+            try {
                delivery.disposition(declared);
+            } finally {
+               connection.unlock();
             }
          } else if (action instanceof Discharge) {
             Discharge discharge = (Discharge) action;
 
             Binary txID = discharge.getTxnId();
-            ProtonTransactionImpl tx = (ProtonTransactionImpl)sessionSPI.getTransaction(txID, true);
+            ProtonTransactionImpl tx = (ProtonTransactionImpl) sessionSPI.getTransaction(txID, true);
             tx.discharge();
 
             if (discharge.getFail()) {
                tx.rollback();
-               synchronized (connection.getLock()) {
+               connection.lock();
+               try {
                   delivery.disposition(new Accepted());
+               } finally {
+                  connection.unlock();
                }
                connection.flush();
             } else {
                tx.commit();
-               synchronized (connection.getLock()) {
+               connection.lock();
+               try {
                   delivery.disposition(new Accepted());
+               } finally {
+                  connection.unlock();
                }
                connection.flush();
             }
          }
       } catch (ActiveMQAMQPException amqpE) {
          log.warn(amqpE.getMessage(), amqpE);
-         synchronized (connection.getLock()) {
+         connection.lock();
+         try {
             delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
+         } finally {
+            connection.unlock();
          }
          connection.flush();
       } catch (Throwable e) {
          log.warn(e.getMessage(), e);
-         synchronized (connection.getLock()) {
+         connection.lock();
+         try {
             delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
+         } finally {
+            connection.unlock();
          }
          connection.flush();
       } finally {
-         synchronized (connection.getLock()) {
+         connection.lock();
+         try {
             delivery.settle();
+         } finally {
+            connection.unlock();
          }
          connection.flush();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/930df5b6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index 09a44dd..4ee94c2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -1585,6 +1585,45 @@ public class ProtonTest extends ProtonTestBase {
    }
 
    @Test
+   public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable {
+      String name = "exampleQueue1";
+
+      int numMessages = 50;
+
+      System.out.println("1. Send messages into queue");
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      javax.jms.Queue queue = session.createQueue(name);
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < numMessages; i++) {
+         TextMessage message = session.createTextMessage();
+         message.setText("Message temporary");
+         p.send(message);
+      }
+      p.close();
+      session.close();
+
+      System.out.println("2. Receive one by one, each in its own session");
+      for (int i = 0; i < numMessages; i++) {
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         queue = session.createQueue(name);
+         MessageConsumer c = session.createConsumer(queue);
+         Message m = c.receive(1000);
+         p.close();
+         session.close();
+      }
+
+      System.out.println("3. Try to receive 10 in the same session");
+      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      queue = session.createQueue(name);
+      MessageConsumer c = session.createConsumer(queue);
+      for (int i = 0; i < numMessages; i++) {
+         Message m = c.receive(1000);
+      }
+      p.close();
+      session.close();
+   }
+
+   @Test
    public void testSimpleObject() throws Throwable {
       final int numMessages = 1;
       long time = System.currentTimeMillis();


[2/2] activemq-artemis git commit: This closes #1202

Posted by cl...@apache.org.
This closes #1202


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/851803da
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/851803da
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/851803da

Branch: refs/heads/master
Commit: 851803daa144b42951721bb1965fa30b74d15992
Parents: 3ff9057 930df5b
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Apr 12 12:35:26 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Apr 12 12:35:26 2017 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        | 73 ++++++++++++++------
 .../amqp/proton/AMQPConnectionContext.java      | 44 +++++++++---
 .../amqp/proton/AMQPSessionContext.java         | 26 +++++--
 .../proton/ProtonServerReceiverContext.java     | 17 +++--
 .../amqp/proton/ProtonServerSenderContext.java  | 65 ++++++++++++-----
 .../amqp/proton/handler/ProtonHandler.java      | 66 +++++++++++-------
 .../transaction/ProtonTransactionHandler.java   | 37 +++++++---
 .../tests/integration/amqp/ProtonTest.java      | 39 +++++++++++
 8 files changed, 275 insertions(+), 92 deletions(-)
----------------------------------------------------------------------