You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/04/21 19:48:37 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1127 Match remote Sender and Receiver settle modes

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 310f953c7 -> 694dbd700


ARTEMIS-1127 Match remote Sender and Receiver settle modes

On link attach we currently default our SenderSettleMode to MIXED which,
while legal, doesn't truly reflect what the client asked for. We instead
now update the link to reflect the mode requested by the client.

Also add some tests to ensure that we always return the
ReceiverSettleMode as FIRST since we don't support SECOND.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9eed28e0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9eed28e0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9eed28e0

Branch: refs/heads/master
Commit: 9eed28e0aad9683b894f03f731e053b5c4639bc6
Parents: 310f953
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Apr 21 14:22:09 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Apr 21 15:00:09 2017 -0400

----------------------------------------------------------------------
 .../proton/ProtonServerReceiverContext.java     |   7 ++
 .../amqp/proton/ProtonServerSenderContext.java  |  14 ++-
 .../transport/amqp/client/AmqpReceiver.java     |  59 +++++++++--
 .../transport/amqp/client/AmqpSender.java       |  59 +++++++++--
 .../transport/amqp/client/AmqpSession.java      |  76 +++++++++++++-
 .../integration/amqp/AmqpReceiverTest.java      |  99 ++++++++++++++++++
 .../tests/integration/amqp/AmqpSenderTest.java  | 104 +++++++++++++++++++
 7 files changed, 396 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9eed28e0/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 2606482..57d7307 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -31,6 +31,7 @@ import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.jboss.logging.Logger;
@@ -81,6 +82,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       super.initialise();
       org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
 
+      // Match the settlement mode of the remote instead of relying on the default of MIXED.
+      receiver.setSenderSettleMode(receiver.getRemoteSenderSettleMode());
+
+      // We don't currently support SECOND so enforce that the answer is always FIRST
+      receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
       if (target != null) {
          if (target.getDynamic()) {
             // if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9eed28e0/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 4d8bf53..f8fa473 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -22,8 +22,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -63,12 +61,16 @@ import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Sender;
 import org.jboss.logging.Logger;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
 /**
  * TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
  */
@@ -155,6 +157,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       String selector = null;
       final Map<Symbol, Object> supportedFilters = new HashMap<>();
 
+      // Match the settlement mode of the remote instead of relying on the default of MIXED.
+      sender.setSenderSettleMode(sender.getRemoteSenderSettleMode());
+
+      // We don't currently support SECOND so enforce that the answer is always FIRST
+      sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
       if (source != null) {
          // We look for message selectors on every receiver, while in other cases we might only
          // consume the filter depending on the subscription type.
@@ -570,7 +578,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                   Modified modification = (Modified) remoteState;
 
                   if (Boolean.TRUE.equals(modification.getUndeliverableHere())) {
-                     message.rejectConsumer(((Consumer) brokerConsumer).sequentialID());
+                     message.rejectConsumer(brokerConsumer.sequentialID());
                   }
 
                   if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9eed28e0/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 414f933..9de2fce 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -70,7 +70,10 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    private final AmqpSession session;
    private final String address;
    private final String receiverId;
+
    private final Source userSpecifiedSource;
+   private final SenderSettleMode userSpecifiedSenderSettlementMode;
+   private final ReceiverSettleMode userSpecifiedReceiverSettlementMode;
 
    private String subscriptionName;
    private String selector;
@@ -83,11 +86,32 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    /**
     * Create a new receiver instance.
     *
-    * @param session    The parent session that created the receiver.
-    * @param address    The address that this receiver should listen on.
-    * @param receiverId The unique ID assigned to this receiver.
+    * @param session
+    *        The parent session that created the receiver.
+    * @param address
+    *        The address that this receiver should listen on.
+    * @param receiverId
+    *        The unique ID assigned to this receiver.
     */
    public AmqpReceiver(AmqpSession session, String address, String receiverId) {
+      this(session, address, receiverId, null, null);
+   }
+
+   /**
+    * Create a new receiver instance.
+    *
+    * @param session
+    *        The parent session that created the receiver.
+    * @param address
+    *        The address that this receiver should listen on.
+    * @param receiverId
+    *        The unique ID assigned to this receiver.
+    * @param senderMode
+    *        The {@link SenderSettleMode} to use on open.
+    * @param receiverMode
+    *        The {@link ReceiverSettleMode} to use on open.
+    */
+   public AmqpReceiver(AmqpSession session, String address, String receiverId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) {
 
       if (address != null && address.isEmpty()) {
          throw new IllegalArgumentException("Address cannot be empty.");
@@ -97,6 +121,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
       this.session = session;
       this.address = address;
       this.receiverId = receiverId;
+      this.userSpecifiedSenderSettlementMode = senderMode;
+      this.userSpecifiedReceiverSettlementMode = receiverMode;
    }
 
    /**
@@ -113,9 +139,11 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
       }
 
       this.session = session;
-      this.userSpecifiedSource = source;
       this.address = source.getAddress();
       this.receiverId = receiverId;
+      this.userSpecifiedSource = source;
+      this.userSpecifiedSenderSettlementMode = null;
+      this.userSpecifiedReceiverSettlementMode = null;
    }
 
    /**
@@ -687,12 +715,25 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
       Receiver receiver = session.getEndpoint().receiver(receiverName);
       receiver.setSource(source);
       receiver.setTarget(target);
-      if (isPresettle()) {
-         receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
+
+      if (userSpecifiedSenderSettlementMode != null) {
+         receiver.setSenderSettleMode(userSpecifiedSenderSettlementMode);
+         if (SenderSettleMode.SETTLED.equals(userSpecifiedSenderSettlementMode)) {
+            setPresettle(true);
+         }
+      } else {
+         if (isPresettle()) {
+            receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
+         } else {
+            receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+         }
+      }
+
+      if (userSpecifiedReceiverSettlementMode != null) {
+         receiver.setReceiverSettleMode(userSpecifiedReceiverSettlementMode);
       } else {
-         receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+         receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
       }
-      receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
 
       setEndpoint(receiver);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9eed28e0/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index 0a41ce6..c9bc0d6 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -66,7 +66,10 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
    private final AmqpSession session;
    private final String address;
    private final String senderId;
+
    private final Target userSpecifiedTarget;
+   private final SenderSettleMode userSpecifiedSenderSettlementMode;
+   private final ReceiverSettleMode userSpecifiedReceiverSettlementMode;
 
    private boolean presettle;
    private long sendTimeout = DEFAULT_SEND_TIMEOUT;
@@ -81,11 +84,32 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
    /**
     * Create a new sender instance.
     *
-    * @param session  The parent session that created the session.
-    * @param address  The address that this sender produces to.
-    * @param senderId The unique ID assigned to this sender.
+    * @param session
+    *        The parent session that created the sender.
+    * @param address
+    *        The address that this sender produces to.
+    * @param senderId
+    *        The unique ID assigned to this sender.
     */
    public AmqpSender(AmqpSession session, String address, String senderId) {
+      this(session, address, senderId, null, null);
+   }
+
+   /**
+    * Create a new sender instance.
+    *
+    * @param session
+    *        The parent session that created the sender.
+    * @param address
+    *        The address that this sender produces to.
+    * @param senderId
+    *        The unique ID assigned to this sender.
+    * @param senderMode
+    *        The {@link SenderSettleMode} to use on open.
+    * @param receiverMode
+    *        The {@link ReceiverSettleMode} to use on open.
+    */
+   public AmqpSender(AmqpSession session, String address, String senderId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) {
 
       if (address != null && address.isEmpty()) {
          throw new IllegalArgumentException("Address cannot be empty.");
@@ -95,6 +119,8 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
       this.address = address;
       this.senderId = senderId;
       this.userSpecifiedTarget = null;
+      this.userSpecifiedSenderSettlementMode = senderMode;
+      this.userSpecifiedReceiverSettlementMode = receiverMode;
    }
 
    /**
@@ -111,9 +137,11 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
       }
 
       this.session = session;
-      this.userSpecifiedTarget = target;
       this.address = target.getAddress();
       this.senderId = senderId;
+      this.userSpecifiedTarget = target;
+      this.userSpecifiedSenderSettlementMode = null;
+      this.userSpecifiedReceiverSettlementMode = null;
    }
 
    /**
@@ -289,12 +317,25 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
       Sender sender = session.getEndpoint().sender(senderName);
       sender.setSource(source);
       sender.setTarget(target);
-      if (presettle) {
-         sender.setSenderSettleMode(SenderSettleMode.SETTLED);
+
+      if (userSpecifiedSenderSettlementMode != null) {
+         sender.setSenderSettleMode(userSpecifiedSenderSettlementMode);
+         if (SenderSettleMode.SETTLED.equals(userSpecifiedSenderSettlementMode)) {
+            presettle = true;
+         }
+      } else {
+         if (presettle) {
+            sender.setSenderSettleMode(SenderSettleMode.SETTLED);
+         } else {
+            sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+         }
+      }
+
+      if (userSpecifiedReceiverSettlementMode != null) {
+         sender.setReceiverSettleMode(userSpecifiedReceiverSettlementMode);
       } else {
-         sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+         sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
       }
-      sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
 
       sender.setDesiredCapabilities(desiredCapabilities);
       sender.setOfferedCapabilities(offeredCapabilities);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9eed28e0/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 9589a7e..677b354 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -28,6 +28,8 @@ import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Session;
 
@@ -174,6 +176,42 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
    }
 
    /**
+    * Create a sender instance using the given address
+    *
+    * @param address
+    *        the address to which the sender will produce its messages.
+    * @param senderMode
+    *        controls the settlement mode used by the created Sender
+    * @param receiverMode
+    *        controls the desired settlement mode used by the remote Receiver
+    *
+    * @return a newly created sender that is ready for use.
+    *
+    * @throws Exception if an error occurs while creating the sender.
+    */
+   public AmqpSender createSender(final String address, final SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception {
+      checkClosed();
+
+      final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(), senderMode, receiverMode);
+      final ClientFuture request = new ClientFuture();
+
+      connection.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            checkClosed();
+            sender.setStateInspector(getStateInspector());
+            sender.open(request);
+            pumpToProtonTransport(request);
+         }
+      });
+
+      request.sync();
+
+      return sender;
+   }
+
+   /**
     * Create a sender instance using the given Target
     *
     * @param target the caller created and configured Target used to create the sender link.
@@ -310,6 +348,42 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
    }
 
    /**
+    * Create a receiver instance using the given address
+    *
+    * @param address
+    *        the address to which the receiver will subscribe for its messages.
+    * @param senderMode
+    *        controls the desired settlement mode used by the remote Sender
+    * @param receiverMode
+    *        controls the settlement mode used by the created Receiver
+    *
+    * @return a newly created receiver that is ready for use.
+    *
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createReceiver(String address, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception {
+      checkClosed();
+
+      final ClientFuture request = new ClientFuture();
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId(), senderMode, receiverMode);
+
+      connection.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            checkClosed();
+            receiver.setStateInspector(getStateInspector());
+            receiver.open(request);
+            pumpToProtonTransport(request);
+         }
+      });
+
+      request.sync();
+
+      return receiver;
+   }
+
+   /**
     * Create a receiver instance using the given Source
     *
     * @param source the caller created and configured Source used to create the receiver link.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9eed28e0/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java
new file mode 100644
index 0000000..b47ad50
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.junit.Test;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 60000)
+   public void testSenderSettlementModeSettledIsHonored() throws Exception {
+      doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED);
+   }
+
+   @Test(timeout = 60000)
+   public void testSenderSettlementModeUnsettledIsHonored() throws Exception {
+      doTestSenderSettlementModeIsHonored(SenderSettleMode.UNSETTLED);
+   }
+
+   @Test(timeout = 60000)
+   public void testSenderSettlementModeMixedIsHonored() throws Exception {
+      doTestSenderSettlementModeIsHonored(SenderSettleMode.MIXED);
+   }
+
+   public void doTestSenderSettlementModeIsHonored(SenderSettleMode settleMode) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), settleMode, ReceiverSettleMode.FIRST);
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertNotNull(queueView);
+      assertEquals(0, queueView.getMessageCount());
+      assertEquals(1, server.getTotalConsumerCount());
+
+      assertEquals(settleMode, receiver.getEndpoint().getRemoteSenderSettleMode());
+
+      receiver.close();
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testReceiverSettlementModeSetToFirst() throws Exception {
+      doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.FIRST);
+   }
+
+   @Test(timeout = 60000)
+   public void testReceiverSettlementModeSetToSecond() throws Exception {
+      doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.SECOND);
+   }
+
+   /*
+    * The Broker does not currently support ReceiverSettleMode of SECOND so we ensure that it
+    * always drops that back to FIRST to let the client know. The client will need to check and
+    * react accordingly.
+    */
+   private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode modeToUse) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), SenderSettleMode.MIXED, modeToUse);
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertNotNull(queueView);
+      assertEquals(0, queueView.getMessageCount());
+      assertEquals(1, server.getTotalConsumerCount());
+
+      assertEquals(ReceiverSettleMode.FIRST, receiver.getEndpoint().getRemoteReceiverSettleMode());
+
+      receiver.close();
+      connection.close();
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9eed28e0/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
new file mode 100644
index 0000000..7b8cbef
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.junit.Test;
+
+/**
+ * Test broker behavior when creating AMQP senders
+ */
+public class AmqpSenderTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 60000)
+   public void testSenderSettlementModeSettledIsHonored() throws Exception {
+      doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED);
+   }
+
+   @Test(timeout = 60000)
+   public void testSenderSettlementModeUnsettledIsHonored() throws Exception {
+      doTestSenderSettlementModeIsHonored(SenderSettleMode.UNSETTLED);
+   }
+
+   @Test(timeout = 60000)
+   public void testSenderSettlementModeMixedIsHonored() throws Exception {
+      doTestSenderSettlementModeIsHonored(SenderSettleMode.MIXED);
+   }
+
+   public void doTestSenderSettlementModeIsHonored(SenderSettleMode settleMode) throws Exception {
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender("queue://" + getTestName(), settleMode, ReceiverSettleMode.FIRST);
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertNotNull(queueView);
+      assertEquals(0, queueView.getMessageCount());
+
+      assertEquals(settleMode, sender.getEndpoint().getRemoteSenderSettleMode());
+
+      AmqpMessage message = new AmqpMessage();
+      message.setText("Test-Message");
+      sender.send(message);
+
+      sender.close();
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testReceiverSettlementModeSetToFirst() throws Exception {
+      doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.FIRST);
+   }
+
+   @Test(timeout = 60000)
+   public void testReceiverSettlementModeSetToSecond() throws Exception {
+      doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.SECOND);
+   }
+
+   /*
+    * The Broker does not currently support ReceiverSettleMode of SECOND so we ensure that it
+    * always drops that back to FIRST to let the client know. The client will need to check and
+    * react accordingly.
+    */
+   private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode modeToUse) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender("queue://" + getTestName(), SenderSettleMode.UNSETTLED, modeToUse);
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertNotNull(queueView);
+      assertEquals(0, queueView.getMessageCount());
+
+      assertEquals(ReceiverSettleMode.FIRST, sender.getEndpoint().getRemoteReceiverSettleMode());
+
+      sender.close();
+
+      connection.close();
+   }
+}
\ No newline at end of file


[2/2] activemq-artemis git commit: This closes #1221

Posted by ta...@apache.org.
This closes #1221


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/694dbd70
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/694dbd70
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/694dbd70

Branch: refs/heads/master
Commit: 694dbd70077b913b3026c520cd6b7926dc4ee5b4
Parents: 310f953 9eed28e
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Apr 21 15:48:06 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Apr 21 15:48:06 2017 -0400

----------------------------------------------------------------------
 .../proton/ProtonServerReceiverContext.java     |   7 ++
 .../amqp/proton/ProtonServerSenderContext.java  |  14 ++-
 .../transport/amqp/client/AmqpReceiver.java     |  59 +++++++++--
 .../transport/amqp/client/AmqpSender.java       |  59 +++++++++--
 .../transport/amqp/client/AmqpSession.java      |  76 +++++++++++++-
 .../integration/amqp/AmqpReceiverTest.java      |  99 ++++++++++++++++++
 .../tests/integration/amqp/AmqpSenderTest.java  | 104 +++++++++++++++++++
 7 files changed, 396 insertions(+), 22 deletions(-)
----------------------------------------------------------------------