You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/07/29 11:17:13 UTC

[qpid-broker-j] branch 7.0.x updated (23ae8c8 -> 21becf4)

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a change to branch 7.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git.


    from 23ae8c8  NO-JIRA: Amend travis descriptor
     new 67a3a52  QPID-8345: [Broker-J][AMQP 1.0] Dequeue messages sent non-transactionally as pre-settled
     new 21becf4  QPID-8343: [Broker-J][AMQP 1.0] Derive dynamic node durability from terminus expiry policy

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/qpid/server/model/BrokerTestHelper.java |  4 ++
 .../server/protocol/v1_0/ConsumerTarget_1_0.java   | 41 ++++++++++++++-
 .../qpid/server/protocol/v1_0/Session_1_0.java     | 43 ++++++++--------
 .../protocol/v1_0/type/messaging/Source.java       |  2 +-
 .../protocol/v1_0/type/messaging/Target.java       |  2 +-
 .../messaging/Terminus.java}                       | 21 ++++++--
 .../qpid/server/protocol/v1_0/Session_1_0Test.java | 58 ++++++++++++++++++++--
 .../protocol/v1_0/messaging/TransferTest.java      |  8 ++-
 8 files changed, 147 insertions(+), 32 deletions(-)
 copy broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/{SendingDestination.java => type/messaging/Terminus.java} (69%)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 01/02: QPID-8345: [Broker-J][AMQP 1.0] Dequeue messages sent non-transactionally as pre-settled

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 67a3a525a717835003e506105fb5d4763cda1ac5
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed Jul 24 11:31:42 2019 +0100

    QPID-8345: [Broker-J][AMQP 1.0] Dequeue messages sent non-transactionally as pre-settled
    
    (cherry picked from commit ca0d0d7ec22c31364c1adc32f5dffee0392ff230)
---
 .../server/protocol/v1_0/ConsumerTarget_1_0.java   | 41 +++++++++++++++++++++-
 .../protocol/v1_0/messaging/TransferTest.java      |  8 ++++-
 2 files changed, 47 insertions(+), 2 deletions(-)

diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 0d19b9d..c7a2cd6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -238,9 +238,14 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
 
             if (_linkEndpoint.isAttached())
             {
-                if (SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
+                boolean sendPreSettled = SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode());
+                if (sendPreSettled)
                 {
                     transfer.setSettled(true);
+                    if (_acquires && _transactionId == null)
+                    {
+                        transfer.setState(new Accepted());
+                    }
                 }
                 else
                 {
@@ -295,6 +300,11 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
                 }
                 getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
                 getEndpoint().transfer(transfer, false);
+
+                if (sendPreSettled && _acquires && _transactionId == null)
+                {
+                    handleAcquiredEntrySentPareSettledNonTransactional(entry, consumer);
+                }
             }
             else
             {
@@ -312,6 +322,35 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
         }
     }
 
+    private void handleAcquiredEntrySentPareSettledNonTransactional(final MessageInstance entry,
+                                                                    final MessageInstanceConsumer consumer)
+    {
+        if (entry.makeAcquisitionUnstealable(consumer))
+        {
+            final ServerTransaction txn = _linkEndpoint.getAsyncAutoCommitTransaction();
+            txn.dequeue(entry.getEnqueueRecord(),
+                        new ServerTransaction.Action()
+                        {
+                            @Override
+                            public void postCommit()
+                            {
+                                entry.delete();
+                            }
+
+                            @Override
+                            public void onRollback()
+                            {
+                                entry.release(consumer);
+                            }
+                        });
+            txn.commit();
+        }
+        else
+        {
+            entry.release(consumer);
+        }
+    }
+
     @Override
     public void flushBatched()
     {
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 70c479b..bd16988 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -737,7 +737,6 @@ public class TransferTest extends BrokerAdminUsingTestBase
                                                      .begin().consumeResponse()
                                                      .attachRole(Role.RECEIVER)
                                                      .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
-                                                     .attachRcvSettleMode(ReceiverSettleMode.FIRST)
                                                      .attachSndSettleMode(SenderSettleMode.SETTLED)
                                                      .attach().consumeResponse(Attach.class);
             Attach attach = interaction.getLatestResponse(Attach.class);
@@ -760,6 +759,13 @@ public class TransferTest extends BrokerAdminUsingTestBase
             // verify no unexpected performative received by closing the connection
             interaction.doCloseConnection();
         }
+
+        if (getBrokerAdmin().isQueueDepthSupported())
+        {
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+        }
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "test");
+        assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo("test")));
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 02/02: QPID-8343: [Broker-J][AMQP 1.0] Derive dynamic node durability from terminus expiry policy

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 21becf4e443120c29ef212913764a345bdf85e76
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed Jul 24 18:03:05 2019 +0100

    QPID-8343: [Broker-J][AMQP 1.0] Derive dynamic node durability from terminus expiry policy
    
    (cherry picked from commit bf75f490170beb91f72547c00c365bf024cbaa1f)
---
 .../apache/qpid/server/model/BrokerTestHelper.java |  4 ++
 .../qpid/server/protocol/v1_0/Session_1_0.java     | 43 ++++++++--------
 .../protocol/v1_0/type/messaging/Source.java       |  2 +-
 .../protocol/v1_0/type/messaging/Target.java       |  2 +-
 .../protocol/v1_0/type/messaging/Terminus.java     | 43 ++++++++++++++++
 .../qpid/server/protocol/v1_0/Session_1_0Test.java | 58 ++++++++++++++++++++--
 6 files changed, 127 insertions(+), 25 deletions(-)

diff --git a/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java b/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java
index 9573bdb..11138af 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java
@@ -411,4 +411,8 @@ public class BrokerTestHelper
         }
     }
 
+    public static <X extends ConfiguredObject> X mockAsSystemPrincipalSource(Class<X> clazz)
+    {
+        return mockWithSystemPrincipal(clazz, SYSTEM_PRINCIPAL);
+    }
 }
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index f4f0857..bc730e7 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -28,7 +28,6 @@ import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.text.MessageFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -89,6 +88,8 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoMessages;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Terminus;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
@@ -665,7 +666,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         {
             if (Boolean.TRUE.equals(target.getDynamic()))
             {
-                MessageDestination tempDestination = createDynamicDestination(link, target.getDynamicNodeProperties(), target.getCapabilities());
+                MessageDestination tempDestination = createDynamicDestination(link, target);
                 if(tempDestination != null)
                 {
                     target.setAddress(_primaryDomain + tempDestination.getName());
@@ -744,10 +745,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
         if (Boolean.TRUE.equals(source.getDynamic()))
         {
-            final Set<Symbol> sourceCapabilities = source.getCapabilities() == null
-                    ? Collections.emptySet()
-                    : new HashSet<>(Arrays.asList(source.getCapabilities()));
-            MessageSource tempSource = createDynamicSource(link, source.getDynamicNodeProperties(), sourceCapabilities);
+            MessageSource tempSource = createDynamicSource(link, source);
             if(tempSource != null)
             {
                 source.setAddress(_primaryDomain + tempSource.getName());
@@ -822,21 +820,23 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
     }
 
     private MessageSource createDynamicSource(final Link_1_0<?, ?> link,
-                                              Map properties,
-                                              final Set<Symbol> capabilities) throws AmqpErrorException
+                                              final Terminus terminus) throws AmqpErrorException
     {
         // TODO temporary topics?
         final String queueName = "TempQueue" + UUID.randomUUID().toString();
         try
         {
-            Map<String, Object> attributes = convertDynamicNodePropertiesToAttributes(link, properties, queueName);
+            final Map<String, Object> attributes = createDynamicNodeAttributes(link, terminus, queueName);
 
-            if (capabilities.contains(Symbol.valueOf("temporary-queue"))
-                || capabilities.contains(Symbol.valueOf("temporary-topic")))
+            if (terminus.getCapabilities() != null)
             {
-                attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION);
+                final Set<Symbol> capabilities = Sets.newHashSet(terminus.getCapabilities());
+                if (capabilities.contains(Symbol.valueOf("temporary-queue"))
+                    || capabilities.contains(Symbol.valueOf("temporary-topic")))
+                {
+                    attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION);
+                }
             }
-
             return Subject.doAs(getSubjectWithAddedSystemRights(),
                                 (PrivilegedAction<MessageSource>) () -> getAddressSpace().createMessageSource(MessageSource.class, attributes));
         }
@@ -853,15 +853,15 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
 
     private MessageDestination createDynamicDestination(final Link_1_0<?, ?> link,
-                                                        Map properties,
-                                                        final Symbol[] capabilities) throws AmqpErrorException
+                                                        final Terminus terminus) throws AmqpErrorException
     {
+        final Symbol[] capabilities = terminus.getCapabilities();
         final Set<Symbol> capabilitySet = capabilities == null ? Collections.emptySet() : Sets.newHashSet(capabilities);
         boolean isTopic = capabilitySet.contains(Symbol.valueOf("temporary-topic")) || capabilitySet.contains(Symbol.valueOf("topic"));
         final String destName = (isTopic ? "TempTopic" : "TempQueue") + UUID.randomUUID().toString();
         try
         {
-            Map<String, Object> attributes = convertDynamicNodePropertiesToAttributes(link, properties, destName);
+            final Map<String, Object> attributes = createDynamicNodeAttributes(link, terminus, destName);
 
 
             Class<? extends MessageDestination> clazz = isTopic ? Exchange.class : MessageDestination.class;
@@ -888,11 +888,14 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         }
     }
 
-    private Map<String, Object> convertDynamicNodePropertiesToAttributes(final Link_1_0<?, ?> link,
-                                                                         final Map properties,
-                                                                         final String nodeName)
+    private Map<String, Object> createDynamicNodeAttributes(final Link_1_0<?, ?> link,
+                                                            final Terminus terminus,
+                                                            final String nodeName)
     {
         // TODO convert AMQP 1-0 node properties to queue attributes
+
+        final Map<Symbol, Object> properties = terminus.getDynamicNodeProperties();
+        final TerminusExpiryPolicy expiryPolicy = terminus.getExpiryPolicy();
         LifetimePolicy lifetimePolicy = properties == null
                                         ? null
                                         : (LifetimePolicy) properties.get(LIFETIME_POLICY);
@@ -900,7 +903,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         Map<String,Object> attributes = new HashMap<>();
         attributes.put(ConfiguredObject.ID, UUID.randomUUID());
         attributes.put(ConfiguredObject.NAME, nodeName);
-        attributes.put(ConfiguredObject.DURABLE, true);
+        attributes.put(ConfiguredObject.DURABLE, TerminusExpiryPolicy.NEVER.equals(expiryPolicy));
 
         if(lifetimePolicy instanceof DeleteOnNoLinks)
         {
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java
index 47e67d8..547958c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java
@@ -37,7 +37,7 @@ import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 
 @CompositeType( symbolicDescriptor = "amqp:source:list", numericDescriptor = 0x0000000000000028L)
-public class Source implements BaseSource
+public class Source implements BaseSource, Terminus
 {
     @CompositeTypeField(index = 0)
     private String _address;
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java
index 0f81898..e44670d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java
@@ -34,7 +34,7 @@ import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 
 @CompositeType( symbolicDescriptor = "amqp:target:list", numericDescriptor = 0x0000000000000029L)
-public class Target implements BaseTarget
+public class Target implements BaseTarget, Terminus
 {
     @CompositeTypeField(index = 0)
     private String _address;
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Terminus.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Terminus.java
new file mode 100644
index 0000000..d750daa
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Terminus.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.type.messaging;
+
+import java.util.Map;
+
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+
+public interface Terminus
+{
+    String getAddress();
+
+    TerminusDurability getDurable();
+
+    TerminusExpiryPolicy getExpiryPolicy();
+
+    UnsignedInteger getTimeout();
+
+    Boolean getDynamic();
+
+    Map<Symbol, Object> getDynamicNodeProperties();
+
+    Symbol[] getCapabilities();
+}
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index b5ba0aa..ac8eece 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -63,6 +63,7 @@ import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
@@ -496,6 +497,51 @@ public class Session_1_0Test extends QpidTestCase
         assertAttachSent(connection2, session2, attach);
     }
 
+    public void testAttachSourceDynamicWithLifeTimePolicyDeleteOnClose()
+    {
+        final Attach attach = createReceiverAttach(getTestName());
+        final Source source = createDynamicSource(new DeleteOnClose());
+        attach.setSource(source);
+
+        _session.receiveAttach(attach);
+
+        assertQueueDurability(getDynamicNodeAddressFromAttachResponse(), false);
+    }
+
+    public void testAttachSourceDynamicWithLifeTimePolicyDeleteOnCloseAndExpiryPolicyNever()
+    {
+        final Attach attach = createReceiverAttach(getTestName());
+        final Source source = createDynamicSource(new DeleteOnClose());
+        source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+        attach.setSource(source);
+
+        _session.receiveAttach(attach);
+
+        assertQueueDurability(getDynamicNodeAddressFromAttachResponse(), true);
+    }
+
+    private Source createDynamicSource(final DeleteOnClose lifetimePolicy)
+    {
+        final Source source = new Source();
+        source.setDynamic(true);
+        source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, lifetimePolicy));
+        return source;
+    }
+
+    private String getDynamicNodeAddressFromAttachResponse()
+    {
+        final Attach sentAttach = captureAttach(_connection, _session, 0);
+        assertTrue(sentAttach.getSource() instanceof Source);
+        return ((Source) (sentAttach.getSource())).getAddress();
+    }
+
+    public void assertQueueDurability(final String queueName, final boolean expectedDurability)
+    {
+        final Queue queue = _virtualHost.getChildByName(Queue.class, queueName);
+        assertNotNull("Queue not found", queue);
+        assertEquals("Unexpected durability", queue.isDurable(), expectedDurability);
+    }
+
     private void assertFilter(final Attach sentAttach, final String selectorExpression)
     {
         Source source = (Source)sentAttach.getSource();
@@ -675,7 +721,7 @@ public class Session_1_0Test extends QpidTestCase
                                 final boolean isGlobal,
                                 final boolean isShared)
     {
-        Attach attach = new Attach();
+        Attach attach = createReceiverAttach(linkName);
         Source source = new Source();
 
         List<Symbol> capabilities = new ArrayList<>();
@@ -701,14 +747,20 @@ public class Session_1_0Test extends QpidTestCase
             source.setDurable(TerminusDurability.NONE);
             source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
         }
+        source.setAddress(address);
         attach.setSource(source);
+        return attach;
+    }
+
+    private Attach createReceiverAttach(String linkName)
+    {
+        final Attach attach = new Attach();
         Target target = new Target();
         attach.setTarget(target);
         attach.setHandle(new UnsignedInteger(_handle++));
         attach.setIncompleteUnsettled(false);
         attach.setName(linkName);
         attach.setRole(Role.RECEIVER);
-        source.setAddress(address);
         return attach;
     }
 
@@ -719,7 +771,7 @@ public class Session_1_0Test extends QpidTestCase
 
     private AMQPConnection_1_0 createAmqpConnection_1_0(String containerId)
     {
-        AMQPConnection_1_0 connection = mock(AMQPConnection_1_0.class);
+        AMQPConnection_1_0 connection = BrokerTestHelper.mockAsSystemPrincipalSource(AMQPConnection_1_0.class);
         Subject subject =
                 new Subject(true, Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
         when(connection.getSubject()).thenReturn(subject);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org