You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/05/24 16:06:41 UTC
[1/2] activemq-artemis git commit: ARTEMIS-535 - Improve amqp
protocol to support topics
Repository: activemq-artemis
Updated Branches:
refs/heads/master cd088888b -> ecd9c1362
ARTEMIS-535 - Improve amqp protocol to support topics
https://issues.apache.org/jira/browse/ARTEMIS-535
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/73f908b8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/73f908b8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/73f908b8
Branch: refs/heads/master
Commit: 73f908b8b495e0f66c310cc30b6e6744bb288c4e
Parents: cd08888
Author: Andy Taylor <an...@gmail.com>
Authored: Tue May 24 11:59:04 2016 +0100
Committer: Andy Taylor <an...@gmail.com>
Committed: Tue May 24 13:26:24 2016 +0100
----------------------------------------------------------------------
.../protocol/proton/ProtonProtocolManager.java | 16 +
.../plug/ProtonSessionIntegrationCallback.java | 20 ++
.../plug/AMQPClientConnectionContext.java | 2 +
.../proton/plug/AMQPClientSessionContext.java | 2 +
.../org/proton/plug/AMQPSessionCallback.java | 7 +
.../plug/context/AbstractConnectionContext.java | 22 +-
.../context/AbstractProtonContextSender.java | 4 +-
.../context/AbstractProtonReceiverContext.java | 4 +-
.../context/AbstractProtonSessionContext.java | 6 +-
.../plug/context/ProtonDeliveryHandler.java | 6 +-
.../plug/context/ProtonTransactionHandler.java | 2 +-
.../client/ProtonClientConnectionContext.java | 5 +
.../client/ProtonClientSessionContext.java | 7 +-
.../server/ProtonServerSenderContext.java | 133 ++++++-
.../test/minimalserver/MinimalSessionSPI.java | 20 ++
.../en/protocols-interoperability.md | 15 +
tests/integration-tests/pom.xml | 12 +
.../integration/proton/ProtonPubSubTest.java | 351 +++++++++++++++++++
18 files changed, 606 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
index 40eb175..1b1699f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnectio
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
+import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -54,6 +55,12 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
private final ProtonProtocolManagerFactory factory;
+ /*
+ * used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for
+ * the address. This can be changed on the acceptor.
+ * */
+ private String pubSubPrefix = ActiveMQTopic.JMS_TOPIC_ADDRESS_PREFIX;
+
public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
this.factory = factory;
this.server = server;
@@ -139,4 +146,13 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
}
+ public String getPubSubPrefix() {
+ return pubSubPrefix;
+ }
+
+ public void setPubSubPrefix(String pubSubPrefix) {
+ this.pubSubPrefix = pubSubPrefix;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 2350f9d..5b73acf 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -185,6 +185,16 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
}
@Override
+ public void createTemporaryQueue(String address, String queueName) throws Exception {
+ serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true);
+ }
+
+ @Override
+ public void createDurableQueue(String address, String queueName) throws Exception {
+ serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true);
+ }
+
+ @Override
public boolean queueQuery(String queueName) throws Exception {
boolean queryResult = false;
@@ -360,6 +370,16 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
}
}
+ @Override
+ public String getPubSubPrefix() {
+ return manager.getPubSubPrefix();
+ }
+
+ @Override
+ public void deleteQueue(String address) throws Exception {
+ manager.getServer().destroyQueue(new SimpleString(address));
+ }
+
private void resetContext() {
manager.getServer().getStorageManager().setContext(null);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java
index 786d0d7..1abd96f 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java
@@ -31,4 +31,6 @@ public interface AMQPClientConnectionContext extends AMQPConnectionContext {
void clientOpen(ClientSASL sasl) throws Exception;
AMQPClientSessionContext createClientSession() throws ActiveMQAMQPException;
+
+ void setContainer(String containerID);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
index 6cd0aa7..b518474 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
@@ -23,4 +23,6 @@ public interface AMQPClientSessionContext extends AMQPSessionContext {
AMQPClientSenderContext createSender(String address, boolean preSettled) throws ActiveMQAMQPException;
AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException;
+
+ AMQPClientReceiverContext createReceiver(String name, String address) throws ActiveMQAMQPException;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
index 0c0dbe0..630761f 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
@@ -40,6 +40,12 @@ public interface AMQPSessionCallback {
void createTemporaryQueue(String queueName) throws Exception;
+ void createTemporaryQueue(String address, String queueName) throws Exception;
+
+ void createDurableQueue(String address, String queueName) throws Exception;
+
+ void deleteQueue(String address) throws Exception;
+
boolean queueQuery(String queueName) throws Exception;
void closeSender(Object brokerConsumer) throws Exception;
@@ -82,4 +88,5 @@ public interface AMQPSessionCallback {
int messageFormat,
ByteBuf messageEncoded) throws Exception;
+ String getPubSubPrefix();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
index 262dc2a..34e1873 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
@@ -32,6 +32,7 @@ import org.apache.qpid.proton.engine.Transport;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.SASLResult;
+import org.proton.plug.context.server.ProtonServerSenderContext;
import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.handler.ProtonHandler;
import org.proton.plug.handler.impl.DefaultEventHandler;
@@ -163,6 +164,14 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
}
}
+ public String getRemoteContainer() {
+ return handler.getConnection().getRemoteContainer();
+ }
+
+ public String getPubSubPrefix() {
+ return null;
+ }
+
// This listener will perform a bunch of things here
class LocalListener extends DefaultEventHandler {
@@ -265,7 +274,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
link.close();
ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
if (linkContext != null) {
- linkContext.close();
+ linkContext.close(true);
}
}
@@ -275,6 +284,15 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
}
@Override
+ public void onDetach(Link link) throws Exception {
+ Object context = link.getContext();
+ if (context instanceof ProtonServerSenderContext) {
+ ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
+ senderContext.close(false);
+ }
+ }
+
+ @Override
public void onDelivery(Delivery delivery) throws Exception {
ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext();
if (handler != null) {
@@ -289,4 +307,6 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
}
+
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
index 6b209b8..29e3459 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
@@ -67,7 +67,7 @@ public abstract class AbstractProtonContextSender extends ProtonInitializable im
* close the session
* */
@Override
- public void close() throws ActiveMQAMQPException {
+ public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
closed = true;
protonSession.removeSender(sender);
synchronized (connection.getLock()) {
@@ -84,7 +84,7 @@ public abstract class AbstractProtonContextSender extends ProtonInitializable im
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
closed = true;
sender.setCondition(condition);
- close();
+ close(false);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
index 4286140..ffc08d3 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
@@ -53,14 +53,14 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
}
@Override
- public void close() throws ActiveMQAMQPException {
+ public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
protonSession.removeReceiver(receiver);
}
@Override
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
receiver.setCondition(condition);
- close();
+ close(false);
}
public void flow(int credits) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
index abb3115..5b22944 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
@@ -85,7 +85,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i
AbstractProtonContextSender protonConsumer = senders.remove(consumer);
if (protonConsumer != null) {
try {
- protonConsumer.close();
+ protonConsumer.close(false);
}
catch (ActiveMQAMQPException e) {
protonConsumer.getSender().setTarget(null);
@@ -116,7 +116,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i
for (AbstractProtonReceiverContext protonProducer : receiversCopy) {
try {
- protonProducer.close();
+ protonProducer.close(false);
}
catch (Exception e) {
e.printStackTrace();
@@ -130,7 +130,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i
for (AbstractProtonContextSender protonConsumer : protonSendersClone) {
try {
- protonConsumer.close();
+ protonConsumer.close(false);
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
index ad7ff4f..d861394 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
@@ -29,7 +29,11 @@ public interface ProtonDeliveryHandler {
void onMessage(Delivery delivery) throws ActiveMQAMQPException;
- void close() throws ActiveMQAMQPException;
+ /*
+ * we have to distinguish between a remote close on the link and a close via a connection or session as the latter mean
+ * that a link reattach can happen and we need to keep the underlying resource (queue/subscription) around for pub subs
+ * */
+ void close(boolean remoteLinkClose) throws ActiveMQAMQPException;
void close(ErrorCondition condition) throws ActiveMQAMQPException;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
index 1b32b32..e768bb4 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
@@ -116,7 +116,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
}
@Override
- public void close() throws ActiveMQAMQPException {
+ public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
//noop
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
index 76a7da9..f4a43c1 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
@@ -83,6 +83,11 @@ public class ProtonClientConnectionContext extends AbstractConnectionContext imp
}
@Override
+ public void setContainer(String containerID) {
+ handler.getConnection().setContainer(containerID);
+ }
+
+ @Override
protected AbstractProtonSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException {
AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this);
AbstractProtonSessionContext protonSession = new ProtonClientSessionContext(sessionSPI, this, realSession);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
index 3b07a40..b3e96bb 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
@@ -64,12 +64,17 @@ public class ProtonClientSessionContext extends AbstractProtonSessionContext imp
@Override
public AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException {
+ return createReceiver(address, address);
+ }
+
+ @Override
+ public AMQPClientReceiverContext createReceiver(String name, String address) throws ActiveMQAMQPException {
FutureRunnable futureRunnable = new FutureRunnable(1);
ProtonClientReceiverContext amqpReceiver;
synchronized (connection.getLock()) {
- Receiver receiver = session.receiver(address);
+ Receiver receiver = session.receiver(name);
Source source = new Source();
source.setAddress(address);
receiver.setSource(source);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
index ae1caa4..13b50e5 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
@@ -26,6 +26,8 @@ import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@@ -50,6 +52,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector");
private static final Symbol COPY = Symbol.valueOf("copy");
+ private static final Symbol TOPIC = Symbol.valueOf("topic");
private Object brokerConsumer;
@@ -81,7 +84,10 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
//todo add flow control
try {
// to do whatever you need to make the broker start sending messages to the consumer
- sessionSPI.startSender(brokerConsumer);
+ //this could be null if a link reattach has happened
+ if (brokerConsumer != null) {
+ sessionSPI.startSender(brokerConsumer);
+ }
//protonSession.getServerSession().receiveConsumerCredits(consumerID, -1);
}
catch (Exception e) {
@@ -105,26 +111,58 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
/*
* even tho the filter is a map it will only return a single filter unless a nolocal is also provided
* */
- Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
- if (filter != null) {
- selector = filter.getValue().getDescribed().toString();
- // Validate the Selector.
- try {
- SelectorParser.parse(selector);
- }
- catch (FilterException e) {
- close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
- return;
+ if (source != null) {
+ Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
+ if (filter != null) {
+ selector = filter.getValue().getDescribed().toString();
+ // Validate the Selector.
+ try {
+ SelectorParser.parse(selector);
+ }
+ catch (FilterException e) {
+ close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
+ return;
+ }
}
}
+ /*
+ * if we have a capability for a topic (qpid-jms) or we are configured on this address to act like a topic then act
+ * like a subscription.
+ * */
+ boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
+
//filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
//if (filter != null) {
//todo implement nolocal filter
//}
-
- if (source != null) {
+ if (source == null) {
+ // Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue
+ String clientId = connection.getRemoteContainer();
+ String pubId = sender.getName();
+ queue = clientId + ":" + pubId;
+ boolean exists = sessionSPI.queueQuery(queue);
+
+ /*
+ * If it exists then we know it is a subscription so we set the capabilities on the source so we can delete on a
+ * link remote close.
+ * */
+ if (exists) {
+ source = new org.apache.qpid.proton.amqp.messaging.Source();
+ source.setAddress(queue);
+ source.setDurable(TerminusDurability.UNSETTLED_STATE);
+ source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ source.setDistributionMode(COPY);
+ source.setCapabilities(TOPIC);
+ sender.setSource(source);
+ }
+ else {
+ sender.setCondition(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + sender.getName()));
+ sender.close();
+ }
+ }
+ else {
if (source.getDynamic()) {
//if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
// will be deleted on closing of the session
@@ -141,7 +179,36 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
else {
//if not dynamic then we use the targets address as the address to forward the messages to, however there has to
//be a queue bound to it so we nee to check this.
- queue = source.getAddress();
+
+
+ if (isPubSub) {
+ // if we are a subscription and durable create a durable queue using the container id and link name
+ if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
+ TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
+ String clientId = connection.getRemoteContainer();
+ String pubId = sender.getName();
+ queue = clientId + ":" + pubId;
+ boolean exists = sessionSPI.queueQuery(queue);
+ if (!exists) {
+ sessionSPI.createDurableQueue(source.getAddress(), queue);
+ }
+ }
+ //otherwise we are a volatile subscription
+ else {
+ queue = java.util.UUID.randomUUID().toString();
+ try {
+ sessionSPI.createTemporaryQueue(source.getAddress(), queue);
+ }
+ catch (Exception e) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
+ }
+ source.setAddress(queue);
+ }
+
+ }
+ else {
+ queue = source.getAddress();
+ }
if (queue == null) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
}
@@ -156,7 +223,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
}
}
- boolean browseOnly = source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
+ boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
try {
brokerConsumer = sessionSPI.createSender(this, queue, selector, browseOnly);
}
@@ -166,6 +233,12 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
}
}
+ private boolean isPubSub(Source source) {
+ String pubSubPrefix = sessionSPI.getPubSubPrefix();
+ return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix);
+ }
+
+
/*
* close the session
* */
@@ -185,10 +258,23 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
* close the session
* */
@Override
- public void close() throws ActiveMQAMQPException {
- super.close();
+ public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
+ super.close(remoteLinkClose);
+
try {
sessionSPI.closeSender(brokerConsumer);
+ //if this is a link close rather than a connection close or detach, we need to delete any durable resources for
+ // say pub subs
+ if (remoteLinkClose ) {
+ Source source = (Source)sender.getSource();
+ if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) {
+ String address = source.getAddress();
+ boolean exists = sessionSPI.queueQuery(address);
+ if (exists) {
+ sessionSPI.deleteQueue(address);
+ }
+ }
+ }
}
catch (Exception e) {
e.printStackTrace();
@@ -277,4 +363,17 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
return performSend(serverMessage, message);
}
+ private static boolean hasCapabilities(Symbol symbol, Source source) {
+ if (source != null) {
+ if (source.getCapabilities() != null) {
+ for (Symbol cap : source.getCapabilities()) {
+ if (symbol.equals(cap)) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
index 3578926..1e5839c 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
@@ -71,6 +71,26 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
}
@Override
+ public void createDurableQueue(String address, String queueName) throws Exception {
+
+ }
+
+ @Override
+ public void createTemporaryQueue(String address, String queueName) throws Exception {
+
+ }
+
+ @Override
+ public void deleteQueue(String address) throws Exception {
+
+ }
+
+ @Override
+ public String getPubSubPrefix() {
+ return null;
+ }
+
+ @Override
public void onFlowConsumer(Object consumer, int credits, boolean drain) {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/docs/user-manual/en/protocols-interoperability.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/protocols-interoperability.md b/docs/user-manual/en/protocols-interoperability.md
index 5baa7cb..65bacef 100644
--- a/docs/user-manual/en/protocols-interoperability.md
+++ b/docs/user-manual/en/protocols-interoperability.md
@@ -86,6 +86,21 @@ does not exist then an exception will be sent
> For the next version we will add a flag to aut create durable queue
> but for now you will have to add them via the configuration
+### AMQP and Topics
+
+Although amqp has no notion of topics it is still possible to treat amqp consumers or receivers as subscriptions rather
+than just consumers on a queue. By default any receiving link that attaches to an address with the prefix `jms.topic.`
+will be treated as a subscription and a subscription queue will be created. If the Terminus Durability is either UNSETTLED_STATE
+or CONFIGURATION then the queue will be made durable, similar to a JMS durable subscription and given a name made up from
+the container id and the link name, something like `my-container-id:my-link-name`. if the Terminus Durability is configured
+as NONE then a volatile queue will be created.
+
+The prefix can be changed by configuring the Acceptor and setting the `pubSubPrefix` like so
+
+> <acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP;pubSubPrefix=foo.bar.</acceptor>
+
+Artemis also supports the qpid-jms client and will respect its use of topics regardless of the prefix used for the address.
+
### AMQP and Coordinations - Handling Transactions
An AMQP links target can also be a Coordinator, the Coordinator is used
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index e3106fc..90f5425 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -142,6 +142,18 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-proton-plug</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-proton-plug</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
<artifactId>artemis-hornetq-protocol</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73f908b8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java
new file mode 100644
index 0000000..bf4e38c
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.proton;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.proton.plug.AMQPClientConnectionContext;
+import org.proton.plug.AMQPClientReceiverContext;
+import org.proton.plug.AMQPClientSessionContext;
+import org.proton.plug.test.Constants;
+import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+
+public class ProtonPubSubTest extends ActiveMQTestBase {
+ private final String prefix = "foo.bar.";
+ private final String pubAddress = "pubAddress";
+ private final String prefixedPubAddress = prefix + "pubAddress";
+ private final SimpleString ssPubAddress = new SimpleString(pubAddress);
+ private final SimpleString ssprefixedPubAddress = new SimpleString(prefixedPubAddress);
+ private ActiveMQServer server;
+ private Connection connection;
+ private JmsConnectionFactory factory;
+
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ disableCheckThread();
+ server = this.createServer(true, true);
+ HashMap<String, Object> params = new HashMap<>();
+ params.put(TransportConstants.PORT_PROP_NAME, "5672");
+ params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
+ HashMap<String, Object> extraParams = new HashMap<>();
+ extraParams.put("pubSubPrefix", prefix);
+ TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "foo", extraParams);
+
+ server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
+ server.start();
+ server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true);
+ server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true);
+ factory = new JmsConnectionFactory("amqp://localhost:5672");
+ factory.setClientID("myClientID");
+ connection = factory.createConnection();
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ exception.printStackTrace();
+ }
+ });
+
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ try {
+ Thread.sleep(250);
+ if (connection != null) {
+ connection.close();
+ }
+
+ server.stop();
+ }
+ finally {
+ super.tearDown();
+ }
+ }
+
+ @Test
+ public void testNonDurablePubSub() throws Exception {
+ int numMessages = 100;
+ Topic topic = createTopic(pubAddress);
+ TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer sub = session.createSubscriber(topic);
+
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(topic);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage receive = (TextMessage) sub.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ }
+ }
+
+ @Test
+ public void testNonDurableMultiplePubSub() throws Exception {
+ int numMessages = 100;
+ Topic topic = createTopic(pubAddress);
+ TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer sub = session.createSubscriber(topic);
+ MessageConsumer sub2 = session.createSubscriber(topic);
+ MessageConsumer sub3 = session.createSubscriber(topic);
+
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(topic);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage receive = (TextMessage) sub.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ receive = (TextMessage) sub2.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ receive = (TextMessage) sub3.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ }
+ }
+
+
+ @Test
+ public void testDurablePubSub() throws Exception {
+ int numMessages = 100;
+ Topic topic = createTopic(pubAddress);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(topic);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage receive = (TextMessage) sub.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ }
+ }
+
+ @Test
+ public void testDurableMultiplePubSub() throws Exception {
+ int numMessages = 100;
+ Topic topic = createTopic(pubAddress);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+ TopicSubscriber sub2 = session.createDurableSubscriber(topic, "myPubId2");
+ TopicSubscriber sub3 = session.createDurableSubscriber(topic, "myPubId3");
+
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(topic);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage receive = (TextMessage) sub.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ receive = (TextMessage) sub2.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ receive = (TextMessage) sub3.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ }
+ }
+
+ @Test
+ public void testDurablePubSubReconnect() throws Exception {
+ int numMessages = 100;
+ Topic topic = createTopic(pubAddress);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(topic);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage receive = (TextMessage) sub.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ }
+ connection.close();
+ connection = factory.createConnection();
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ exception.printStackTrace();
+ }
+ });
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ sub = session.createDurableSubscriber(topic, "myPubId");
+
+ sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = sendSession.createProducer(topic);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage receive = (TextMessage) sub.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ }
+ }
+
+ @Test
+ public void testDurablePubSubUnsubscribe() throws Exception {
+ int numMessages = 100;
+ Topic topic = createTopic(pubAddress);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(topic);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage receive = (TextMessage) sub.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ }
+ sub.close();
+ session.unsubscribe("myPubId");
+ }
+
+
+ @Test
+ public void testPubSubWithSimpleClient() throws Exception {
+ SimpleAMQPConnector connector = new SimpleAMQPConnector();
+ connector.start();
+ AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT);
+
+ clientConnection.setContainer("myContainerID");
+
+ clientConnection.clientOpen(null);
+
+ AMQPClientSessionContext clientSession = clientConnection.createClientSession();
+ AMQPClientReceiverContext receiver = clientSession.createReceiver(prefixedPubAddress);
+ int numMessages = 100;
+ Topic topic = createTopic(prefixedPubAddress);
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(topic);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+
+ receiver.flow(100);
+ for (int i = 0; i < numMessages; i++) {
+ ProtonJMessage protonJMessage = receiver.receiveMessage(5000, TimeUnit.MILLISECONDS);
+ assertNotNull(protonJMessage);
+ assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i);
+ }
+
+ }
+
+
+ @Test
+ public void testMultiplePubSubWithSimpleClient() throws Exception {
+ SimpleAMQPConnector connector = new SimpleAMQPConnector();
+ connector.start();
+ AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT);
+
+ clientConnection.setContainer("myContainerID");
+
+ clientConnection.clientOpen(null);
+
+ AMQPClientSessionContext clientSession = clientConnection.createClientSession();
+ AMQPClientReceiverContext receiver = clientSession.createReceiver("sub1", prefixedPubAddress);
+ AMQPClientReceiverContext receiver2 = clientSession.createReceiver("sub2", prefixedPubAddress);
+ AMQPClientReceiverContext receiver3 = clientSession.createReceiver("sub3", prefixedPubAddress);
+ int numMessages = 100;
+ Topic topic = createTopic(prefixedPubAddress);
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(topic);
+ receiver.flow(100);
+ receiver2.flow(100);
+ receiver3.flow(100);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ ProtonJMessage protonJMessage = receiver.receiveMessage(5000, TimeUnit.MILLISECONDS);
+ assertNotNull("did not get message " + i, protonJMessage);
+ assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i);
+ protonJMessage = receiver2.receiveMessage(5000, TimeUnit.MILLISECONDS);
+ assertNotNull("did not get message " + i, protonJMessage);
+ assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i);
+ protonJMessage = receiver3.receiveMessage(5000, TimeUnit.MILLISECONDS);
+ assertNotNull("did not get message " + i, protonJMessage);
+ assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i);
+ }
+
+ }
+
+
+ private javax.jms.Topic createTopic(String address) throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ try {
+ return session.createTopic(address);
+ }
+ finally {
+ session.close();
+ }
+ }
+}
[2/2] activemq-artemis git commit: This closes #534
Posted by cl...@apache.org.
This closes #534
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ecd9c136
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ecd9c136
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ecd9c136
Branch: refs/heads/master
Commit: ecd9c13625de96d091bb0f56ae878682451db2d7
Parents: cd08888 73f908b
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue May 24 12:06:24 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue May 24 12:06:24 2016 -0400
----------------------------------------------------------------------
.../protocol/proton/ProtonProtocolManager.java | 16 +
.../plug/ProtonSessionIntegrationCallback.java | 20 ++
.../plug/AMQPClientConnectionContext.java | 2 +
.../proton/plug/AMQPClientSessionContext.java | 2 +
.../org/proton/plug/AMQPSessionCallback.java | 7 +
.../plug/context/AbstractConnectionContext.java | 22 +-
.../context/AbstractProtonContextSender.java | 4 +-
.../context/AbstractProtonReceiverContext.java | 4 +-
.../context/AbstractProtonSessionContext.java | 6 +-
.../plug/context/ProtonDeliveryHandler.java | 6 +-
.../plug/context/ProtonTransactionHandler.java | 2 +-
.../client/ProtonClientConnectionContext.java | 5 +
.../client/ProtonClientSessionContext.java | 7 +-
.../server/ProtonServerSenderContext.java | 133 ++++++-
.../test/minimalserver/MinimalSessionSPI.java | 20 ++
.../en/protocols-interoperability.md | 15 +
tests/integration-tests/pom.xml | 12 +
.../integration/proton/ProtonPubSubTest.java | 351 +++++++++++++++++++
18 files changed, 606 insertions(+), 28 deletions(-)
----------------------------------------------------------------------