You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/05/24 16:06:41 UTC

[1/2] activemq-artemis git commit: ARTEMIS-535 - Improve amqp protocol to support topics

Repository: activemq-artemis
Updated Branches:
  refs/heads/master cd088888b -> ecd9c1362


ARTEMIS-535 - Improve amqp protocol to support topics

https://issues.apache.org/jira/browse/ARTEMIS-535


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/73f908b8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/73f908b8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/73f908b8

Branch: refs/heads/master
Commit: 73f908b8b495e0f66c310cc30b6e6744bb288c4e
Parents: cd08888
Author: Andy Taylor <an...@gmail.com>
Authored: Tue May 24 11:59:04 2016 +0100
Committer: Andy Taylor <an...@gmail.com>
Committed: Tue May 24 13:26:24 2016 +0100

----------------------------------------------------------------------
 .../protocol/proton/ProtonProtocolManager.java  |  16 +
 .../plug/ProtonSessionIntegrationCallback.java  |  20 ++
 .../plug/AMQPClientConnectionContext.java       |   2 +
 .../proton/plug/AMQPClientSessionContext.java   |   2 +
 .../org/proton/plug/AMQPSessionCallback.java    |   7 +
 .../plug/context/AbstractConnectionContext.java |  22 +-
 .../context/AbstractProtonContextSender.java    |   4 +-
 .../context/AbstractProtonReceiverContext.java  |   4 +-
 .../context/AbstractProtonSessionContext.java   |   6 +-
 .../plug/context/ProtonDeliveryHandler.java     |   6 +-
 .../plug/context/ProtonTransactionHandler.java  |   2 +-
 .../client/ProtonClientConnectionContext.java   |   5 +
 .../client/ProtonClientSessionContext.java      |   7 +-
 .../server/ProtonServerSenderContext.java       | 133 ++++++-
 .../test/minimalserver/MinimalSessionSPI.java   |  20 ++
 .../en/protocols-interoperability.md            |  15 +
 tests/integration-tests/pom.xml                 |  12 +
 .../integration/proton/ProtonPubSubTest.java    | 351 +++++++++++++++++++
 18 files changed, 606 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
index 40eb175..1b1699f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnectio
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationListener;
+import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -54,6 +55,12 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
 
    private final ProtonProtocolManagerFactory factory;
 
+   /*
+   * used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for
+   * the address. This can be changed on the acceptor.
+   * */
+   private String pubSubPrefix = ActiveMQTopic.JMS_TOPIC_ADDRESS_PREFIX;
+
    public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
       this.factory = factory;
       this.server = server;
@@ -139,4 +146,13 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
    public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
    }
 
+   public String getPubSubPrefix() {
+      return pubSubPrefix;
+   }
+
+   public void setPubSubPrefix(String pubSubPrefix) {
+      this.pubSubPrefix = pubSubPrefix;
+   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 2350f9d..5b73acf 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -185,6 +185,16 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
    }
 
    @Override
+   public void createTemporaryQueue(String address, String queueName) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true);
+   }
+
+   @Override
+   public void createDurableQueue(String address, String queueName) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true);
+   }
+
+   @Override
    public boolean queueQuery(String queueName) throws Exception {
       boolean queryResult = false;
 
@@ -360,6 +370,16 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
       }
    }
 
+   @Override
+   public String getPubSubPrefix() {
+      return manager.getPubSubPrefix();
+   }
+
+   @Override
+   public void deleteQueue(String address) throws Exception {
+      manager.getServer().destroyQueue(new SimpleString(address));
+   }
+
    private void resetContext() {
       manager.getServer().getStorageManager().setContext(null);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java
index 786d0d7..1abd96f 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java
@@ -31,4 +31,6 @@ public interface AMQPClientConnectionContext extends AMQPConnectionContext {
    void clientOpen(ClientSASL sasl) throws Exception;
 
    AMQPClientSessionContext createClientSession() throws ActiveMQAMQPException;
+
+   void setContainer(String containerID);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
index 6cd0aa7..b518474 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
@@ -23,4 +23,6 @@ public interface AMQPClientSessionContext extends AMQPSessionContext {
    AMQPClientSenderContext createSender(String address, boolean preSettled) throws ActiveMQAMQPException;
 
    AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException;
+
+   AMQPClientReceiverContext createReceiver(String name, String address) throws ActiveMQAMQPException;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
index 0c0dbe0..630761f 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
@@ -40,6 +40,12 @@ public interface AMQPSessionCallback {
 
    void createTemporaryQueue(String queueName) throws Exception;
 
+   void createTemporaryQueue(String address, String queueName) throws Exception;
+
+   void createDurableQueue(String address, String queueName) throws Exception;
+
+   void deleteQueue(String address) throws Exception;
+
    boolean queueQuery(String queueName) throws Exception;
 
    void closeSender(Object brokerConsumer) throws Exception;
@@ -82,4 +88,5 @@ public interface AMQPSessionCallback {
                    int messageFormat,
                    ByteBuf messageEncoded) throws Exception;
 
+   String getPubSubPrefix();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
index 262dc2a..34e1873 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
@@ -32,6 +32,7 @@ import org.apache.qpid.proton.engine.Transport;
 import org.proton.plug.AMQPConnectionCallback;
 import org.proton.plug.AMQPConnectionContext;
 import org.proton.plug.SASLResult;
+import org.proton.plug.context.server.ProtonServerSenderContext;
 import org.proton.plug.exceptions.ActiveMQAMQPException;
 import org.proton.plug.handler.ProtonHandler;
 import org.proton.plug.handler.impl.DefaultEventHandler;
@@ -163,6 +164,14 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
       }
    }
 
+   public String getRemoteContainer() {
+      return handler.getConnection().getRemoteContainer();
+   }
+
+   public String getPubSubPrefix() {
+      return null;
+   }
+
    // This listener will perform a bunch of things here
    class LocalListener extends DefaultEventHandler {
 
@@ -265,7 +274,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
          link.close();
          ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
          if (linkContext != null) {
-            linkContext.close();
+            linkContext.close(true);
          }
       }
 
@@ -275,6 +284,15 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
       }
 
       @Override
+      public void onDetach(Link link) throws Exception {
+         Object context = link.getContext();
+         if (context instanceof ProtonServerSenderContext) {
+            ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
+            senderContext.close(false);
+         }
+      }
+
+      @Override
       public void onDelivery(Delivery delivery) throws Exception {
          ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext();
          if (handler != null) {
@@ -289,4 +307,6 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
 
    }
 
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
index 6b209b8..29e3459 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
@@ -67,7 +67,7 @@ public abstract class AbstractProtonContextSender extends ProtonInitializable im
    * close the session
    * */
    @Override
-   public void close() throws ActiveMQAMQPException {
+   public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
       closed = true;
       protonSession.removeSender(sender);
       synchronized (connection.getLock()) {
@@ -84,7 +84,7 @@ public abstract class AbstractProtonContextSender extends ProtonInitializable im
    public void close(ErrorCondition condition) throws ActiveMQAMQPException {
       closed = true;
       sender.setCondition(condition);
-      close();
+      close(false);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
index 4286140..ffc08d3 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
@@ -53,14 +53,14 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
    }
 
    @Override
-   public void close() throws ActiveMQAMQPException {
+   public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
       protonSession.removeReceiver(receiver);
    }
 
    @Override
    public void close(ErrorCondition condition) throws ActiveMQAMQPException {
       receiver.setCondition(condition);
-      close();
+      close(false);
    }
 
    public void flow(int credits) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
index abb3115..5b22944 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
@@ -85,7 +85,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i
       AbstractProtonContextSender protonConsumer = senders.remove(consumer);
       if (protonConsumer != null) {
          try {
-            protonConsumer.close();
+            protonConsumer.close(false);
          }
          catch (ActiveMQAMQPException e) {
             protonConsumer.getSender().setTarget(null);
@@ -116,7 +116,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i
 
       for (AbstractProtonReceiverContext protonProducer : receiversCopy) {
          try {
-            protonProducer.close();
+            protonProducer.close(false);
          }
          catch (Exception e) {
             e.printStackTrace();
@@ -130,7 +130,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i
 
       for (AbstractProtonContextSender protonConsumer : protonSendersClone) {
          try {
-            protonConsumer.close();
+            protonConsumer.close(false);
          }
          catch (Exception e) {
             e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
index ad7ff4f..d861394 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
@@ -29,7 +29,11 @@ public interface ProtonDeliveryHandler {
 
    void onMessage(Delivery delivery) throws ActiveMQAMQPException;
 
-   void close() throws ActiveMQAMQPException;
+   /*
+   * we have to distinguish between a remote close on the link and a close via a connection or session as the latter mean
+   * that a link reattach can happen and we need to keep the underlying resource (queue/subscription) around for pub subs
+   * */
+   void close(boolean remoteLinkClose) throws ActiveMQAMQPException;
 
    void close(ErrorCondition condition) throws ActiveMQAMQPException;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
index 1b32b32..e768bb4 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
@@ -116,7 +116,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
    }
 
    @Override
-   public void close() throws ActiveMQAMQPException {
+   public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
       //noop
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
index 76a7da9..f4a43c1 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
@@ -83,6 +83,11 @@ public class ProtonClientConnectionContext extends AbstractConnectionContext imp
    }
 
    @Override
+   public void setContainer(String containerID) {
+      handler.getConnection().setContainer(containerID);
+   }
+
+   @Override
    protected AbstractProtonSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException {
       AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this);
       AbstractProtonSessionContext protonSession = new ProtonClientSessionContext(sessionSPI, this, realSession);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
index 3b07a40..b3e96bb 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
@@ -64,12 +64,17 @@ public class ProtonClientSessionContext extends AbstractProtonSessionContext imp
 
    @Override
    public AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException {
+      return createReceiver(address, address);
+   }
+
+   @Override
+   public AMQPClientReceiverContext createReceiver(String name, String address) throws ActiveMQAMQPException {
       FutureRunnable futureRunnable = new FutureRunnable(1);
 
       ProtonClientReceiverContext amqpReceiver;
 
       synchronized (connection.getLock()) {
-         Receiver receiver = session.receiver(address);
+         Receiver receiver = session.receiver(name);
          Source source = new Source();
          source.setAddress(address);
          receiver.setSource(source);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
index ae1caa4..13b50e5 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
@@ -26,6 +26,8 @@ import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@@ -50,6 +52,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
 
    private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector");
    private static final Symbol COPY = Symbol.valueOf("copy");
+   private static final Symbol TOPIC = Symbol.valueOf("topic");
 
    private Object brokerConsumer;
 
@@ -81,7 +84,10 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
       //todo add flow control
       try {
          // to do whatever you need to make the broker start sending messages to the consumer
-         sessionSPI.startSender(brokerConsumer);
+         //this could be null if a link reattach has happened
+         if (brokerConsumer != null) {
+            sessionSPI.startSender(brokerConsumer);
+         }
          //protonSession.getServerSession().receiveConsumerCredits(consumerID, -1);
       }
       catch (Exception e) {
@@ -105,26 +111,58 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
       /*
       * even tho the filter is a map it will only return a single filter unless a nolocal is also provided
       * */
-      Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
-      if (filter != null) {
-         selector = filter.getValue().getDescribed().toString();
-         // Validate the Selector.
-         try {
-            SelectorParser.parse(selector);
-         }
-         catch (FilterException e) {
-            close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
-            return;
+      if (source != null) {
+         Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
+         if (filter != null) {
+            selector = filter.getValue().getDescribed().toString();
+            // Validate the Selector.
+            try {
+               SelectorParser.parse(selector);
+            }
+            catch (FilterException e) {
+               close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
+               return;
+            }
          }
       }
 
+      /*
+      * if we have a capability for a topic (qpid-jms) or we are configured on this address to act like a topic then act
+      * like a subscription.
+      * */
+      boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
+
       //filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
 
       //if (filter != null) {
          //todo implement nolocal filter
       //}
-
-      if (source != null) {
+      if (source == null) {
+         // Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue
+         String clientId = connection.getRemoteContainer();
+         String pubId = sender.getName();
+         queue = clientId + ":" + pubId;
+         boolean exists = sessionSPI.queueQuery(queue);
+
+         /*
+         * If it exists then we know it is a subscription so we set the capabilities on the source so we can delete on a
+         * link remote close.
+         * */
+         if (exists) {
+            source = new org.apache.qpid.proton.amqp.messaging.Source();
+            source.setAddress(queue);
+            source.setDurable(TerminusDurability.UNSETTLED_STATE);
+            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+            source.setDistributionMode(COPY);
+            source.setCapabilities(TOPIC);
+            sender.setSource(source);
+         }
+         else {
+            sender.setCondition(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + sender.getName()));
+            sender.close();
+         }
+      }
+      else {
          if (source.getDynamic()) {
             //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
             // will be deleted on closing of the session
@@ -141,7 +179,36 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
          else {
             //if not dynamic then we use the targets address as the address to forward the messages to, however there has to
             //be a queue bound to it so we nee to check this.
-            queue = source.getAddress();
+
+
+            if (isPubSub) {
+               // if we are a subscription and durable create a durable queue using the container id and link name
+               if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
+                                TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
+                  String clientId = connection.getRemoteContainer();
+                  String pubId = sender.getName();
+                  queue = clientId + ":" + pubId;
+                  boolean exists = sessionSPI.queueQuery(queue);
+                  if (!exists) {
+                     sessionSPI.createDurableQueue(source.getAddress(), queue);
+                  }
+               }
+               //otherwise we are a volatile subscription
+               else {
+                  queue = java.util.UUID.randomUUID().toString();
+                  try {
+                     sessionSPI.createTemporaryQueue(source.getAddress(), queue);
+                  }
+                  catch (Exception e) {
+                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
+                  }
+                  source.setAddress(queue);
+               }
+
+            }
+            else {
+               queue = source.getAddress();
+            }
             if (queue == null) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
             }
@@ -156,7 +223,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
             }
          }
 
-         boolean browseOnly = source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
+         boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
          try {
             brokerConsumer = sessionSPI.createSender(this, queue, selector, browseOnly);
          }
@@ -166,6 +233,12 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
       }
    }
 
+   private boolean isPubSub(Source source) {
+      String pubSubPrefix = sessionSPI.getPubSubPrefix();
+      return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix);
+   }
+
+
    /*
    * close the session
    * */
@@ -185,10 +258,23 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
    * close the session
    * */
    @Override
-   public void close() throws ActiveMQAMQPException {
-      super.close();
+   public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
+      super.close(remoteLinkClose);
+
       try {
          sessionSPI.closeSender(brokerConsumer);
+         //if this is a link close rather than a connection close or detach, we need to delete any durable resources for
+         // say pub subs
+         if (remoteLinkClose ) {
+            Source source = (Source)sender.getSource();
+            if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) {
+               String address = source.getAddress();
+               boolean exists = sessionSPI.queueQuery(address);
+               if (exists) {
+                  sessionSPI.deleteQueue(address);
+               }
+            }
+         }
       }
       catch (Exception e) {
          e.printStackTrace();
@@ -277,4 +363,17 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
       return performSend(serverMessage, message);
    }
 
+   private static boolean hasCapabilities(Symbol symbol, Source source) {
+      if (source != null) {
+         if (source.getCapabilities() != null) {
+            for (Symbol cap : source.getCapabilities()) {
+               if (symbol.equals(cap)) {
+                  return true;
+               }
+            }
+         }
+      }
+      return false;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
index 3578926..1e5839c 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
@@ -71,6 +71,26 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
    }
 
    @Override
+   public void createDurableQueue(String address, String queueName) throws Exception {
+
+   }
+
+   @Override
+   public void createTemporaryQueue(String address, String queueName) throws Exception {
+
+   }
+
+   @Override
+   public void deleteQueue(String address) throws Exception {
+
+   }
+
+   @Override
+   public String getPubSubPrefix() {
+      return null;
+   }
+
+   @Override
    public void onFlowConsumer(Object consumer, int credits, boolean drain) {
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/docs/user-manual/en/protocols-interoperability.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/protocols-interoperability.md b/docs/user-manual/en/protocols-interoperability.md
index 5baa7cb..65bacef 100644
--- a/docs/user-manual/en/protocols-interoperability.md
+++ b/docs/user-manual/en/protocols-interoperability.md
@@ -86,6 +86,21 @@ does not exist then an exception will be sent
 > For the next version we will add a flag to aut create durable queue
 > but for now you will have to add them via the configuration
 
+### AMQP and Topics
+ 
+Although amqp has no notion of topics it is still possible to treat amqp consumers or receivers as subscriptions rather 
+than just consumers on a queue. By default any receiving link that attaches to an address with the prefix `jms.topic.` 
+will be treated as a subscription and a subscription queue will be created. If the Terminus Durability is either UNSETTLED_STATE
+or CONFIGURATION then the queue will be made durable, similar to a JMS durable subscription and given a name made up from 
+the container id and the link name, something like `my-container-id:my-link-name`. if the Terminus Durability is configured 
+as NONE then a volatile queue will be created.
+
+The prefix can be changed by configuring the Acceptor and setting the `pubSubPrefix` like so
+  
+> <acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP;pubSubPrefix=foo.bar.</acceptor>
+
+Artemis also supports the qpid-jms client and will respect its use of topics regardless of the prefix used for the address. 
+
 ### AMQP and Coordinations - Handling Transactions
 
 An AMQP links target can also be a Coordinator, the Coordinator is used

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index e3106fc..90f5425 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -142,6 +142,18 @@
       </dependency>
       <dependency>
          <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-proton-plug</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-proton-plug</artifactId>
+         <version>${project.version}</version>
+         <scope>test</scope>
+         <type>test-jar</type>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
          <artifactId>artemis-hornetq-protocol</artifactId>
          <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java
new file mode 100644
index 0000000..bf4e38c
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.proton;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.proton.plug.AMQPClientConnectionContext;
+import org.proton.plug.AMQPClientReceiverContext;
+import org.proton.plug.AMQPClientSessionContext;
+import org.proton.plug.test.Constants;
+import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+
+public class ProtonPubSubTest extends ActiveMQTestBase {
+   private final String prefix = "foo.bar.";
+   private final String pubAddress = "pubAddress";
+   private final String prefixedPubAddress = prefix + "pubAddress";
+   private final SimpleString ssPubAddress = new SimpleString(pubAddress);
+   private final SimpleString ssprefixedPubAddress = new SimpleString(prefixedPubAddress);
+   private ActiveMQServer server;
+   private Connection connection;
+   private JmsConnectionFactory factory;
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      disableCheckThread();
+      server = this.createServer(true, true);
+      HashMap<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.PORT_PROP_NAME, "5672");
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
+      HashMap<String, Object> extraParams = new HashMap<>();
+      extraParams.put("pubSubPrefix", prefix);
+      TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "foo", extraParams);
+
+      server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
+      server.start();
+      server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true);
+      server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true);
+      factory = new JmsConnectionFactory("amqp://localhost:5672");
+      factory.setClientID("myClientID");
+      connection = factory.createConnection();
+      connection.setExceptionListener(new ExceptionListener() {
+         @Override
+         public void onException(JMSException exception) {
+            exception.printStackTrace();
+         }
+      });
+
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         Thread.sleep(250);
+         if (connection != null) {
+            connection.close();
+         }
+
+         server.stop();
+      }
+      finally {
+         super.tearDown();
+      }
+   }
+
+   @Test
+   public void testNonDurablePubSub() throws Exception {
+      int numMessages = 100;
+      Topic topic = createTopic(pubAddress);
+      TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer sub = session.createSubscriber(topic);
+
+      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = sendSession.createProducer(topic);
+      connection.start();
+      for (int i = 0; i < numMessages; i++) {
+         producer.send(sendSession.createTextMessage("message:" + i));
+      }
+      for (int i = 0; i < numMessages; i++) {
+         TextMessage receive = (TextMessage) sub.receive(5000);
+         Assert.assertNotNull(receive);
+         Assert.assertEquals(receive.getText(), "message:" + i);
+      }
+   }
+
+   @Test
+   public void testNonDurableMultiplePubSub() throws Exception {
+      int numMessages = 100;
+      Topic topic = createTopic(pubAddress);
+      TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer sub = session.createSubscriber(topic);
+      MessageConsumer sub2 = session.createSubscriber(topic);
+      MessageConsumer sub3 = session.createSubscriber(topic);
+
+      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = sendSession.createProducer(topic);
+      connection.start();
+      for (int i = 0; i < numMessages; i++) {
+         producer.send(sendSession.createTextMessage("message:" + i));
+      }
+      for (int i = 0; i < numMessages; i++) {
+         TextMessage receive = (TextMessage) sub.receive(5000);
+         Assert.assertNotNull(receive);
+         Assert.assertEquals(receive.getText(), "message:" + i);
+         receive = (TextMessage) sub2.receive(5000);
+         Assert.assertNotNull(receive);
+         Assert.assertEquals(receive.getText(), "message:" + i);
+         receive = (TextMessage) sub3.receive(5000);
+         Assert.assertNotNull(receive);
+         Assert.assertEquals(receive.getText(), "message:" + i);
+      }
+   }
+
+
+   @Test
+   public void testDurablePubSub() throws Exception {
+      int numMessages = 100;
+      Topic topic = createTopic(pubAddress);
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+
+      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = sendSession.createProducer(topic);
+      connection.start();
+      for (int i = 0; i < numMessages; i++) {
+         producer.send(sendSession.createTextMessage("message:" + i));
+      }
+      for (int i = 0; i < numMessages; i++) {
+         TextMessage receive = (TextMessage) sub.receive(5000);
+         Assert.assertNotNull(receive);
+         Assert.assertEquals(receive.getText(), "message:" + i);
+      }
+   }
+
+   @Test
+   public void testDurableMultiplePubSub() throws Exception {
+      int numMessages = 100;
+      Topic topic = createTopic(pubAddress);
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+      TopicSubscriber sub2 = session.createDurableSubscriber(topic, "myPubId2");
+      TopicSubscriber sub3 = session.createDurableSubscriber(topic, "myPubId3");
+
+      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = sendSession.createProducer(topic);
+      connection.start();
+      for (int i = 0; i < numMessages; i++) {
+         producer.send(sendSession.createTextMessage("message:" + i));
+      }
+      for (int i = 0; i < numMessages; i++) {
+         TextMessage receive = (TextMessage) sub.receive(5000);
+         Assert.assertNotNull(receive);
+         Assert.assertEquals(receive.getText(), "message:" + i);
+         receive = (TextMessage) sub2.receive(5000);
+         Assert.assertNotNull(receive);
+         Assert.assertEquals(receive.getText(), "message:" + i);
+         receive = (TextMessage) sub3.receive(5000);
+         Assert.assertNotNull(receive);
+         Assert.assertEquals(receive.getText(), "message:" + i);
+      }
+   }
+
+   @Test
+   public void testDurablePubSubReconnect() throws Exception {
+      int numMessages = 100;
+      Topic topic = createTopic(pubAddress);
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+
+      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = sendSession.createProducer(topic);
+      connection.start();
+      for (int i = 0; i < numMessages; i++) {
+         producer.send(sendSession.createTextMessage("message:" + i));
+      }
+      for (int i = 0; i < numMessages; i++) {
+         TextMessage receive = (TextMessage) sub.receive(5000);
+         Assert.assertNotNull(receive);
+         Assert.assertEquals(receive.getText(), "message:" + i);
+      }
+      connection.close();
+      connection = factory.createConnection();
+      connection.setExceptionListener(new ExceptionListener() {
+         @Override
+         public void onException(JMSException exception) {
+            exception.printStackTrace();
+         }
+      });
+      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      sub = session.createDurableSubscriber(topic, "myPubId");
+
+      sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      producer = sendSession.createProducer(topic);
+      connection.start();
+      for (int i = 0; i < numMessages; i++) {
+         producer.send(sendSession.createTextMessage("message:" + i));
+      }
+      for (int i = 0; i < numMessages; i++) {
+         TextMessage receive = (TextMessage) sub.receive(5000);
+         Assert.assertNotNull(receive);
+         Assert.assertEquals(receive.getText(), "message:" + i);
+      }
+   }
+
+   @Test
+   public void testDurablePubSubUnsubscribe() throws Exception {
+      int numMessages = 100;
+      Topic topic = createTopic(pubAddress);
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+
+      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = sendSession.createProducer(topic);
+      connection.start();
+      for (int i = 0; i < numMessages; i++) {
+         producer.send(sendSession.createTextMessage("message:" + i));
+      }
+      for (int i = 0; i < numMessages; i++) {
+         TextMessage receive = (TextMessage) sub.receive(5000);
+         Assert.assertNotNull(receive);
+         Assert.assertEquals(receive.getText(), "message:" + i);
+      }
+      sub.close();
+      session.unsubscribe("myPubId");
+   }
+
+
+   @Test
+   public void testPubSubWithSimpleClient() throws Exception {
+      SimpleAMQPConnector connector = new SimpleAMQPConnector();
+      connector.start();
+      AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT);
+
+      clientConnection.setContainer("myContainerID");
+
+      clientConnection.clientOpen(null);
+
+      AMQPClientSessionContext clientSession = clientConnection.createClientSession();
+      AMQPClientReceiverContext receiver = clientSession.createReceiver(prefixedPubAddress);
+      int numMessages = 100;
+      Topic topic = createTopic(prefixedPubAddress);
+      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = sendSession.createProducer(topic);
+      connection.start();
+      for (int i = 0; i < numMessages; i++) {
+         producer.send(sendSession.createTextMessage("message:" + i));
+      }
+
+      receiver.flow(100);
+      for (int i = 0; i < numMessages; i++) {
+         ProtonJMessage protonJMessage = receiver.receiveMessage(5000, TimeUnit.MILLISECONDS);
+         assertNotNull(protonJMessage);
+         assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i);
+      }
+
+   }
+
+
+   @Test
+   public void testMultiplePubSubWithSimpleClient() throws Exception {
+      SimpleAMQPConnector connector = new SimpleAMQPConnector();
+      connector.start();
+      AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT);
+
+      clientConnection.setContainer("myContainerID");
+
+      clientConnection.clientOpen(null);
+
+      AMQPClientSessionContext clientSession = clientConnection.createClientSession();
+      AMQPClientReceiverContext receiver = clientSession.createReceiver("sub1", prefixedPubAddress);
+      AMQPClientReceiverContext receiver2 = clientSession.createReceiver("sub2", prefixedPubAddress);
+      AMQPClientReceiverContext receiver3 = clientSession.createReceiver("sub3", prefixedPubAddress);
+      int numMessages = 100;
+      Topic topic = createTopic(prefixedPubAddress);
+      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = sendSession.createProducer(topic);
+      receiver.flow(100);
+      receiver2.flow(100);
+      receiver3.flow(100);
+      connection.start();
+      for (int i = 0; i < numMessages; i++) {
+         producer.send(sendSession.createTextMessage("message:" + i));
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         ProtonJMessage protonJMessage = receiver.receiveMessage(5000, TimeUnit.MILLISECONDS);
+         assertNotNull("did not get message " + i, protonJMessage);
+         assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i);
+         protonJMessage = receiver2.receiveMessage(5000, TimeUnit.MILLISECONDS);
+         assertNotNull("did not get message " + i, protonJMessage);
+         assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i);
+         protonJMessage = receiver3.receiveMessage(5000, TimeUnit.MILLISECONDS);
+         assertNotNull("did not get message " + i, protonJMessage);
+         assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i);
+      }
+
+   }
+
+
+   private javax.jms.Topic createTopic(String address) throws Exception {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      try {
+         return session.createTopic(address);
+      }
+      finally {
+         session.close();
+      }
+   }
+}


[2/2] activemq-artemis git commit: This closes #534

Posted by cl...@apache.org.
This closes #534


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ecd9c136
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ecd9c136
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ecd9c136

Branch: refs/heads/master
Commit: ecd9c13625de96d091bb0f56ae878682451db2d7
Parents: cd08888 73f908b
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue May 24 12:06:24 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue May 24 12:06:24 2016 -0400

----------------------------------------------------------------------
 .../protocol/proton/ProtonProtocolManager.java  |  16 +
 .../plug/ProtonSessionIntegrationCallback.java  |  20 ++
 .../plug/AMQPClientConnectionContext.java       |   2 +
 .../proton/plug/AMQPClientSessionContext.java   |   2 +
 .../org/proton/plug/AMQPSessionCallback.java    |   7 +
 .../plug/context/AbstractConnectionContext.java |  22 +-
 .../context/AbstractProtonContextSender.java    |   4 +-
 .../context/AbstractProtonReceiverContext.java  |   4 +-
 .../context/AbstractProtonSessionContext.java   |   6 +-
 .../plug/context/ProtonDeliveryHandler.java     |   6 +-
 .../plug/context/ProtonTransactionHandler.java  |   2 +-
 .../client/ProtonClientConnectionContext.java   |   5 +
 .../client/ProtonClientSessionContext.java      |   7 +-
 .../server/ProtonServerSenderContext.java       | 133 ++++++-
 .../test/minimalserver/MinimalSessionSPI.java   |  20 ++
 .../en/protocols-interoperability.md            |  15 +
 tests/integration-tests/pom.xml                 |  12 +
 .../integration/proton/ProtonPubSubTest.java    | 351 +++++++++++++++++++
 18 files changed, 606 insertions(+), 28 deletions(-)
----------------------------------------------------------------------