You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/25 22:55:13 UTC

svn commit: r1776037 [2/2] - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/java/org/apache/qpid/server/exchange/topic/ broker-core/src/main/java/org/apache/qpid/server/filter/ broker-core/src/main/...

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Sun Dec 25 22:55:13 2016
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.exchange;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anySet;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
@@ -27,6 +28,7 @@ import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -42,16 +44,23 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.BrokerModel;
 import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -84,6 +93,7 @@ public class HeadersExchangeTest extends
         when(_virtualHost.getCategoryClass()).thenReturn(VirtualHost.class);
         when(_virtualHost.getTaskExecutor()).thenReturn(_taskExecutor);
         when(_virtualHost.getChildExecutor()).thenReturn(_taskExecutor);
+        when(_virtualHost.getState()).thenReturn(State.ACTIVE);
 
         _factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
         when(_virtualHost.getObjectFactory()).thenReturn(_factory);
@@ -106,7 +116,7 @@ public class HeadersExchangeTest extends
 
     protected void routeAndTest(ServerMessage msg, Queue<?>... expected) throws Exception
     {
-        List<? extends BaseQueue> results = _exchange.route(msg, "", InstanceProperties.EMPTY);
+        List<? extends BaseQueue> results = routeToQueues(msg, "", InstanceProperties.EMPTY);
         List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results);
         unexpected.removeAll(Arrays.asList(expected));
         assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty());
@@ -116,6 +126,88 @@ public class HeadersExchangeTest extends
         assertTrue("Duplicates " + results, results.size()==(new HashSet<BaseQueue>(results)).size());
     }
 
+    private List<? extends BaseQueue> routeToQueues(final ServerMessage message,
+                                                    final String routingAddress,
+                                                    final InstanceProperties instanceProperties)
+    {
+        RoutingResult result = _exchange.route(message, routingAddress, instanceProperties);
+        final List<BaseQueue> resultQueues = new ArrayList<>();
+        result.send(new ServerTransaction()
+        {
+            @Override
+            public long getTransactionStartTime()
+            {
+                return 0;
+            }
+
+            @Override
+            public long getTransactionUpdateTime()
+            {
+                return 0;
+            }
+
+            @Override
+            public void addPostTransactionAction(final Action postTransactionAction)
+            {
+
+            }
+
+            @Override
+            public void dequeue(final MessageEnqueueRecord record, final Action postTransactionAction)
+            {
+
+            }
+
+            @Override
+            public void dequeue(final Collection<MessageInstance> messages, final Action postTransactionAction)
+            {
+
+            }
+
+            @Override
+            public void enqueue(final TransactionLogResource queue,
+                                final EnqueueableMessage message,
+                                final EnqueueAction postTransactionAction)
+            {
+                resultQueues.add((BaseQueue) queue);
+            }
+
+            @Override
+            public void enqueue(final Collection<? extends BaseQueue> queues,
+                                final EnqueueableMessage message,
+                                final EnqueueAction postTransactionAction)
+            {
+                resultQueues.addAll(queues);
+            }
+
+            @Override
+            public void commit()
+            {
+
+            }
+
+            @Override
+            public void commit(final Runnable immediatePostTransactionAction)
+            {
+
+            }
+
+            @Override
+            public void rollback()
+            {
+
+            }
+
+            @Override
+            public boolean isTransactional()
+            {
+                return false;
+            }
+        }, null);
+
+        return resultQueues;
+    }
+
 
     private Queue<?> createAndBind(final String name, String... arguments)
             throws Exception
@@ -169,6 +261,9 @@ public class HeadersExchangeTest extends
         when(q.getTaskExecutor()).thenReturn(taskExecutor);
         when(q.getChildExecutor()).thenReturn(taskExecutor);
         when(_virtualHost.getAttainedQueue(name)).thenReturn(q);
+        final RoutingResult routingResult = new RoutingResult(null);
+        routingResult.addQueue(q);
+        when(q.route(any(ServerMessage.class), anyString(), any(InstanceProperties.class))).thenReturn(routingResult);
         return q;
     }
 
@@ -282,6 +377,7 @@ public class HeadersExchangeTest extends
         });
         final ServerMessage serverMessage = mock(ServerMessage.class);
         when(serverMessage.getMessageHeader()).thenReturn(header);
+        when(serverMessage.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
         return serverMessage;
     }
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Sun Dec 25 22:55:13 2016
@@ -25,6 +25,8 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -34,15 +36,20 @@ import org.junit.Assert;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class TopicExchangeTest extends QpidTestCase
@@ -353,24 +360,31 @@ public class TopicExchangeTest extends Q
         _exchange.bind(queue.getName(), bindingKey, bindArgs, false);
 
         ServerMessage matchMsg1 = mock(ServerMessage.class);
+        when(matchMsg1.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
         AMQMessageHeader msgHeader1 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6));
         when(matchMsg1.getMessageHeader()).thenReturn(msgHeader1);
         routeMessage(matchMsg1, bindingKey, 1);
         Assert.assertEquals("First message should be routed to queue", 1, queue.getQueueDepthMessages());
 
         ServerMessage nonmatchMsg2 = mock(ServerMessage.class);
+        when(nonmatchMsg2.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
+
         AMQMessageHeader msgHeader2 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 5));
         when(nonmatchMsg2.getMessageHeader()).thenReturn(msgHeader2);
         routeMessage(nonmatchMsg2, bindingKey, 2);
         Assert.assertEquals("Second message should not be routed to queue", 1, queue.getQueueDepthMessages());
 
         ServerMessage nonmatchMsg3 = mock(ServerMessage.class);
+        when(nonmatchMsg3.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
+
         AMQMessageHeader msgHeader3 = createMessageHeader(Collections.<String, Object>emptyMap());
         when(nonmatchMsg3.getMessageHeader()).thenReturn(msgHeader3);
         routeMessage(nonmatchMsg3, bindingKey, 3);
         Assert.assertEquals("Third message should not be routed to queue", 1, queue.getQueueDepthMessages());
 
         ServerMessage matchMsg4 = mock(ServerMessage.class);
+        when(matchMsg4.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
+
         AMQMessageHeader msgHeader4 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 7));
         when(matchMsg4.getMessageHeader()).thenReturn(msgHeader4);
         routeMessage(matchMsg4, bindingKey, 4);
@@ -388,6 +402,7 @@ public class TopicExchangeTest extends Q
 
         AMQMessageHeader mgsHeader1 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6));
         ServerMessage msg1 = mock(ServerMessage.class);
+        when(msg1.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
         when(msg1.getMessageHeader()).thenReturn(mgsHeader1);
 
         routeMessage(msg1, bindingKey, 1);
@@ -400,6 +415,7 @@ public class TopicExchangeTest extends Q
         // Message that would have matched the original selector but not the new
         AMQMessageHeader mgsHeader2 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6));
         ServerMessage msg2 = mock(ServerMessage.class);
+        when(msg2.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
         when(msg2.getMessageHeader()).thenReturn(mgsHeader2);
 
         routeMessage(msg2, bindingKey, 2);
@@ -408,6 +424,7 @@ public class TopicExchangeTest extends Q
         // Message that matches only the second
         AMQMessageHeader mgsHeader3 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 7));
         ServerMessage msg3 = mock(ServerMessage.class);
+        when(msg3.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
         when(msg3.getMessageHeader()).thenReturn(mgsHeader3);
 
         routeMessage(msg3, bindingKey, 2);
@@ -425,7 +442,7 @@ public class TopicExchangeTest extends Q
         _exchange.bind(queue.getName(), bindingKey, null, false);
 
         ServerMessage msg1 = mock(ServerMessage.class);
-
+        when(msg1.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
         routeMessage(msg1, bindingKey, 1);
         Assert.assertEquals(1, queue.getQueueDepthMessages());
 
@@ -436,6 +453,7 @@ public class TopicExchangeTest extends Q
         // Message that does not match the new selector
         AMQMessageHeader mgsHeader2 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6));
         ServerMessage msg2 = mock(ServerMessage.class);
+        when(msg2.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
         when(msg2.getMessageHeader()).thenReturn(mgsHeader2);
 
         routeMessage(msg2, bindingKey, 2);
@@ -444,6 +462,8 @@ public class TopicExchangeTest extends Q
         // Message that matches the selector
         AMQMessageHeader mgsHeader3 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 7));
         ServerMessage msg3 = mock(ServerMessage.class);
+        when(msg3.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
+
         when(msg3.getMessageHeader()).thenReturn(mgsHeader3);
 
         routeMessage(msg3, bindingKey, 2);
@@ -455,13 +475,14 @@ public class TopicExchangeTest extends Q
     private int routeMessage(String routingKey, long messageNumber)
     {
         ServerMessage message = mock(ServerMessage.class);
+        when(message.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
         return routeMessage(message, routingKey, messageNumber);
     }
 
     private int routeMessage(ServerMessage message, String routingKey, long messageNumber)
     {
         when(message.getInitialRoutingAddress()).thenReturn(routingKey);
-        List<? extends BaseQueue> queues = _exchange.route(message, routingKey, InstanceProperties.EMPTY);
+        List<? extends BaseQueue> queues = routeToQueues(message, routingKey, InstanceProperties.EMPTY);
         MessageReference ref = mock(MessageReference.class);
         when(ref.getMessage()).thenReturn(message);
         when(message.newReference()).thenReturn(ref);
@@ -475,6 +496,88 @@ public class TopicExchangeTest extends Q
         return queues.size();
     }
 
+    private List<? extends BaseQueue> routeToQueues(final ServerMessage message,
+                                                    final String routingAddress,
+                                                    final InstanceProperties instanceProperties)
+    {
+        RoutingResult result = _exchange.route(message, routingAddress, instanceProperties);
+        final List<BaseQueue> resultQueues = new ArrayList<>();
+        result.send(new ServerTransaction()
+        {
+            @Override
+            public long getTransactionStartTime()
+            {
+                return 0;
+            }
+
+            @Override
+            public long getTransactionUpdateTime()
+            {
+                return 0;
+            }
+
+            @Override
+            public void addPostTransactionAction(final Action postTransactionAction)
+            {
+
+            }
+
+            @Override
+            public void dequeue(final MessageEnqueueRecord record, final Action postTransactionAction)
+            {
+
+            }
+
+            @Override
+            public void dequeue(final Collection<MessageInstance> messages, final Action postTransactionAction)
+            {
+
+            }
+
+            @Override
+            public void enqueue(final TransactionLogResource queue,
+                                final EnqueueableMessage message,
+                                final EnqueueAction postTransactionAction)
+            {
+                resultQueues.add((BaseQueue) queue);
+            }
+
+            @Override
+            public void enqueue(final Collection<? extends BaseQueue> queues,
+                                final EnqueueableMessage message,
+                                final EnqueueAction postTransactionAction)
+            {
+                resultQueues.addAll(queues);
+            }
+
+            @Override
+            public void commit()
+            {
+
+            }
+
+            @Override
+            public void commit(final Runnable immediatePostTransactionAction)
+            {
+
+            }
+
+            @Override
+            public void rollback()
+            {
+
+            }
+
+            @Override
+            public boolean isTransactional()
+            {
+                return false;
+            }
+        }, null);
+
+        return resultQueues;
+    }
+
     private AMQMessageHeader createMessageHeader(Map<String, Object> headers)
     {
         AMQMessageHeader messageHeader = mock(AMQMessageHeader.class);

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java Sun Dec 25 22:55:13 2016
@@ -54,7 +54,6 @@ public class AsyncAutoCommitTransactionT
 
         when(_messageStore.newTransaction()).thenReturn(_storeTransaction);
         when(_storeTransaction.commitTranAsync((Void) null)).thenReturn(_future);
-        when(_queue.isDurable()).thenReturn(true);
         when(_queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
     }
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Sun Dec 25 22:55:13 2016
@@ -424,7 +424,6 @@ public class AutoCommitTransactionTest e
     private BaseQueue createTestAMQQueue(final boolean durable)
     {
         BaseQueue queue = mock(BaseQueue.class);
-        when(queue.isDurable()).thenReturn(durable);
         when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER);
         return queue;
     }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java Sun Dec 25 22:55:13 2016
@@ -658,7 +658,6 @@ public class LocalTransactionTest extend
     private BaseQueue createQueue(final boolean durable)
     {
         BaseQueue queue = mock(BaseQueue.class);
-        when(queue.isDurable()).thenReturn(durable);
         when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER);
         return queue;
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Sun Dec 25 22:55:13 2016
@@ -55,7 +55,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
@@ -65,6 +64,7 @@ import org.apache.qpid.server.message.In
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfigurationChangeListener;
@@ -98,22 +98,7 @@ import org.apache.qpid.server.txn.Unknow
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.Deletable;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.Binary;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlow;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageSetFlowMode;
-import org.apache.qpid.transport.MessageStop;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.RangeSetFactory;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.Xid;
+import org.apache.qpid.transport.*;
 import org.apache.qpid.transport.network.Ticker;
 
 public class ServerSession extends Session
@@ -298,10 +283,9 @@ public class ServerSession extends Sessi
             _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
             invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
         }
-        int enqueues = exchange.send(message,
-                                     message.getInitialRoutingAddress(),
-                                     instanceProperties, _transaction, _checkCapacityAction
-                                    );
+        final RoutingResult<MessageTransferMessage> result =
+                exchange.route(message, message.getInitialRoutingAddress(), instanceProperties);
+        int enqueues = result.send(_transaction, _checkCapacityAction);
         getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
         incrementOutstandingTxnsIfNecessary();
         incrementUncommittedMessageSize(message.getStoredMessage());

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sun Dec 25 22:55:13 2016
@@ -81,6 +81,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.*;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -494,11 +495,12 @@ public class AMQChannel
                                     }
                                 };
 
-                        int enqueues = destination.send(amqMessage,
-                                                        amqMessage.getInitialRoutingAddress(),
-                                                        instanceProperties, _transaction,
-                                                        immediate ? _immediateAction : _capacityCheckAction
-                                                       );
+                        final RoutingResult<AMQMessage> result =
+                                destination.route(amqMessage,
+                                                  amqMessage.getInitialRoutingAddress(),
+                                                  instanceProperties);
+
+                        int enqueues = result.send(_transaction, immediate ? _immediateAction : _capacityCheckAction);
                         if (enqueues == 0)
                         {
                             finallyAction = handleUnroutableMessage(amqMessage);

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java Sun Dec 25 22:55:13 2016
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -37,6 +38,10 @@ import java.util.Set;
 
 import javax.security.auth.Subject;
 
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.MethodRegistry;
@@ -46,6 +51,7 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.BrokerModel;
@@ -61,8 +67,6 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.store.NullMessageStore;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMemoryMessage;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -213,7 +217,16 @@ public class AMQChannelTest extends Qpid
                 return messageHandle;
             }
         });
-
+        final ArgumentCaptor<ServerMessage> messageCaptor = ArgumentCaptor.forClass(ServerMessage.class);
+        doAnswer(new Answer()
+        {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable
+            {
+                ServerMessage message = messageCaptor.getValue();
+                return new RoutingResult(message);
+            }
+        }).when(_messageDestination).route(messageCaptor.capture(), eq(ROUTING_KEY.toString()), any(InstanceProperties.class));
         AMQChannel channel = new AMQChannel(_amqConnection, 1, _virtualHost.getMessageStore());
 
         BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
@@ -221,10 +234,8 @@ public class AMQChannelTest extends Qpid
         channel.receiveBasicPublish(AMQShortString.EMPTY_STRING, ROUTING_KEY, false, false);
         channel.receiveMessageHeader(properties, 0);
 
-        verify(_messageDestination).send((ServerMessage) any(),
+        verify(_messageDestination).route((ServerMessage) any(),
                                          eq(ROUTING_KEY.toString()),
-                                         any(InstanceProperties.class),
-                                         any(ServerTransaction.class),
-                                         any(Action.class) );
+                                         any(InstanceProperties.class));
     }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Sun Dec 25 22:55:13 2016
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -105,11 +106,10 @@ public class ExchangeDestination impleme
                     return null;
                 }};
 
-        int enqueues = _exchange.send(message,
-                                      routingAddress,
-                                      instanceProperties,
-                                      txn,
-                                      action);
+        final RoutingResult result = _exchange.route(message,
+                                                     routingAddress,
+                                                     instanceProperties);
+        int enqueues = result.send(txn, action);
 
         if(enqueues == 0)
         {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Sun Dec 25 22:55:13 2016
@@ -28,6 +28,7 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -104,7 +105,8 @@ public class NodeReceivingDestination im
                     return null;
                 }};
 
-        int enqueues = _destination.send(message, routingAddress, instanceProperties, txn, action);
+        RoutingResult result = _destination.route(message, routingAddress, instanceProperties);
+        int enqueues = result.send(txn, action);
 
         if(enqueues == 0)
         {

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java Sun Dec 25 22:55:13 2016
@@ -43,9 +43,9 @@ import org.apache.qpid.exchange.Exchange
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageSender;
 import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Connection;
@@ -64,7 +64,6 @@ import org.apache.qpid.server.store.Stor
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.DtxNotSupportedException;
 import org.apache.qpid.server.txn.DtxRegistry;
-import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNode;
@@ -354,20 +353,18 @@ public class ManagementAddressSpace impl
         }
 
         @Override
-        public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
-                                                                                     final String routingAddress,
-                                                                                     final InstanceProperties instanceProperties,
-                                                                                     final ServerTransaction txn,
-                                                                                     final Action<? super MessageInstance> postEnqueueAction)
+        public <M extends ServerMessage<? extends StorableMessageMetaData>> RoutingResult<M> route(final M message,
+                                                                                                   final String routingAddress,
+                                                                                                   final InstanceProperties instanceProperties)
         {
             MessageDestination destination = getAttainedMessageDestination(routingAddress);
             if(destination == null || destination == this)
             {
-                return 0;
+                return new RoutingResult<>(message);
             }
             else
             {
-                return destination.send(message, routingAddress, instanceProperties, txn, postEnqueueAction);
+                return destination.route(message, routingAddress, instanceProperties);
             }
         }
 

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Sun Dec 25 22:55:13 2016
@@ -64,6 +64,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageSender;
 import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.message.internal.InternalMessageHeader;
@@ -81,6 +82,7 @@ import org.apache.qpid.server.model.Publ
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -92,7 +94,7 @@ import org.apache.qpid.server.util.Actio
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.util.StateChangeListener;
 
-class ManagementNode implements MessageSource, MessageDestination
+class ManagementNode implements MessageSource, MessageDestination, BaseQueue
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ManagementNode.class);
 
@@ -173,6 +175,15 @@ class ManagementNode implements MessageS
 
     private final ManagementInputConverter _managementInputConverter;
 
+    private static final InstanceProperties CONSUMED_INSTANCE_PROPERTIES = new InstanceProperties()
+    {
+        @Override
+        public Object getProperty(final Property prop)
+        {
+            return null;
+        }
+    };
+
     ManagementNode(final NamedAddressSpace addressSpace,
                    final ConfiguredObject<?> configuredObject)
     {
@@ -332,12 +343,11 @@ class ManagementNode implements MessageS
     }
 
     @Override
-    public  <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
-                                                                                  final String routingAddress,
-                                                                                  final InstanceProperties instanceProperties,
-                                                                                  final ServerTransaction txn,
-                                                                                  final Action<? super MessageInstance> postEnqueueAction)
+    public <M extends ServerMessage<? extends StorableMessageMetaData>> RoutingResult<M> route(final M message,
+                                                                                               final String routingAddress,
+                                                                                               final InstanceProperties instanceProperties)
     {
+        final RoutingResult<M> result = new RoutingResult<>(message);
         if(message.isResourceAcceptable(this))
         {
             @SuppressWarnings("unchecked")
@@ -347,33 +357,12 @@ class ManagementNode implements MessageS
 
             if (converter != null)
             {
-                final InternalMessage msg = converter.convert(message, _addressSpace);
-                txn.addPostTransactionAction(new ServerTransaction.Action()
-                {
-                    @Override
-                    public void postCommit()
-                    {
-                        enqueue(msg, instanceProperties, postEnqueueAction);
-                    }
-
-                    @Override
-                    public void onRollback()
-                    {
-
-                    }
-                });
-
-                return 1;
+                result.addQueue(this);
             }
-            else
-            {
-                return 0;
-            }
-        }
-        else
-        {
-            return 0;
+
         }
+        return result;
+
     }
 
     @Override
@@ -408,7 +397,7 @@ class ManagementNode implements MessageS
         String id = (String) message.getMessageHeader().getHeader(IDENTITY_ATTRIBUTE);
         String type = (String) message.getMessageHeader().getHeader(TYPE_ATTRIBUTE);
         String operation = (String) message.getMessageHeader().getHeader(OPERATION_HEADER);
-
+        LOGGER.debug("Management Node identity: {}, type: {}, operation {}", id, type, operation);
         InternalMessage response;
 
         // TODO - handle runtime exceptions
@@ -433,6 +422,26 @@ class ManagementNode implements MessageS
 
     }
 
+    @Override
+    public void enqueue(final ServerMessage message,
+                        final Action<? super MessageInstance> action,
+                        final MessageEnqueueRecord record)
+    {
+        MessageConverter<ServerMessage, InternalMessage> converter =
+                (MessageConverter<ServerMessage, InternalMessage>) MessageConverterRegistry.getConverter((message.getClass()), InternalMessage.class);
+
+        final InternalMessage msg = converter.convert(message, _addressSpace);
+
+        enqueue(msg, CONSUMED_INSTANCE_PROPERTIES, action);
+
+    }
+
+    @Override
+    public boolean isDeleted()
+    {
+        return false;
+    }
+
     private interface StandardOperation
     {
         String getName();
@@ -982,11 +991,9 @@ class ManagementNode implements MessageS
         String replyTo = message.getMessageHeader().getReplyTo();
         response.setInitialRoutingAddress(replyTo);
 
-
-        getResponseDestination(replyTo).send(response,
-                                             replyTo, InstanceProperties.EMPTY,
-                                             new AutoCommitTransaction(_addressSpace.getMessageStore()),
-                                             null);
+        final MessageDestination responseDestination = getResponseDestination(replyTo);
+        RoutingResult<InternalMessage> result = responseDestination.route(response, replyTo, InstanceProperties.EMPTY);
+        result.send(new AutoCommitTransaction(_addressSpace.getMessageStore()), null);
 
     }
 
@@ -1533,14 +1540,13 @@ class ManagementNode implements MessageS
 
     private class ConsumedMessageInstance implements MessageInstance
     {
+
         private final ServerMessage _message;
-        private final InstanceProperties _properties;
 
         ConsumedMessageInstance(final ServerMessage message,
                                 final InstanceProperties properties)
         {
             _message = message;
-            _properties = properties;
         }
 
         @Override
@@ -1741,7 +1747,7 @@ class ManagementNode implements MessageS
         @Override
         public InstanceProperties getInstanceProperties()
         {
-            return _properties;
+            return CONSUMED_INSTANCE_PROPERTIES;
         }
 
         @Override

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Sun Dec 25 22:55:13 2016
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.InstanceProperties;
@@ -32,24 +33,28 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.message.MessageContainer;
+import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 
-class ManagementNodeConsumer<T extends ConsumerTarget> implements MessageInstanceConsumer<T>, MessageDestination
+class ManagementNodeConsumer<T extends ConsumerTarget> implements MessageInstanceConsumer<T>, MessageDestination,
+                                                                  BaseQueue
 {
     private final ManagementNode _managementNode;
     private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
     private final T _target;
     private final String _name;
-    private final Object _identifier = new Object();
+    private final UUID _identifier = UUID.randomUUID();
 
 
     public ManagementNodeConsumer(final String consumerName, final ManagementNode managementNode, T target)
@@ -142,14 +147,25 @@ class ManagementNodeConsumer<T extends C
     }
 
     @Override
-    public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
-                                                                                 final String routingAddress,
-                                                                                 final InstanceProperties instanceProperties,
-                                                                                 final ServerTransaction txn,
-                                                                                 final Action<? super MessageInstance> postEnqueueAction)
+    public UUID getId()
     {
-        send((InternalMessage)message);
-        return 1;
+        return _identifier;
+    }
+
+    @Override
+    public MessageDurability getMessageDurability()
+    {
+        return MessageDurability.NEVER;
+    }
+
+    @Override
+    public <M extends ServerMessage<? extends StorableMessageMetaData>> RoutingResult<M> route(final M message,
+                                                                                               final String routingAddress,
+                                                                                               final InstanceProperties instanceProperties)
+    {
+        RoutingResult<M> result = new RoutingResult<>(message);
+        result.addQueue(this);
+        return result;
     }
 
     @Override
@@ -181,10 +197,30 @@ class ManagementNodeConsumer<T extends C
         return _managementNode;
     }
 
-    void send(final InternalMessage response)
+    void send(ManagementResponse responseEntry)
     {
-        final ManagementResponse responseEntry = new ManagementResponse(this, response);
         _queue.add(responseEntry);
         _target.notifyWork();
     }
+
+    @Override
+    public void enqueue(final ServerMessage message,
+                        final Action<? super MessageInstance> action,
+                        final MessageEnqueueRecord record)
+    {
+        final InternalMessage internalMessage = (InternalMessage) message;
+        final ManagementResponse responseEntry = new ManagementResponse(this, internalMessage);
+
+        send(responseEntry);
+        if(action != null)
+        {
+            action.performAction(responseEntry);
+        }
+    }
+
+    @Override
+    public boolean isDeleted()
+    {
+        return isClosed();
+    }
 }

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java Sun Dec 25 22:55:13 2016
@@ -47,6 +47,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageSender;
 import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.NamedAddressSpace;
@@ -55,8 +56,6 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.Action;
 
 public class ProxyMessageSource implements MessageSource, MessageDestination
 {
@@ -94,13 +93,11 @@ public class ProxyMessageSource implemen
     }
 
     @Override
-    public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
-                                                                                 final String routingAddress,
-                                                                                 final InstanceProperties instanceProperties,
-                                                                                 final ServerTransaction txn,
-                                                                                 final Action<? super MessageInstance> postEnqueueAction)
+    public <M extends ServerMessage<? extends StorableMessageMetaData>> RoutingResult<M> route(final M message,
+                                                                                               final String routingAddress,
+                                                                                               final InstanceProperties instanceProperties)
     {
-        return 0;
+        return new RoutingResult<>(message);
     }
 
     @Override

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java Sun Dec 25 22:55:13 2016
@@ -48,6 +48,7 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
@@ -615,7 +616,8 @@ public class VirtualHostMessageStoreTest
 
 
         ServerTransaction trans = new AutoCommitTransaction(_virtualHost.getMessageStore());
-        exchange.send(currentMessage, routingKey, InstanceProperties.EMPTY, trans, null);
+        RoutingResult result = exchange.route(currentMessage, routingKey, InstanceProperties.EMPTY);
+        result.send(trans, null);
 
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org