You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/03/29 01:32:49 UTC

[1/4] activemq-artemis git commit: This closes #1147

Repository: activemq-artemis
Updated Branches:
  refs/heads/master facc9dbc9 -> 056a88dfc


This closes #1147


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/056a88df
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/056a88df
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/056a88df

Branch: refs/heads/master
Commit: 056a88dfca45566822306f81e931bc9bf65450a5
Parents: facc9db 1f4473e
Author: Justin Bertram <jb...@apache.org>
Authored: Tue Mar 28 20:32:28 2017 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Tue Mar 28 20:32:28 2017 -0500

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  7 +++
 .../protocol/amqp/broker/AMQPMessage.java       | 22 +++++++++
 .../amqp/broker/AMQPSessionCallback.java        | 11 ++++-
 .../amqp/proton/ProtonServerSenderContext.java  | 12 +++--
 .../activemq/artemis/core/server/Consumer.java  |  3 ++
 .../artemis/core/server/ServerConsumer.java     |  2 +
 .../core/server/cluster/impl/BridgeImpl.java    |  9 ++++
 .../core/server/cluster/impl/Redistributor.java |  9 ++++
 .../core/server/impl/ServerConsumerImpl.java    | 30 +++++++++++++
 .../amqp/AmqpReceiverDispositionTest.java       |  4 +-
 .../tests/integration/amqp/ProtonTest.java      | 47 +++++++++++---------
 .../tests/integration/amqp/ProtonTestBase.java  |  6 ++-
 .../integration/cli/DummyServerConsumer.java    | 10 +++++
 .../core/server/impl/fakes/FakeConsumer.java    |  5 +++
 14 files changed, 147 insertions(+), 30 deletions(-)
----------------------------------------------------------------------



[2/4] activemq-artemis git commit: ARTEMIS-1080 Implementing AMQP::reject

Posted by jb...@apache.org.
ARTEMIS-1080 Implementing AMQP::reject


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/746220e1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/746220e1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/746220e1

Branch: refs/heads/master
Commit: 746220e11eaf19939b058ca19aeb6df80530c734
Parents: 13a272b
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Mar 28 17:12:02 2017 -0400
Committer: Justin Bertram <jb...@apache.org>
Committed: Tue Mar 28 20:32:28 2017 -0500

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPSessionCallback.java   |  9 +++++++++
 .../amqp/proton/ProtonServerSenderContext.java      |  2 +-
 .../artemis/core/server/ServerConsumer.java         |  2 ++
 .../core/server/impl/ServerConsumerImpl.java        | 16 ++++++++++++++++
 .../amqp/AmqpReceiverDispositionTest.java           |  4 ++--
 .../tests/integration/cli/DummyServerConsumer.java  |  5 +++++
 6 files changed, 35 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/746220e1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 58134f5..58d51db 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -336,6 +336,15 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
+   public void reject(Object brokerConsumer, Message message) throws Exception {
+      recoverContext();
+      try {
+         ((ServerConsumer) brokerConsumer).reject(message.getMessageID());
+      } finally {
+         resetContext();
+      }
+   }
+
    public void resumeDelivery(Object consumer) {
       ((ServerConsumer) consumer).receiveCredits(-1);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/746220e1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 780ca4d..fb540a8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -546,7 +546,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                }
             } else if (remoteState instanceof Rejected) {
                try {
-                  sessionSPI.cancel(brokerConsumer, message, true);
+                  sessionSPI.reject(brokerConsumer, message);
                } catch (Exception e) {
                   throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
                }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/746220e1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index ce9c489..37232bc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -92,6 +92,8 @@ public interface ServerConsumer extends Consumer {
 
    void individualAcknowledge(Transaction tx, long messageID) throws Exception;
 
+   void reject(final long messageID) throws Exception;
+
    void individualCancel(final long messageID, boolean failed) throws Exception;
 
    void forceDelivery(long sequence);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/746220e1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index e812f2e..3552b93 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -910,6 +910,22 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       ref.getQueue().cancel(ref, System.currentTimeMillis());
    }
 
+
+   @Override
+   public synchronized void reject(final long messageID) throws Exception {
+      if (browseOnly) {
+         return;
+      }
+
+      MessageReference ref = removeReferenceByID(messageID);
+
+      if (ref == null) {
+         return; // nothing to be done
+      }
+
+      ref.getQueue().sendToDeadLetterAddress(null, ref);
+   }
+
    @Override
    public synchronized void backToDelivering(MessageReference reference) {
       deliveringRefs.addFirst(reference);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/746220e1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
index f206654..b347d37 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
@@ -97,13 +97,13 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
       // Reject is a terminal outcome and should not be redelivered to the rejecting receiver
       // or any other as it should move to the archived state.
       receiver1.flow(1);
-      message = receiver1.receive(1, TimeUnit.SECONDS);
+      message = receiver1.receiveNoWait();
       assertNull("Should not receive message again", message);
 
       // Attempt to Read the message again with another receiver to validate it is archived.
       AmqpReceiver receiver2 = session.createReceiver(getTestName());
       receiver2.flow(1);
-      assertNull(receiver2.receive(3, TimeUnit.SECONDS));
+      assertNull(receiver2.receiveNoWait());
 
       connection.close();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/746220e1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index d79b444..78b3e09 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -123,6 +123,11 @@ public class DummyServerConsumer implements ServerConsumer {
    }
 
    @Override
+   public void reject(long messageID) throws Exception {
+
+   }
+
+   @Override
    public void acknowledge(Transaction tx, long messageID) throws Exception {
 
    }


[3/4] activemq-artemis git commit: ARTEMIS-1081 Implementing AMQP UndeliverableHere

Posted by jb...@apache.org.
ARTEMIS-1081 Implementing AMQP UndeliverableHere


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1f4473e8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1f4473e8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1f4473e8

Branch: refs/heads/master
Commit: 1f4473e8d77ccc724a16921b5e1207b565ab7c4c
Parents: 746220e
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Mar 28 17:59:41 2017 -0400
Committer: Justin Bertram <jb...@apache.org>
Committed: Tue Mar 28 20:32:28 2017 -0500

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  7 +++++++
 .../protocol/amqp/broker/AMQPMessage.java       | 22 ++++++++++++++++++++
 .../amqp/proton/ProtonServerSenderContext.java  | 10 +++++++--
 .../activemq/artemis/core/server/Consumer.java  |  3 +++
 .../core/server/cluster/impl/BridgeImpl.java    |  9 ++++++++
 .../core/server/cluster/impl/Redistributor.java |  9 ++++++++
 .../core/server/impl/ServerConsumerImpl.java    | 14 +++++++++++++
 .../integration/cli/DummyServerConsumer.java    |  5 +++++
 .../core/server/impl/fakes/FakeConsumer.java    |  5 +++++
 9 files changed, 82 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 9cd3fa7..856e865 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -251,6 +251,13 @@ public interface Message {
    /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
    Message copy(long newID);
 
+   default boolean acceptsConsumer(long uniqueConsumerID) {
+      return true;
+   }
+
+   default void rejectConsumer(long uniqueConsumerID) {
+   }
+
    /**
     * Returns the messageID.
     * <br>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index d241958..08953a2 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -81,6 +81,8 @@ public class AMQPMessage extends RefCountMessage {
    private long scheduledTime = -1;
    private String connectionID;
 
+   Set<Object> rejectedConsumers;
+
    public AMQPMessage(long messageFormat, byte[] data) {
       this.data = Unpooled.wrappedBuffer(data);
       this.messageFormat = messageFormat;
@@ -323,6 +325,26 @@ public class AMQPMessage extends RefCountMessage {
       return AMQPMessagePersister.getInstance();
    }
 
+   @Override
+   public synchronized boolean acceptsConsumer(long consumer) {
+
+      if (rejectedConsumers == null) {
+         return true;
+      } else {
+         return !rejectedConsumers.contains(consumer);
+      }
+   }
+
+   @Override
+   public synchronized void rejectConsumer(long consumer) {
+      if (rejectedConsumers == null) {
+         rejectedConsumers = new HashSet<>();
+      }
+
+      rejectedConsumers.add(consumer);
+   }
+
+
    private synchronized void partialDecode(ByteBuffer buffer) {
       DecoderImpl decoder = TLSEncode.getDecoder();
       decoder.setByteBuffer(buffer);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index fb540a8..69d156b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
+import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@@ -78,7 +79,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    private static final Symbol SHARED = Symbol.valueOf("shared");
    private static final Symbol GLOBAL = Symbol.valueOf("global");
 
-   private Object brokerConsumer;
+   private Consumer brokerConsumer;
 
    protected final AMQPSessionContext protonSession;
    protected final Sender sender;
@@ -391,7 +392,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
       boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
       try {
-         brokerConsumer = sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
+         brokerConsumer = (Consumer)sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
       } catch (ActiveMQAMQPResourceLimitExceededException e1) {
          throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
       } catch (Exception e) {
@@ -553,6 +554,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             } else if (remoteState instanceof Modified) {
                try {
                   Modified modification = (Modified) remoteState;
+
+                  if (Boolean.TRUE.equals(modification.getUndeliverableHere())) {
+                     message.rejectConsumer(((Consumer)brokerConsumer).sequentialID());
+                  }
+
                   if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
                      sessionSPI.cancel(brokerConsumer, message, true);
                   } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index 58c7d81..50c0b01 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -68,4 +68,7 @@ public interface Consumer {
     * disconnect the consumer
     */
    void disconnect();
+
+   /** a unique sequential ID for this consumer */
+   long sequentialID();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index fe43532..c1a0ccc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -86,6 +86,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
    private final UUID nodeUUID;
 
+   private final long sequentialID;
+
    private final SimpleString name;
 
    private final Queue queue;
@@ -170,6 +172,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                      final String password,
                      final StorageManager storageManager) {
 
+      this.sequentialID = storageManager.generateID();
+
       this.reconnectAttempts = reconnectAttempts;
 
       this.reconnectAttemptsInUse = initialConnectAttempts;
@@ -245,6 +249,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
    }
 
    @Override
+   public long sequentialID() {
+      return sequentialID;
+   }
+
+   @Override
    public synchronized void start() throws Exception {
       if (started) {
          return;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index 26399dc..eff8d67 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -52,6 +52,8 @@ public class Redistributor implements Consumer {
 
    private int count;
 
+   private final long sequentialID;
+
    // a Flush executor here is happening inside another executor.
    // what may cause issues under load. Say you are running out of executors for cases where you don't need to wait at all.
    // So, instead of using a future we will use a plain ReusableLatch here
@@ -64,6 +66,8 @@ public class Redistributor implements Consumer {
                         final int batchSize) {
       this.queue = queue;
 
+      this.sequentialID = storageManager.generateID();
+
       this.storageManager = storageManager;
 
       this.postOffice = postOffice;
@@ -74,6 +78,11 @@ public class Redistributor implements Consumer {
    }
 
    @Override
+   public long sequentialID() {
+      return sequentialID;
+   }
+
+   @Override
    public Filter getFilter() {
       return null;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 3552b93..9e33602 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -77,6 +77,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    private final long id;
 
+   private final long sequentialID;
+
    protected final Queue messageQueue;
 
    private final Filter filter;
@@ -180,6 +182,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                              final ActiveMQServer server) throws Exception {
       this.id = id;
 
+      this.sequentialID = server.getStorageManager().generateID();
+
       this.filter = filter;
 
       this.session = session;
@@ -232,6 +236,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    // ServerConsumer implementation
    // ----------------------------------------------------------------------
 
+
+   @Override
+   public long sequentialID() {
+      return sequentialID;
+   }
+
    @Override
    public Object getProtocolData() {
       return protocolData;
@@ -343,6 +353,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
          }
          final Message message = ref.getMessage();
 
+         if (!message.acceptsConsumer(sequentialID())) {
+            return HandleStatus.NO_MATCH;
+         }
+
          if (filter != null && !filter.match(message)) {
             if (logger.isTraceEnabled()) {
                logger.trace("Reference " + ref + " is a noMatch on consumer " + this);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 78b3e09..968c31b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -59,6 +59,11 @@ public class DummyServerConsumer implements ServerConsumer {
    }
 
    @Override
+   public long sequentialID() {
+      return 0;
+   }
+
+   @Override
    public Object getProtocolContext() {
       return null;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index 665686c..1db8347 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -82,6 +82,11 @@ public class FakeConsumer implements Consumer {
       delayCountdown = numReferences;
    }
 
+   @Override
+   public long sequentialID() {
+      return 0;
+   }
+
    public synchronized List<MessageReference> getReferences() {
       return references;
    }


[4/4] activemq-artemis git commit: ARTEMIS-1056 fixing tests

Posted by jb...@apache.org.
ARTEMIS-1056 fixing tests

When I added flow control, some tests that relied on reflection started to fail.
Also, as a precaution, I'm now using <= in the flow control low-credit check.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/13a272b3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/13a272b3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/13a272b3

Branch: refs/heads/master
Commit: 13a272b37b16949d18cc7ef5264196b388aafcf5
Parents: facc9db
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Mar 28 16:14:21 2017 -0400
Committer: Justin Bertram <jb...@apache.org>
Committed: Tue Mar 28 20:32:28 2017 -0500

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        |  2 +-
 .../tests/integration/amqp/ProtonTest.java      | 47 +++++++++++---------
 .../tests/integration/amqp/ProtonTestBase.java  |  6 ++-
 3 files changed, 30 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13a272b3/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 18294e0..58134f5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -451,7 +451,7 @@ public class AMQPSessionCallback implements SessionCallback {
             @Override
             public void run() {
                synchronized (connection.getLock()) {
-                  if (receiver.getRemoteCredit() < threshold) {
+                  if (receiver.getRemoteCredit() <= threshold) {
                      receiver.flow(credits);
                      connection.flush();
                   }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13a272b3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index fb5e90a..199d9c5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -41,7 +41,6 @@ import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 import java.io.IOException;
 import java.io.Serializable;
-import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -74,7 +73,6 @@ import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFact
 import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
 import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
-import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.ByteUtil;
@@ -128,6 +126,7 @@ public class ProtonTest extends ProtonTestBase {
       return Arrays.asList(new Object[][]{{"AMQP", 0}, {"AMQP_ANONYMOUS", 3}});
    }
 
+
    ConnectionFactory factory;
 
    private final int protocol;
@@ -146,6 +145,14 @@ public class ProtonTest extends ProtonTestBase {
    private final String address;
    private Connection connection;
 
+
+   @Override
+   protected ActiveMQServer createAMQPServer(int port) throws Exception {
+      ActiveMQServer server = super.createAMQPServer(port);
+      server.getConfiguration().addAcceptorConfiguration("flow", "tcp://localhost:" + (8 + port) + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpMinCredits=1");
+      return server;
+   }
+
    @Override
    @Before
    public void setUp() throws Exception {
@@ -418,14 +425,9 @@ public class ProtonTest extends ProtonTestBase {
 
    @Test
    public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception {
-      // Only allow 1 credit to be submitted at a time.
-      Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
-      maxCreditAllocation.setAccessible(true);
-      int originalMaxCreditAllocation = maxCreditAllocation.getInt(null);
-      maxCreditAllocation.setInt(null, 1);
 
       String destinationAddress = address + 1;
-      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpClient client = new AmqpClient(new URI("tcp://localhost:5680"), userName, password);
       AmqpConnection amqpConnection = client.connect();
       try {
          AmqpSession session = amqpConnection.createSession();
@@ -433,7 +435,6 @@ public class ProtonTest extends ProtonTestBase {
          assertTrue(sender.getSender().getCredit() == 1);
       } finally {
          amqpConnection.close();
-         maxCreditAllocation.setInt(null, originalMaxCreditAllocation);
       }
    }
 
@@ -609,18 +610,13 @@ public class ProtonTest extends ProtonTestBase {
       assertTrue(addressSize >= maxSizeBytesRejectThreshold);
    }
 
-   @Test
+   @Test(timeout = 10000)
    public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
       setAddressFullBlockPolicy();
 
-      // Only allow 1 credit to be submitted at a time.
-      Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
-      maxCreditAllocation.setAccessible(true);
-      int originalMaxCreditAllocation = maxCreditAllocation.getInt(null);
-      maxCreditAllocation.setInt(null, 1);
 
       String destinationAddress = address + 1;
-      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpClient client = new AmqpClient(new URI("tcp://localhost:5680"), userName, password);
       AmqpConnection amqpConnection = client.connect();
       try {
          AmqpSession session = amqpConnection.createSession();
@@ -637,7 +633,6 @@ public class ProtonTest extends ProtonTestBase {
          assertTrue(addressSize >= maxSizeBytes && addressSize <= maxSizeBytesRejectThreshold);
       } finally {
          amqpConnection.close();
-         maxCreditAllocation.setInt(null, originalMaxCreditAllocation);
       }
    }
 
@@ -771,6 +766,7 @@ public class ProtonTest extends ProtonTestBase {
             try {
                for (int i = 0; i < maxMessages; i++) {
                   sender.send(message);
+                  System.out.println("Sent " + i);
                   sentMessages.getAndIncrement();
                }
                timeout.countDown();
@@ -781,13 +777,20 @@ public class ProtonTest extends ProtonTestBase {
       };
 
       Thread t = new Thread(sendMessages);
-      t.start();
 
-      timeout.await(5, TimeUnit.SECONDS);
+      try {
+         t.start();
+
+         timeout.await(1, TimeUnit.SECONDS);
 
-      messagesSent = sentMessages.get();
-      if (errors[0] != null) {
-         throw errors[0];
+         messagesSent = sentMessages.get();
+         if (errors[0] != null) {
+            throw errors[0];
+         }
+      } finally {
+         t.interrupt();
+         t.join(1000);
+         Assert.assertFalse(t.isAlive());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13a272b3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
index 7057b8b..1a06c54 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -54,9 +53,12 @@ public class ProtonTestBase extends ActiveMQTestBase {
       params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
       HashMap<String, Object> amqpParams = new HashMap<>();
       configureAmqp(amqpParams);
+
+      amqpServer.getConfiguration().getAcceptorConfigurations().clear();
+
       TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams);
 
-      amqpServer.getConfiguration().setAcceptorConfigurations(Collections.singleton(transportConfiguration));
+      amqpServer.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
       amqpServer.getConfiguration().setName(brokerName);
       amqpServer.getConfiguration().setJournalDirectory(amqpServer.getConfiguration().getJournalDirectory() + port);
       amqpServer.getConfiguration().setBindingsDirectory(amqpServer.getConfiguration().getBindingsDirectory() + port);