You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/10/07 14:38:15 UTC
[1/2] activemq-artemis git commit: This closes #814
Repository: activemq-artemis
Updated Branches:
refs/heads/master 20729e79f -> 6f6d9845f
This closes #814
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6f6d9845
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6f6d9845
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6f6d9845
Branch: refs/heads/master
Commit: 6f6d9845fe9f5a0b54a3bd0cda8a541feb150995
Parents: 20729e7 95c4fdd
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Oct 7 10:38:01 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 10:38:01 2016 -0400
----------------------------------------------------------------------
.../core/management/ActiveMQServerControl.java | 7 ++
.../management/impl/JMSTopicControlImpl.java | 2 +-
.../amqp/broker/AMQPConnectionCallback.java | 2 +-
.../amqp/broker/AMQPSessionCallback.java | 17 ++--
.../ActiveMQProtonRemotingConnection.java | 7 +-
.../amqp/proton/AMQPConnectionContext.java | 7 +-
.../protocol/amqp/proton/AmqpSupport.java | 3 +
.../amqp/proton/ProtonServerSenderContext.java | 11 ++-
.../amqp/proton/handler/ProtonHandler.java | 17 +++-
.../impl/ActiveMQServerControlImpl.java | 10 ++-
.../transport/amqp/client/AmqpSession.java | 14 +++-
.../tests/integration/amqp/ProtonTest.java | 85 +++++++++++++++++++-
.../management/ActiveMQServerControlTest.java | 26 ++++++
.../ActiveMQServerControlUsingCoreTest.java | 5 ++
14 files changed, 190 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-762 Reflect management
changes in AMQP protocol
Posted by cl...@apache.org.
ARTEMIS-762 Reflect management changes in AMQP protocol
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/95c4fdd4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/95c4fdd4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/95c4fdd4
Branch: refs/heads/master
Commit: 95c4fdd4086ae522f1d2edfc537c06f820d10ac1
Parents: 20729e7
Author: Martyn Taylor <mt...@redhat.com>
Authored: Fri Sep 30 16:36:00 2016 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 10:38:01 2016 -0400
----------------------------------------------------------------------
.../core/management/ActiveMQServerControl.java | 7 ++
.../management/impl/JMSTopicControlImpl.java | 2 +-
.../amqp/broker/AMQPConnectionCallback.java | 2 +-
.../amqp/broker/AMQPSessionCallback.java | 17 ++--
.../ActiveMQProtonRemotingConnection.java | 7 +-
.../amqp/proton/AMQPConnectionContext.java | 7 +-
.../protocol/amqp/proton/AmqpSupport.java | 3 +
.../amqp/proton/ProtonServerSenderContext.java | 11 ++-
.../amqp/proton/handler/ProtonHandler.java | 17 +++-
.../impl/ActiveMQServerControlImpl.java | 10 ++-
.../transport/amqp/client/AmqpSession.java | 14 +++-
.../tests/integration/amqp/ProtonTest.java | 85 +++++++++++++++++++-
.../management/ActiveMQServerControlTest.java | 26 ++++++
.../ActiveMQServerControlUsingCoreTest.java | 5 ++
14 files changed, 190 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 02819e5..01a8d74 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -512,6 +512,13 @@ public interface ActiveMQServerControl {
void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name) throws Exception;
/**
+ * Destroys the queue corresponding to the specified name.
+ */
+ @Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION)
+ void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name,
+ @Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers) throws Exception;
+
+ /**
* Enables message counters for this server.
*/
@Operation(desc = "Enable message counters", impact = MBeanOperationInfo.ACTION)
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java
index d8c4179..cd8e4e0 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java
@@ -238,7 +238,7 @@ public class JMSTopicControlImpl extends StandardMBean implements TopicControl {
throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
}
ActiveMQServerControl serverControl = (ActiveMQServerControl) managementService.getResource(ResourceNames.CORE_SERVER);
- serverControl.destroyQueue(queueName);
+ serverControl.destroyQueue(queueName, true);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 31abf87..4ced546 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -117,7 +117,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
server.removeClientConnection(remoteContainerId);
}
connection.close();
- amqpConnection.close();
+ amqpConnection.close(null);
} finally {
for (Transaction tx : transactions.values()) {
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 46ed1c9..c7ca446 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternal
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
@@ -59,12 +60,14 @@ import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.ProtonJMessage;
+import org.jboss.logging.Logger;
public class AMQPSessionCallback implements SessionCallback {
+ private static final Logger logger = Logger.getLogger(AMQPSessionCallback.class);
+
protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
private final AMQPConnectionCallback protonSPI;
@@ -467,9 +470,14 @@ public class AMQPSessionCallback implements SessionCallback {
@Override
public void disconnect(ServerConsumer consumer, String queueName) {
- synchronized (connection.getLock()) {
- ((Link) consumer.getProtocolContext()).close();
- connection.flush();
+ ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
+ try {
+ synchronized (connection.getLock()) {
+ ((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
+ connection.flush();
+ }
+ } catch (ActiveMQAMQPException e) {
+ logger.error("Error closing link for " + consumer.getQueue().getAddress());
}
}
@@ -504,5 +512,4 @@ public class AMQPSessionCallback implements SessionCallback {
protonSPI.removeTransaction(txid);
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
index 7f129a1..039da79 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
@@ -23,8 +23,10 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
/**
* This is a Server's Connection representation used by ActiveMQ Artemis.
@@ -103,6 +105,9 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
@Override
public void disconnect(boolean criticalError) {
+ ErrorCondition errorCondition = new ErrorCondition();
+ errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED);
+ amqpConnection.close(errorCondition);
getTransportConnection().close();
}
@@ -111,7 +116,7 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
*/
@Override
public void disconnect(String scaleDownNodeID, boolean criticalError) {
- getTransportConnection().close();
+ disconnect(criticalError);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 3d79026..70e4fd0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
@@ -132,8 +133,8 @@ public class AMQPConnectionContext extends ProtonInitializable {
handler.flush();
}
- public void close() {
- handler.close();
+ public void close(ErrorCondition errorCondition) {
+ handler.close(errorCondition);
}
protected AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException {
@@ -264,7 +265,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
if (!connectionCallback.isSupportsAnonymous()) {
connectionCallback.sendSASLSupported();
connectionCallback.close();
- handler.close();
+ handler.close(null);
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index 13d7170..7bdbd2e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -53,6 +53,9 @@ public class AmqpSupport {
public static final Symbol PRODUCT = Symbol.valueOf("product");
public static final Symbol VERSION = Symbol.valueOf("version");
public static final Symbol PLATFORM = Symbol.valueOf("platform");
+ public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted");
+ public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced");
+
// Symbols used in configuration of newly opened links.
public static final Symbol COPY = Symbol.getSymbol("copy");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 7ef4944..7d401fa 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -169,7 +169,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue
String clientId = connection.getRemoteContainer();
String pubId = sender.getName();
- queue = clientId + ":" + pubId;
+ queue = createQueueName(clientId, pubId);
boolean exists = sessionSPI.queueQuery(queue, false).isExists();
/*
@@ -207,7 +207,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
String clientId = connection.getRemoteContainer();
String pubId = sender.getName();
- queue = clientId + ":" + pubId;
+ queue = createQueueName(clientId, pubId);
QueueQueryResult result = sessionSPI.queueQuery(queue, false);
if (result.isExists()) {
@@ -307,7 +307,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else {
String clientId = connection.getRemoteContainer();
String pubId = sender.getName();
- String queue = clientId + ":" + pubId;
+ String queue = createQueueName(clientId, pubId);
result = sessionSPI.queueQuery(queue, false);
if (result.isExists()) {
if (result.getConsumerCount() > 0) {
@@ -324,6 +324,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
+
@Override
public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
Object message = delivery.getContext();
@@ -478,4 +479,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
+ private static String createQueueName(String clientId, String pubId) {
+ return clientId + "." + pubId;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 0d667b0..3c088d5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -248,6 +248,10 @@ public class ProtonHandler extends ProtonInitializable {
}
public void flush() {
+ flush(false);
+ }
+
+ private void flush(boolean wait) {
synchronized (lock) {
transport.process();
@@ -255,14 +259,21 @@ public class ProtonHandler extends ProtonInitializable {
}
- dispatchExecutor.execute(dispatchRunnable);
+ if (wait) {
+ dispatch();
+ } else {
+ dispatchExecutor.execute(dispatchRunnable);
+ }
}
- public void close() {
+ public void close(ErrorCondition errorCondition) {
synchronized (lock) {
+ if (errorCondition != null) {
+ connection.setCondition(errorCondition);
+ }
connection.close();
}
- flush();
+ flush(true);
}
protected void checkServerSASL() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 48f4374..b96fcbf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -697,20 +697,24 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
@Override
- public void destroyQueue(final String name) throws Exception {
+ public void destroyQueue(final String name, final boolean removeConsumers) throws Exception {
checkStarted();
clearIO();
try {
SimpleString queueName = new SimpleString(name);
-
- server.destroyQueue(queueName, null, true);
+ server.destroyQueue(queueName, null, !removeConsumers, removeConsumers);
} finally {
blockOnIO();
}
}
@Override
+ public void destroyQueue(final String name) throws Exception {
+ destroyQueue(name, false);
+ }
+
+ @Override
public int getConnectionCount() {
checkStarted();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 65a69b7..936d4ef 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -207,10 +207,22 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpReceiver createReceiver(Source source) throws Exception {
+ return createReceiver(source, getNextReceiverId());
+ }
+
+ /**
+ * Create a receiver instance using the given Source
+ *
+ * @param source the caller created and configured Source used to create the receiver link.
+ * @param receiverId the receiver id to use.
+ * @return a newly created receiver that is ready for use.
+ * @throws Exception if an error occurs while creating the receiver.
+ */
+ public AmqpReceiver createReceiver(Source source, String receiverId) throws Exception {
checkClosed();
final ClientFuture request = new ClientFuture();
- final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId());
+ final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
connection.getScheduler().execute(new Runnable() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index e90a8b1..2a1e8c9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -58,7 +58,10 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -71,6 +74,8 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -156,6 +161,7 @@ public class ProtonTest extends ProtonTestBase {
server.createQueue(new SimpleString("amqp_testtopic" + "8"), new SimpleString("amqp_testtopic" + "8"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "9"), new SimpleString("amqp_testtopic" + "9"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false);
+
connection = createConnection();
}
@@ -186,9 +192,9 @@ public class ProtonTest extends ProtonTestBase {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
myDurSub = session.createDurableSubscriber(topic, "myDurSub");
myDurSub.close();
- Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub")));
+ Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")));
session.unsubscribe("myDurSub");
- Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub")));
+ Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")));
session.close();
connection.close();
} finally {
@@ -741,6 +747,81 @@ public class ProtonTest extends ProtonTestBase {
}
@Test
+ public void testLinkDetachSentWhenQueueDeleted() throws Exception {
+ AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+ final AmqpConnection amqpConnection = client.connect();
+ try {
+ AmqpSession session = amqpConnection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(coreAddress);
+ server.destroyQueue(new SimpleString(coreAddress), null, false, true);
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return amqpConnection.isClosed();
+ }
+ });
+ assertTrue(receiver.isClosed());
+ } finally {
+ amqpConnection.close();
+ }
+ }
+
+ @Test
+ public void testCloseIsSentOnConnectionClose() throws Exception {
+ connection.close();
+
+ AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+ final AmqpConnection amqpConnection = client.connect();
+ try {
+ for (RemotingConnection connection : server.getRemotingService().getConnections()) {
+ server.getRemotingService().removeConnection(connection);
+ connection.disconnect(true);
+ }
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return amqpConnection.isClosed();
+ }
+ });
+
+ assertTrue(amqpConnection.isClosed());
+ assertEquals(AmqpSupport.CONNECTION_FORCED, amqpConnection.getConnection().getRemoteCondition().getCondition());
+ } finally {
+ amqpConnection.close();
+ }
+ }
+
+
+ @Test
+ public void testClientIdIsSetInSubscriptionList() throws Exception {
+ AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+ AmqpConnection amqpConnection = client.createConnection();
+ amqpConnection.setContainerId("testClient");
+ amqpConnection.setOfferedCapabilities(Arrays.asList(Symbol.getSymbol("topic")));
+ amqpConnection.connect();
+ try {
+ AmqpSession session = amqpConnection.createSession();
+
+ Source source = new Source();
+ source.setDurable(TerminusDurability.UNSETTLED_STATE);
+ source.setCapabilities(Symbol.getSymbol("topic"));
+ source.setAddress("jms.topic.mytopic");
+ AmqpReceiver receiver = session.createReceiver(source, "testSub");
+
+ SimpleString fo = new SimpleString("testClient.testSub:jms.topic.mytopic");
+ assertNotNull(server.locateQueue(fo));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ amqpConnection.close();
+ }
+ }
+
+ @Test
public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception {
String queueName = "TestQueueName";
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index df3cf57..86d19db 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -250,6 +250,32 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
}
@Test
+ public void testCreateAndDestroyQueueClosingConsumers() throws Exception {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString name = RandomUtil.randomSimpleString();
+ boolean durable = true;
+
+ ActiveMQServerControl serverControl = createManagementControl();
+
+ checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
+
+ serverControl.createQueue(address.toString(), name.toString(), durable);
+
+ ServerLocator receiveLocator = createInVMNonHALocator();
+ ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
+ ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
+ ClientConsumer consumer = receiveClientSession.createConsumer(name);
+
+ Assert.assertFalse(consumer.isClosed());
+
+ checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
+ serverControl.destroyQueue(name.toString(), true);
+ Assert.assertTrue(consumer.isClosed());
+
+ checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
+ }
+
+ @Test
public void testCreateAndDestroyQueueWithNullFilter() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString name = RandomUtil.randomSimpleString();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95c4fdd4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 69949b6..777ddd2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -128,6 +128,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
}
@Override
+ public void destroyQueue(final String name, final boolean removeConsumers) throws Exception {
+ proxy.invokeOperation("destroyQueue", name, removeConsumers);
+ }
+
+ @Override
public void disableMessageCounters() throws Exception {
proxy.invokeOperation("disableMessageCounters");
}