You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/03/29 01:32:49 UTC
[1/4] activemq-artemis git commit: This closes #1147
Repository: activemq-artemis
Updated Branches:
refs/heads/master facc9dbc9 -> 056a88dfc
This closes #1147
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/056a88df
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/056a88df
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/056a88df
Branch: refs/heads/master
Commit: 056a88dfca45566822306f81e931bc9bf65450a5
Parents: facc9db 1f4473e
Author: Justin Bertram <jb...@apache.org>
Authored: Tue Mar 28 20:32:28 2017 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Tue Mar 28 20:32:28 2017 -0500
----------------------------------------------------------------------
.../activemq/artemis/api/core/Message.java | 7 +++
.../protocol/amqp/broker/AMQPMessage.java | 22 +++++++++
.../amqp/broker/AMQPSessionCallback.java | 11 ++++-
.../amqp/proton/ProtonServerSenderContext.java | 12 +++--
.../activemq/artemis/core/server/Consumer.java | 3 ++
.../artemis/core/server/ServerConsumer.java | 2 +
.../core/server/cluster/impl/BridgeImpl.java | 9 ++++
.../core/server/cluster/impl/Redistributor.java | 9 ++++
.../core/server/impl/ServerConsumerImpl.java | 30 +++++++++++++
.../amqp/AmqpReceiverDispositionTest.java | 4 +-
.../tests/integration/amqp/ProtonTest.java | 47 +++++++++++---------
.../tests/integration/amqp/ProtonTestBase.java | 6 ++-
.../integration/cli/DummyServerConsumer.java | 10 +++++
.../core/server/impl/fakes/FakeConsumer.java | 5 +++
14 files changed, 147 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
[2/4] activemq-artemis git commit: ARTEMIS-1080 Implementing AMQP::reject
Posted by jb...@apache.org.
ARTEMIS-1080 Implementing AMQP::reject
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/746220e1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/746220e1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/746220e1
Branch: refs/heads/master
Commit: 746220e11eaf19939b058ca19aeb6df80530c734
Parents: 13a272b
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Mar 28 17:12:02 2017 -0400
Committer: Justin Bertram <jb...@apache.org>
Committed: Tue Mar 28 20:32:28 2017 -0500
----------------------------------------------------------------------
.../protocol/amqp/broker/AMQPSessionCallback.java | 9 +++++++++
.../amqp/proton/ProtonServerSenderContext.java | 2 +-
.../artemis/core/server/ServerConsumer.java | 2 ++
.../core/server/impl/ServerConsumerImpl.java | 16 ++++++++++++++++
.../amqp/AmqpReceiverDispositionTest.java | 4 ++--
.../tests/integration/cli/DummyServerConsumer.java | 5 +++++
6 files changed, 35 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/746220e1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 58134f5..58d51db 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -336,6 +336,15 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
+ public void reject(Object brokerConsumer, Message message) throws Exception {
+ recoverContext();
+ try {
+ ((ServerConsumer) brokerConsumer).reject(message.getMessageID());
+ } finally {
+ resetContext();
+ }
+ }
+
public void resumeDelivery(Object consumer) {
((ServerConsumer) consumer).receiveCredits(-1);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/746220e1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 780ca4d..fb540a8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -546,7 +546,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
} else if (remoteState instanceof Rejected) {
try {
- sessionSPI.cancel(brokerConsumer, message, true);
+ sessionSPI.reject(brokerConsumer, message);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/746220e1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index ce9c489..37232bc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -92,6 +92,8 @@ public interface ServerConsumer extends Consumer {
void individualAcknowledge(Transaction tx, long messageID) throws Exception;
+ void reject(final long messageID) throws Exception;
+
void individualCancel(final long messageID, boolean failed) throws Exception;
void forceDelivery(long sequence);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/746220e1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index e812f2e..3552b93 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -910,6 +910,22 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ref.getQueue().cancel(ref, System.currentTimeMillis());
}
+
+ @Override
+ public synchronized void reject(final long messageID) throws Exception {
+ if (browseOnly) {
+ return;
+ }
+
+ MessageReference ref = removeReferenceByID(messageID);
+
+ if (ref == null) {
+ return; // nothing to be done
+ }
+
+ ref.getQueue().sendToDeadLetterAddress(null, ref);
+ }
+
@Override
public synchronized void backToDelivering(MessageReference reference) {
deliveringRefs.addFirst(reference);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/746220e1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
index f206654..b347d37 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
@@ -97,13 +97,13 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
// Reject is a terminal outcome and should not be redelivered to the rejecting receiver
// or any other as it should move to the archived state.
receiver1.flow(1);
- message = receiver1.receive(1, TimeUnit.SECONDS);
+ message = receiver1.receiveNoWait();
assertNull("Should not receive message again", message);
// Attempt to Read the message again with another receiver to validate it is archived.
AmqpReceiver receiver2 = session.createReceiver(getTestName());
receiver2.flow(1);
- assertNull(receiver2.receive(3, TimeUnit.SECONDS));
+ assertNull(receiver2.receiveNoWait());
connection.close();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/746220e1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index d79b444..78b3e09 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -123,6 +123,11 @@ public class DummyServerConsumer implements ServerConsumer {
}
@Override
+ public void reject(long messageID) throws Exception {
+
+ }
+
+ @Override
public void acknowledge(Transaction tx, long messageID) throws Exception {
}
[3/4] activemq-artemis git commit: ARTEMIS-1081 Implementing AMQP UndeliverableHere
Posted by jb...@apache.org.
ARTEMIS-1081 Implementing AMQP UndeliverableHere
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1f4473e8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1f4473e8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1f4473e8
Branch: refs/heads/master
Commit: 1f4473e8d77ccc724a16921b5e1207b565ab7c4c
Parents: 746220e
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Mar 28 17:59:41 2017 -0400
Committer: Justin Bertram <jb...@apache.org>
Committed: Tue Mar 28 20:32:28 2017 -0500
----------------------------------------------------------------------
.../activemq/artemis/api/core/Message.java | 7 +++++++
.../protocol/amqp/broker/AMQPMessage.java | 22 ++++++++++++++++++++
.../amqp/proton/ProtonServerSenderContext.java | 10 +++++++--
.../activemq/artemis/core/server/Consumer.java | 3 +++
.../core/server/cluster/impl/BridgeImpl.java | 9 ++++++++
.../core/server/cluster/impl/Redistributor.java | 9 ++++++++
.../core/server/impl/ServerConsumerImpl.java | 14 +++++++++++++
.../integration/cli/DummyServerConsumer.java | 5 +++++
.../core/server/impl/fakes/FakeConsumer.java | 5 +++++
9 files changed, 82 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 9cd3fa7..856e865 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -251,6 +251,13 @@ public interface Message {
/** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
Message copy(long newID);
+ default boolean acceptsConsumer(long uniqueConsumerID) {
+ return true;
+ }
+
+ default void rejectConsumer(long uniqueConsumerID) {
+ }
+
/**
* Returns the messageID.
* <br>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index d241958..08953a2 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -81,6 +81,8 @@ public class AMQPMessage extends RefCountMessage {
private long scheduledTime = -1;
private String connectionID;
+ Set<Object> rejectedConsumers;
+
public AMQPMessage(long messageFormat, byte[] data) {
this.data = Unpooled.wrappedBuffer(data);
this.messageFormat = messageFormat;
@@ -323,6 +325,26 @@ public class AMQPMessage extends RefCountMessage {
return AMQPMessagePersister.getInstance();
}
+ @Override
+ public synchronized boolean acceptsConsumer(long consumer) {
+
+ if (rejectedConsumers == null) {
+ return true;
+ } else {
+ return !rejectedConsumers.contains(consumer);
+ }
+ }
+
+ @Override
+ public synchronized void rejectConsumer(long consumer) {
+ if (rejectedConsumers == null) {
+ rejectedConsumers = new HashSet<>();
+ }
+
+ rejectedConsumers.add(consumer);
+ }
+
+
private synchronized void partialDecode(ByteBuffer buffer) {
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setByteBuffer(buffer);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index fb540a8..69d156b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
+import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@@ -78,7 +79,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private static final Symbol SHARED = Symbol.valueOf("shared");
private static final Symbol GLOBAL = Symbol.valueOf("global");
- private Object brokerConsumer;
+ private Consumer brokerConsumer;
protected final AMQPSessionContext protonSession;
protected final Sender sender;
@@ -391,7 +392,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
try {
- brokerConsumer = sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
+ brokerConsumer = (Consumer)sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
} catch (ActiveMQAMQPResourceLimitExceededException e1) {
throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
} catch (Exception e) {
@@ -553,6 +554,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else if (remoteState instanceof Modified) {
try {
Modified modification = (Modified) remoteState;
+
+ if (Boolean.TRUE.equals(modification.getUndeliverableHere())) {
+ message.rejectConsumer(((Consumer)brokerConsumer).sequentialID());
+ }
+
if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
sessionSPI.cancel(brokerConsumer, message, true);
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index 58c7d81..50c0b01 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -68,4 +68,7 @@ public interface Consumer {
* disconnect the consumer
*/
void disconnect();
+
+ /** an unique sequential ID for this consumer */
+ long sequentialID();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index fe43532..c1a0ccc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -86,6 +86,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private final UUID nodeUUID;
+ private final long sequentialID;
+
private final SimpleString name;
private final Queue queue;
@@ -170,6 +172,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
final String password,
final StorageManager storageManager) {
+ this.sequentialID = storageManager.generateID();
+
this.reconnectAttempts = reconnectAttempts;
this.reconnectAttemptsInUse = initialConnectAttempts;
@@ -245,6 +249,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
@Override
+ public long sequentialID() {
+ return sequentialID;
+ }
+
+ @Override
public synchronized void start() throws Exception {
if (started) {
return;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index 26399dc..eff8d67 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -52,6 +52,8 @@ public class Redistributor implements Consumer {
private int count;
+ private final long sequentialID;
+
// a Flush executor here is happening inside another executor.
// what may cause issues under load. Say you are running out of executors for cases where you don't need to wait at all.
// So, instead of using a future we will use a plain ReusableLatch here
@@ -64,6 +66,8 @@ public class Redistributor implements Consumer {
final int batchSize) {
this.queue = queue;
+ this.sequentialID = storageManager.generateID();
+
this.storageManager = storageManager;
this.postOffice = postOffice;
@@ -74,6 +78,11 @@ public class Redistributor implements Consumer {
}
@Override
+ public long sequentialID() {
+ return sequentialID;
+ }
+
+ @Override
public Filter getFilter() {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 3552b93..9e33602 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -77,6 +77,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private final long id;
+ private final long sequentialID;
+
protected final Queue messageQueue;
private final Filter filter;
@@ -180,6 +182,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
final ActiveMQServer server) throws Exception {
this.id = id;
+ this.sequentialID = server.getStorageManager().generateID();
+
this.filter = filter;
this.session = session;
@@ -232,6 +236,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// ServerConsumer implementation
// ----------------------------------------------------------------------
+
+ @Override
+ public long sequentialID() {
+ return sequentialID;
+ }
+
@Override
public Object getProtocolData() {
return protocolData;
@@ -343,6 +353,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
final Message message = ref.getMessage();
+ if (!message.acceptsConsumer(sequentialID())) {
+ return HandleStatus.NO_MATCH;
+ }
+
if (filter != null && !filter.match(message)) {
if (logger.isTraceEnabled()) {
logger.trace("Reference " + ref + " is a noMatch on consumer " + this);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 78b3e09..968c31b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -59,6 +59,11 @@ public class DummyServerConsumer implements ServerConsumer {
}
@Override
+ public long sequentialID() {
+ return 0;
+ }
+
+ @Override
public Object getProtocolContext() {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index 665686c..1db8347 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -82,6 +82,11 @@ public class FakeConsumer implements Consumer {
delayCountdown = numReferences;
}
+ @Override
+ public long sequentialID() {
+ return 0;
+ }
+
public synchronized List<MessageReference> getReferences() {
return references;
}
[4/4] activemq-artemis git commit: ARTEMIS-1056 fixing tests
Posted by jb...@apache.org.
ARTEMIS-1056 fixing tests
When I added flow control, some tests that were using reflection started to fail.
Also, as a precaution, I'm using <= on the flow-control low-credit check.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/13a272b3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/13a272b3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/13a272b3
Branch: refs/heads/master
Commit: 13a272b37b16949d18cc7ef5264196b388aafcf5
Parents: facc9db
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Mar 28 16:14:21 2017 -0400
Committer: Justin Bertram <jb...@apache.org>
Committed: Tue Mar 28 20:32:28 2017 -0500
----------------------------------------------------------------------
.../amqp/broker/AMQPSessionCallback.java | 2 +-
.../tests/integration/amqp/ProtonTest.java | 47 +++++++++++---------
.../tests/integration/amqp/ProtonTestBase.java | 6 ++-
3 files changed, 30 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13a272b3/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 18294e0..58134f5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -451,7 +451,7 @@ public class AMQPSessionCallback implements SessionCallback {
@Override
public void run() {
synchronized (connection.getLock()) {
- if (receiver.getRemoteCredit() < threshold) {
+ if (receiver.getRemoteCredit() <= threshold) {
receiver.flow(credits);
connection.flush();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13a272b3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index fb5e90a..199d9c5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -41,7 +41,6 @@ import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import java.io.IOException;
import java.io.Serializable;
-import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
@@ -74,7 +73,6 @@ import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFact
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
-import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ByteUtil;
@@ -128,6 +126,7 @@ public class ProtonTest extends ProtonTestBase {
return Arrays.asList(new Object[][]{{"AMQP", 0}, {"AMQP_ANONYMOUS", 3}});
}
+
ConnectionFactory factory;
private final int protocol;
@@ -146,6 +145,14 @@ public class ProtonTest extends ProtonTestBase {
private final String address;
private Connection connection;
+
+ @Override
+ protected ActiveMQServer createAMQPServer(int port) throws Exception {
+ ActiveMQServer server = super.createAMQPServer(port);
+ server.getConfiguration().addAcceptorConfiguration("flow", "tcp://localhost:" + (8 + port) + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpMinCredits=1");
+ return server;
+ }
+
@Override
@Before
public void setUp() throws Exception {
@@ -418,14 +425,9 @@ public class ProtonTest extends ProtonTestBase {
@Test
public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception {
- // Only allow 1 credit to be submitted at a time.
- Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
- maxCreditAllocation.setAccessible(true);
- int originalMaxCreditAllocation = maxCreditAllocation.getInt(null);
- maxCreditAllocation.setInt(null, 1);
String destinationAddress = address + 1;
- AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+ AmqpClient client = new AmqpClient(new URI("tcp://localhost:5680"), userName, password);
AmqpConnection amqpConnection = client.connect();
try {
AmqpSession session = amqpConnection.createSession();
@@ -433,7 +435,6 @@ public class ProtonTest extends ProtonTestBase {
assertTrue(sender.getSender().getCredit() == 1);
} finally {
amqpConnection.close();
- maxCreditAllocation.setInt(null, originalMaxCreditAllocation);
}
}
@@ -609,18 +610,13 @@ public class ProtonTest extends ProtonTestBase {
assertTrue(addressSize >= maxSizeBytesRejectThreshold);
}
- @Test
+ @Test(timeout = 10000)
public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
setAddressFullBlockPolicy();
- // Only allow 1 credit to be submitted at a time.
- Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
- maxCreditAllocation.setAccessible(true);
- int originalMaxCreditAllocation = maxCreditAllocation.getInt(null);
- maxCreditAllocation.setInt(null, 1);
String destinationAddress = address + 1;
- AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+ AmqpClient client = new AmqpClient(new URI("tcp://localhost:5680"), userName, password);
AmqpConnection amqpConnection = client.connect();
try {
AmqpSession session = amqpConnection.createSession();
@@ -637,7 +633,6 @@ public class ProtonTest extends ProtonTestBase {
assertTrue(addressSize >= maxSizeBytes && addressSize <= maxSizeBytesRejectThreshold);
} finally {
amqpConnection.close();
- maxCreditAllocation.setInt(null, originalMaxCreditAllocation);
}
}
@@ -771,6 +766,7 @@ public class ProtonTest extends ProtonTestBase {
try {
for (int i = 0; i < maxMessages; i++) {
sender.send(message);
+ System.out.println("Sent " + i);
sentMessages.getAndIncrement();
}
timeout.countDown();
@@ -781,13 +777,20 @@ public class ProtonTest extends ProtonTestBase {
};
Thread t = new Thread(sendMessages);
- t.start();
- timeout.await(5, TimeUnit.SECONDS);
+ try {
+ t.start();
+
+ timeout.await(1, TimeUnit.SECONDS);
- messagesSent = sentMessages.get();
- if (errors[0] != null) {
- throw errors[0];
+ messagesSent = sentMessages.get();
+ if (errors[0] != null) {
+ throw errors[0];
+ }
+ } finally {
+ t.interrupt();
+ t.join(1000);
+ Assert.assertFalse(t.isAlive());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13a272b3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
index 7057b8b..1a06c54 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -54,9 +53,12 @@ public class ProtonTestBase extends ActiveMQTestBase {
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
HashMap<String, Object> amqpParams = new HashMap<>();
configureAmqp(amqpParams);
+
+ amqpServer.getConfiguration().getAcceptorConfigurations().clear();
+
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams);
- amqpServer.getConfiguration().setAcceptorConfigurations(Collections.singleton(transportConfiguration));
+ amqpServer.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
amqpServer.getConfiguration().setName(brokerName);
amqpServer.getConfiguration().setJournalDirectory(amqpServer.getConfiguration().getJournalDirectory() + port);
amqpServer.getConfiguration().setBindingsDirectory(amqpServer.getConfiguration().getBindingsDirectory() + port);