You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/10/07 14:38:15 UTC

[1/2] activemq-artemis git commit: This closes #814

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 20729e79f -> 6f6d9845f


This closes #814


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6f6d9845
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6f6d9845
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6f6d9845

Branch: refs/heads/master
Commit: 6f6d9845fe9f5a0b54a3bd0cda8a541feb150995
Parents: 20729e7 95c4fdd
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Oct 7 10:38:01 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 10:38:01 2016 -0400

----------------------------------------------------------------------
 .../core/management/ActiveMQServerControl.java  |  7 ++
 .../management/impl/JMSTopicControlImpl.java    |  2 +-
 .../amqp/broker/AMQPConnectionCallback.java     |  2 +-
 .../amqp/broker/AMQPSessionCallback.java        | 17 ++--
 .../ActiveMQProtonRemotingConnection.java       |  7 +-
 .../amqp/proton/AMQPConnectionContext.java      |  7 +-
 .../protocol/amqp/proton/AmqpSupport.java       |  3 +
 .../amqp/proton/ProtonServerSenderContext.java  | 11 ++-
 .../amqp/proton/handler/ProtonHandler.java      | 17 +++-
 .../impl/ActiveMQServerControlImpl.java         | 10 ++-
 .../transport/amqp/client/AmqpSession.java      | 14 +++-
 .../tests/integration/amqp/ProtonTest.java      | 85 +++++++++++++++++++-
 .../management/ActiveMQServerControlTest.java   | 26 ++++++
 .../ActiveMQServerControlUsingCoreTest.java     |  5 ++
 14 files changed, 190 insertions(+), 23 deletions(-)
----------------------------------------------------------------------



[2/2] activemq-artemis git commit: ARTEMIS-762 Reflect management changes in AMQP protocol

Posted by cl...@apache.org.
ARTEMIS-762 Reflect management changes in AMQP protocol


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/95c4fdd4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/95c4fdd4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/95c4fdd4

Branch: refs/heads/master
Commit: 95c4fdd4086ae522f1d2edfc537c06f820d10ac1
Parents: 20729e7
Author: Martyn Taylor <mt...@redhat.com>
Authored: Fri Sep 30 16:36:00 2016 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 10:38:01 2016 -0400

----------------------------------------------------------------------
 .../core/management/ActiveMQServerControl.java  |  7 ++
 .../management/impl/JMSTopicControlImpl.java    |  2 +-
 .../amqp/broker/AMQPConnectionCallback.java     |  2 +-
 .../amqp/broker/AMQPSessionCallback.java        | 17 ++--
 .../ActiveMQProtonRemotingConnection.java       |  7 +-
 .../amqp/proton/AMQPConnectionContext.java      |  7 +-
 .../protocol/amqp/proton/AmqpSupport.java       |  3 +
 .../amqp/proton/ProtonServerSenderContext.java  | 11 ++-
 .../amqp/proton/handler/ProtonHandler.java      | 17 +++-
 .../impl/ActiveMQServerControlImpl.java         | 10 ++-
 .../transport/amqp/client/AmqpSession.java      | 14 +++-
 .../tests/integration/amqp/ProtonTest.java      | 85 +++++++++++++++++++-
 .../management/ActiveMQServerControlTest.java   | 26 ++++++
 .../ActiveMQServerControlUsingCoreTest.java     |  5 ++
 14 files changed, 190 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 02819e5..01a8d74 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -512,6 +512,13 @@ public interface ActiveMQServerControl {
    void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name) throws Exception;
 
    /**
+    * Destroys the queue corresponding to the specified name.
+    */
+   @Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION)
+   void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name,
+                     @Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers) throws Exception;
+
+   /**
     * Enables message counters for this server.
     */
    @Operation(desc = "Enable message counters", impact = MBeanOperationInfo.ACTION)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java
index d8c4179..cd8e4e0 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java
@@ -238,7 +238,7 @@ public class JMSTopicControlImpl extends StandardMBean implements TopicControl {
          throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
       }
       ActiveMQServerControl serverControl = (ActiveMQServerControl) managementService.getResource(ResourceNames.CORE_SERVER);
-      serverControl.destroyQueue(queueName);
+      serverControl.destroyQueue(queueName, true);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 31abf87..4ced546 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -117,7 +117,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
             server.removeClientConnection(remoteContainerId);
          }
          connection.close();
-         amqpConnection.close();
+         amqpConnection.close(null);
       } finally {
          for (Transaction tx : transactions.values()) {
             try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 46ed1c9..c7ca446 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternal
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
 import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
@@ -59,12 +60,14 @@ import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.message.ProtonJMessage;
+import org.jboss.logging.Logger;
 
 public class AMQPSessionCallback implements SessionCallback {
 
+   private static final Logger logger = Logger.getLogger(AMQPSessionCallback.class);
+
    protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
 
    private final AMQPConnectionCallback protonSPI;
@@ -467,9 +470,14 @@ public class AMQPSessionCallback implements SessionCallback {
 
    @Override
    public void disconnect(ServerConsumer consumer, String queueName) {
-      synchronized (connection.getLock()) {
-         ((Link) consumer.getProtocolContext()).close();
-         connection.flush();
+      ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
+      try {
+         synchronized (connection.getLock()) {
+            ((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
+            connection.flush();
+         }
+      } catch (ActiveMQAMQPException e) {
+         logger.error("Error closing link for " + consumer.getQueue().getAddress());
       }
    }
 
@@ -504,5 +512,4 @@ public class AMQPSessionCallback implements SessionCallback {
       protonSPI.removeTransaction(txid);
 
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
index 7f129a1..039da79 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
@@ -23,8 +23,10 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 
 /**
  * This is a Server's Connection representation used by ActiveMQ Artemis.
@@ -103,6 +105,9 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
 
    @Override
    public void disconnect(boolean criticalError) {
+      ErrorCondition errorCondition = new ErrorCondition();
+      errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED);
+      amqpConnection.close(errorCondition);
       getTransportConnection().close();
    }
 
@@ -111,7 +116,7 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
     */
    @Override
    public void disconnect(String scaleDownNodeID, boolean criticalError) {
-      getTransportConnection().close();
+      disconnect(criticalError);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 3d79026..70e4fd0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.VersionLoader;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Link;
@@ -132,8 +133,8 @@ public class AMQPConnectionContext extends ProtonInitializable {
       handler.flush();
    }
 
-   public void close() {
-      handler.close();
+   public void close(ErrorCondition errorCondition) {
+      handler.close(errorCondition);
    }
 
    protected AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException {
@@ -264,7 +265,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
             if (!connectionCallback.isSupportsAnonymous()) {
                connectionCallback.sendSASLSupported();
                connectionCallback.close();
-               handler.close();
+               handler.close(null);
             }
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index 13d7170..7bdbd2e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -53,6 +53,9 @@ public class AmqpSupport {
    public static final Symbol PRODUCT = Symbol.valueOf("product");
    public static final Symbol VERSION = Symbol.valueOf("version");
    public static final Symbol PLATFORM = Symbol.valueOf("platform");
+   public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted");
+   public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced");
+
 
    // Symbols used in configuration of newly opened links.
    public static final Symbol COPY = Symbol.getSymbol("copy");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 7ef4944..7d401fa 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -169,7 +169,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          // Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue
          String clientId = connection.getRemoteContainer();
          String pubId = sender.getName();
-         queue = clientId + ":" + pubId;
+         queue = createQueueName(clientId, pubId);
          boolean exists = sessionSPI.queueQuery(queue, false).isExists();
 
          /*
@@ -207,7 +207,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
                   String clientId = connection.getRemoteContainer();
                   String pubId = sender.getName();
-                  queue = clientId + ":" + pubId;
+                  queue = createQueueName(clientId, pubId);
                   QueueQueryResult result = sessionSPI.queueQuery(queue, false);
 
                   if (result.isExists()) {
@@ -307,7 +307,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                } else {
                   String clientId = connection.getRemoteContainer();
                   String pubId = sender.getName();
-                  String queue = clientId + ":" + pubId;
+                  String queue = createQueueName(clientId, pubId);
                   result = sessionSPI.queueQuery(queue, false);
                   if (result.isExists()) {
                      if (result.getConsumerCount() > 0) {
@@ -324,6 +324,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       }
    }
 
+
    @Override
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
       Object message = delivery.getContext();
@@ -478,4 +479,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       }
    }
 
+   private static String createQueueName(String clientId, String pubId) {
+      return clientId + "." + pubId;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 0d667b0..3c088d5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -248,6 +248,10 @@ public class ProtonHandler extends ProtonInitializable {
    }
 
    public void flush() {
+      flush(false);
+   }
+
+   private void flush(boolean wait) {
       synchronized (lock) {
          transport.process();
 
@@ -255,14 +259,21 @@ public class ProtonHandler extends ProtonInitializable {
 
       }
 
-      dispatchExecutor.execute(dispatchRunnable);
+      if (wait) {
+         dispatch();
+      } else {
+         dispatchExecutor.execute(dispatchRunnable);
+      }
    }
 
-   public void close() {
+   public void close(ErrorCondition errorCondition) {
       synchronized (lock) {
+         if (errorCondition != null) {
+            connection.setCondition(errorCondition);
+         }
          connection.close();
       }
-      flush();
+      flush(true);
    }
 
    protected void checkServerSASL() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 48f4374..b96fcbf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -697,20 +697,24 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
    }
 
    @Override
-   public void destroyQueue(final String name) throws Exception {
+   public void destroyQueue(final String name, final boolean removeConsumers) throws Exception {
       checkStarted();
 
       clearIO();
       try {
          SimpleString queueName = new SimpleString(name);
-
-         server.destroyQueue(queueName, null, true);
+         server.destroyQueue(queueName, null, !removeConsumers, removeConsumers);
       } finally {
          blockOnIO();
       }
    }
 
    @Override
+   public void destroyQueue(final String name) throws Exception {
+      destroyQueue(name, false);
+   }
+
+   @Override
    public int getConnectionCount() {
       checkStarted();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 65a69b7..936d4ef 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -207,10 +207,22 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
     * @throws Exception if an error occurs while creating the receiver.
     */
    public AmqpReceiver createReceiver(Source source) throws Exception {
+      return createReceiver(source, getNextReceiverId());
+   }
+
+   /**
+    * Create a receiver instance using the given Source
+    *
+    * @param source the caller created and configured Source used to create the receiver link.
+    * @param receiverId the receiver id to use.
+    * @return a newly created receiver that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createReceiver(Source source, String receiverId) throws Exception {
       checkClosed();
 
       final ClientFuture request = new ClientFuture();
-      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId());
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
 
       connection.getScheduler().execute(new Runnable() {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index e90a8b1..2a1e8c9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -58,7 +58,10 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.VersionLoader;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -71,6 +74,8 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -156,6 +161,7 @@ public class ProtonTest extends ProtonTestBase {
       server.createQueue(new SimpleString("amqp_testtopic" + "8"), new SimpleString("amqp_testtopic" + "8"), null, true, false);
       server.createQueue(new SimpleString("amqp_testtopic" + "9"), new SimpleString("amqp_testtopic" + "9"), null, true, false);
       server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false);
+
       connection = createConnection();
 
    }
@@ -186,9 +192,9 @@ public class ProtonTest extends ProtonTestBase {
          session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          myDurSub = session.createDurableSubscriber(topic, "myDurSub");
          myDurSub.close();
-         Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub")));
+         Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")));
          session.unsubscribe("myDurSub");
-         Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub")));
+         Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")));
          session.close();
          connection.close();
       } finally {
@@ -741,6 +747,81 @@ public class ProtonTest extends ProtonTestBase {
    }
 
    @Test
+   public void testLinkDetachSentWhenQueueDeleted() throws Exception {
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      final AmqpConnection amqpConnection = client.connect();
+      try {
+         AmqpSession session = amqpConnection.createSession();
+
+         AmqpReceiver receiver = session.createReceiver(coreAddress);
+         server.destroyQueue(new SimpleString(coreAddress), null, false, true);
+
+         Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+               return amqpConnection.isClosed();
+            }
+         });
+         assertTrue(receiver.isClosed());
+      } finally {
+         amqpConnection.close();
+      }
+   }
+
+   @Test
+   public void testCloseIsSentOnConnectionClose() throws Exception {
+      connection.close();
+
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      final AmqpConnection amqpConnection = client.connect();
+      try {
+         for (RemotingConnection connection : server.getRemotingService().getConnections()) {
+            server.getRemotingService().removeConnection(connection);
+            connection.disconnect(true);
+         }
+
+         Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+               return amqpConnection.isClosed();
+            }
+         });
+
+         assertTrue(amqpConnection.isClosed());
+         assertEquals(AmqpSupport.CONNECTION_FORCED, amqpConnection.getConnection().getRemoteCondition().getCondition());
+      } finally {
+         amqpConnection.close();
+      }
+   }
+
+
+   @Test
+   public void testClientIdIsSetInSubscriptionList() throws Exception {
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.createConnection();
+      amqpConnection.setContainerId("testClient");
+      amqpConnection.setOfferedCapabilities(Arrays.asList(Symbol.getSymbol("topic")));
+      amqpConnection.connect();
+      try {
+         AmqpSession session = amqpConnection.createSession();
+
+         Source source = new Source();
+         source.setDurable(TerminusDurability.UNSETTLED_STATE);
+         source.setCapabilities(Symbol.getSymbol("topic"));
+         source.setAddress("jms.topic.mytopic");
+         AmqpReceiver receiver = session.createReceiver(source, "testSub");
+
+         SimpleString fo = new SimpleString("testClient.testSub:jms.topic.mytopic");
+         assertNotNull(server.locateQueue(fo));
+
+      } catch (Exception e) {
+         e.printStackTrace();
+      } finally {
+         amqpConnection.close();
+      }
+   }
+
+   @Test
    public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception {
 
       String queueName = "TestQueueName";

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index df3cf57..86d19db 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -250,6 +250,32 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
    }
 
    @Test
+   public void testCreateAndDestroyQueueClosingConsumers() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString name = RandomUtil.randomSimpleString();
+      boolean durable = true;
+
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
+
+      serverControl.createQueue(address.toString(), name.toString(), durable);
+
+      ServerLocator receiveLocator = createInVMNonHALocator();
+      ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
+      ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
+      ClientConsumer consumer = receiveClientSession.createConsumer(name);
+
+      Assert.assertFalse(consumer.isClosed());
+
+      checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
+      serverControl.destroyQueue(name.toString(), true);
+      Assert.assertTrue(consumer.isClosed());
+
+      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
+   }
+
+   @Test
    public void testCreateAndDestroyQueueWithNullFilter() throws Exception {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString name = RandomUtil.randomSimpleString();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 69949b6..777ddd2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -128,6 +128,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
          }
 
          @Override
+         public void destroyQueue(final String name, final boolean removeConsumers) throws Exception {
+            proxy.invokeOperation("destroyQueue", name, removeConsumers);
+         }
+
+         @Override
          public void disableMessageCounters() throws Exception {
             proxy.invokeOperation("disableMessageCounters");
          }