You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/06/09 13:03:54 UTC

qpid-broker-j git commit: QPID-7635: [Java Broker] move handling of routing address into ReceivingDestination and MessageFormat

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 06dfe565f -> e6fa71f0d


QPID-7635: [Java Broker] move handling of routing address into ReceivingDestination and MessageFormat

This also fixes the test QueueMessageDurabilityTest#testSendNonPersistentMessageToAll.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e6fa71f0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e6fa71f0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e6fa71f0

Branch: refs/heads/master
Commit: e6fa71f0d44fc61b63a365268ad5f3df476103a6
Parents: 06dfe56
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri Jun 9 14:03:44 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Jun 9 14:03:44 2017 +0100

----------------------------------------------------------------------
 .../qpid/server/plugin/MessageFormat.java       |   4 +-
 .../protocol/v0_10/MessageFormat_0_10.java      |   6 +-
 .../protocol/v0_8/MessageFormat_0_9_1.java      |   6 +-
 .../v1_0/AnonymousRelayDestination.java         |  42 +--
 .../protocol/v1_0/ExchangeDestination.java      |  70 +----
 .../server/protocol/v1_0/MessageFormat_1_0.java | 277 +++++++++++++++++++
 .../protocol/v1_0/NodeReceivingDestination.java |  49 +---
 .../server/protocol/v1_0/QueueDestination.java  |  14 +-
 .../protocol/v1_0/ReceivingDestination.java     |  11 +-
 .../v1_0/StandardReceivingLinkEndpoint.java     | 149 +---------
 10 files changed, 327 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java b/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java
index 34bb4f2..d09c4b7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java
@@ -32,5 +32,7 @@ public interface MessageFormat<M extends ServerMessage<?>> extends Pluggable
     Class<M> getMessageClass();
     List<QpidByteBuffer> convertToMessageFormat(M message);
     M createMessage(List<QpidByteBuffer> buf, MessageStore store, final Object connectionReference);
-    String getRoutingAddress(M message, String destinationAddress);
+    String getRoutingAddress(M message,
+                             String destinationAddress,
+                             String initialDestinationRoutingAddress);
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
index 022b815..f409a94 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.plugin.MessageFormat;
+import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.store.MessageHandle;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoredMessage;
@@ -35,6 +36,7 @@ import org.apache.qpid.server.protocol.v0_10.transport.Header;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.Struct;
 
+@PluggableService
 public class MessageFormat_0_10 implements MessageFormat<MessageTransferMessage>
 {
 
@@ -137,7 +139,9 @@ public class MessageFormat_0_10 implements MessageFormat<MessageTransferMessage>
 
 
     @Override
-    public String getRoutingAddress(final MessageTransferMessage message, final String destinationAddress)
+    public String getRoutingAddress(final MessageTransferMessage message,
+                                    final String destinationAddress,
+                                    final String initialDestinationRoutingAddress)
     {
         String initialRoutingAddress = message.getInitialRoutingAddress();
         if(initialRoutingAddress != null && destinationAddress != null && initialRoutingAddress.startsWith(destinationAddress+"/"))

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java
index 9223bc0..d542cb4 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
 import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
 import org.apache.qpid.server.plugin.MessageFormat;
@@ -33,6 +34,7 @@ import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
+@PluggableService
 public class MessageFormat_0_9_1 implements MessageFormat<AMQMessage>
 {
 
@@ -244,7 +246,9 @@ public class MessageFormat_0_9_1 implements MessageFormat<AMQMessage>
     }
 
     @Override
-    public String getRoutingAddress(final AMQMessage message, final String destinationAddress)
+    public String getRoutingAddress(final AMQMessage message,
+                                    final String destinationAddress,
+                                    final String initialDestinationRoutingAddress)
     {
         String initialRoutingAddress = message.getInitialRoutingAddress();
         if(initialRoutingAddress != null && destinationAddress != null && initialRoutingAddress.startsWith(destinationAddress+"/"))

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
index af07fec..bd8f214 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -73,12 +74,13 @@ public class AnonymousRelayDestination implements ReceivingDestination
     }
 
     @Override
-    public Outcome send(final ServerMessage<?> message,
-                        final String routingAddress,
-                        final ServerTransaction txn,
-                        final SecurityToken securityToken)
+    public <M extends ServerMessage<?>> Outcome send(final MessageFormat<M> messageFormat,
+                                                     final M message,
+                                                     final ServerTransaction txn,
+                                                     final SecurityToken securityToken)
     {
         final ReceivingDestination destination;
+        final String routingAddress = messageFormat.getRoutingAddress(message, null, null);
         if (!routingAddress.startsWith("/") && routingAddress.contains("/"))
         {
             String[] parts = routingAddress.split("/", 2);
@@ -140,7 +142,7 @@ public class AnonymousRelayDestination implements ReceivingDestination
         }
         else
         {
-            outcome = destination.send(message, routingAddress, txn, securityToken);
+            outcome = destination.send(messageFormat, message, txn, securityToken);
         }
         return outcome;
     }
@@ -153,36 +155,6 @@ public class AnonymousRelayDestination implements ReceivingDestination
     }
 
     @Override
-    public String getRoutingAddress(final Message_1_0 message)
-    {
-        String routingAddress;
-        MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
-        final String to = messageHeader.getTo();
-        if (to != null)
-        {
-            routingAddress = to;
-        }
-        else if (messageHeader.getHeader("routing-key") instanceof String)
-        {
-            routingAddress = (String) messageHeader.getHeader("routing-key");
-        }
-        else if (messageHeader.getHeader("routing_key") instanceof String)
-        {
-            routingAddress = (String) messageHeader.getHeader("routing_key");
-        }
-        else if (messageHeader.getSubject() != null)
-        {
-            routingAddress = messageHeader.getSubject();
-        }
-        else
-        {
-            routingAddress = "";
-        }
-
-        return routingAddress;
-    }
-
-    @Override
     public String getAddress()
     {
         return "";

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index 252a111..e751c0c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -95,11 +96,12 @@ public class ExchangeDestination extends QueueDestination
         return OUTCOMES;
     }
 
-    public Outcome send(final ServerMessage<?> message,
-                        final String routingAddress,
-                        ServerTransaction txn,
-                        final SecurityToken securityToken)
+    public <M extends ServerMessage<?>> Outcome send(final MessageFormat<M> messageFormat,
+                                                     final M message,
+                                                     final ServerTransaction txn,
+                                                     final SecurityToken securityToken)
     {
+        final String routingAddress = messageFormat.getRoutingAddress(message, _exchange.getName(), _initialRoutingAddress);
         _exchange.authorisePublish(securityToken, Collections.singletonMap("routingKey", routingAddress));
 
         final InstanceProperties instanceProperties =
@@ -152,61 +154,6 @@ public class ExchangeDestination extends QueueDestination
         return _exchange;
     }
 
-    @Override
-    public String getRoutingAddress(final Message_1_0 message)
-    {
-        String routingAddress;
-        MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
-        if(_initialRoutingAddress == null)
-        {
-            final String to = messageHeader.getTo();
-            if (to != null
-                && (_exchange.getName() == null || _exchange.getName().trim().equals("")))
-            {
-                routingAddress = to;
-            }
-            else if (to != null
-                     && to.startsWith(_exchange.getName() + "/"))
-            {
-                routingAddress = to.substring(1 + _exchange.getName().length());
-            }
-            else if (to != null && !to.equals(_exchange.getName()))
-            {
-                routingAddress = to;
-            }
-            else if (messageHeader.getHeader("routing-key") instanceof String)
-            {
-                routingAddress = (String) messageHeader.getHeader("routing-key");
-            }
-            else if (messageHeader.getHeader("routing_key") instanceof String)
-            {
-                routingAddress = (String) messageHeader.getHeader("routing_key");
-            }
-            else if (messageHeader.getSubject() != null)
-            {
-                routingAddress = messageHeader.getSubject();
-            }
-            else
-            {
-                routingAddress = "";
-            }
-
-        }
-        else
-        {
-            if (messageHeader.getTo() != null
-                && messageHeader.getTo().startsWith(_exchange.getName() + "/" + _initialRoutingAddress + "/"))
-            {
-                routingAddress = messageHeader.getTo().substring(2+_exchange.getName().length()+_initialRoutingAddress.length());
-            }
-            else
-            {
-                routingAddress = _initialRoutingAddress;
-            }
-        }
-        return routingAddress;
-    }
-
     TerminusDurability getDurability()
     {
         return _durability;
@@ -228,11 +175,6 @@ public class ExchangeDestination extends QueueDestination
         return _exchange;
     }
 
-    public String getInitialRoutingAddress()
-    {
-        return _initialRoutingAddress;
-    }
-
     @Override
     public Symbol[] getCapabilities()
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java
new file mode 100644
index 0000000..38da574
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.plugin.MessageFormat;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationPropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+
+@SuppressWarnings("unused")
+@PluggableService
+public class MessageFormat_1_0 implements MessageFormat<Message_1_0>
+{
+    public static final int AMQP_MESSAGE_FORMAT_1_0 = 0;
+    private static final Logger LOGGER = LoggerFactory.getLogger(MessageFormat_1_0.class);
+    private final SectionDecoder _sectionDecoder = new SectionDecoderImpl(AMQPDescribedTypeRegistry.newInstance()
+                                                                                               .registerTransportLayer()
+                                                                                               .registerMessagingLayer()
+                                                                                               .registerTransactionLayer()
+                                                                                               .registerSecurityLayer()
+                                                                                               .registerExtensionSoleconnLayer()
+                                                                                               .getSectionDecoderRegistry());
+
+    @Override
+    public String getType()
+    {
+        return "AMQP_1_0";
+    }
+
+    @Override
+    public int getSupportedFormat()
+    {
+        return AMQP_MESSAGE_FORMAT_1_0;
+    }
+
+    @Override
+    public Class<Message_1_0> getMessageClass()
+    {
+        return Message_1_0.class;
+    }
+
+    @Override
+    public List<QpidByteBuffer> convertToMessageFormat(final Message_1_0 message)
+    {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    @Override
+    public Message_1_0 createMessage(final List<QpidByteBuffer> buf,
+                                     final MessageStore store,
+                                     final Object connectionReference)
+    {
+        List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
+
+        MessageMetaData_1_0 mmd = createMessageMetaData(buf, dataSections);
+        MessageHandle<MessageMetaData_1_0> handle = store.addMessage(mmd);
+
+        for (EncodingRetainingSection<?> dataSection : dataSections)
+        {
+            for (QpidByteBuffer buffer : dataSection.getEncodedForm())
+            {
+                handle.addContent(buffer);
+                buffer.dispose();
+            }
+            dataSection.dispose();
+        }
+        final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
+        Message_1_0 message = new Message_1_0(storedMessage, connectionReference);
+
+        return message;
+    }
+
+    @Override
+    public String getRoutingAddress(final Message_1_0 message,
+                                    final String destinationAddress,
+                                    final String initialDestinationRoutingAddress)
+    {
+        String routingAddress;
+        MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
+        if (initialDestinationRoutingAddress == null)
+        {
+            final String to = messageHeader.getTo();
+            if (to != null && (destinationAddress == null || destinationAddress.trim().equals("")))
+            {
+                routingAddress = to;
+            }
+            else if (to != null && to.startsWith(destinationAddress + "/"))
+            {
+                routingAddress = to.substring(1 + destinationAddress.length());
+            }
+            else if (to != null && !to.equals(destinationAddress))
+            {
+                routingAddress = to;
+            }
+            else if (messageHeader.getHeader("routing-key") instanceof String)
+            {
+                routingAddress = (String) messageHeader.getHeader("routing-key");
+            }
+            else if (messageHeader.getHeader("routing_key") instanceof String)
+            {
+                routingAddress = (String) messageHeader.getHeader("routing_key");
+            }
+            else if (messageHeader.getSubject() != null)
+            {
+                routingAddress = messageHeader.getSubject();
+            }
+            else
+            {
+                routingAddress = "";
+            }
+        }
+        else
+        {
+            if (messageHeader.getTo() != null
+                && messageHeader.getTo().startsWith(destinationAddress + "/" + initialDestinationRoutingAddress + "/"))
+            {
+                final int prefixLength = 2 + destinationAddress.length() + initialDestinationRoutingAddress.length();
+                routingAddress = messageHeader.getTo().substring(prefixLength);
+            }
+            else
+            {
+                routingAddress = initialDestinationRoutingAddress;
+            }
+        }
+        return routingAddress;
+    }
+
+    private MessageMetaData_1_0 createMessageMetaData(final List<QpidByteBuffer> fragments,
+                                                      final List<EncodingRetainingSection<?>> dataSections)
+    {
+
+        List<EncodingRetainingSection<?>> sections;
+        try
+        {
+            sections = getSectionDecoder().parseAll(fragments);
+        }
+        catch (AmqpErrorException e)
+        {
+            LOGGER.error("Decoding read section error", e);
+            // TODO - fix error handling
+            throw new IllegalArgumentException(e);
+        }
+
+        long contentSize = 0L;
+
+        HeaderSection headerSection = null;
+        PropertiesSection propertiesSection = null;
+        DeliveryAnnotationsSection deliveryAnnotationsSection = null;
+        MessageAnnotationsSection messageAnnotationsSection = null;
+        ApplicationPropertiesSection applicationPropertiesSection = null;
+        FooterSection footerSection = null;
+
+        Iterator<EncodingRetainingSection<?>> iter = sections.iterator();
+        EncodingRetainingSection<?> s = iter.hasNext() ? iter.next() : null;
+        if (s instanceof HeaderSection)
+        {
+            headerSection = (HeaderSection) s;
+            s = iter.hasNext() ? iter.next() : null;
+        }
+
+        if (s instanceof DeliveryAnnotationsSection)
+        {
+            deliveryAnnotationsSection = (DeliveryAnnotationsSection) s;
+            s = iter.hasNext() ? iter.next() : null;
+        }
+
+        if (s instanceof MessageAnnotationsSection)
+        {
+            messageAnnotationsSection = (MessageAnnotationsSection) s;
+            s = iter.hasNext() ? iter.next() : null;
+        }
+
+        if (s instanceof PropertiesSection)
+        {
+            propertiesSection = (PropertiesSection) s;
+            s = iter.hasNext() ? iter.next() : null;
+        }
+
+        if (s instanceof ApplicationPropertiesSection)
+        {
+            applicationPropertiesSection = (ApplicationPropertiesSection) s;
+            s = iter.hasNext() ? iter.next() : null;
+        }
+
+        if (s instanceof AmqpValueSection)
+        {
+            contentSize = s.getEncodedSize();
+            dataSections.add(s);
+            s = iter.hasNext() ? iter.next() : null;
+        }
+        else if (s instanceof DataSection)
+        {
+            do
+            {
+                contentSize += s.getEncodedSize();
+                dataSections.add(s);
+                s = iter.hasNext() ? iter.next() : null;
+            }
+            while (s instanceof DataSection);
+        }
+        else if (s instanceof AmqpSequenceSection)
+        {
+            do
+            {
+                contentSize += s.getEncodedSize();
+                dataSections.add(s);
+                s = iter.hasNext() ? iter.next() : null;
+            }
+            while (s instanceof AmqpSequenceSection);
+        }
+
+        if (s instanceof FooterSection)
+        {
+            footerSection = (FooterSection) s;
+            s = iter.hasNext() ? iter.next() : null;
+        }
+        if (s != null)
+        {
+            throw new ConnectionScopedRuntimeException(String.format("Encountered unexpected section '%s'", s.getClass().getSimpleName()));
+        }
+        return new MessageMetaData_1_0(headerSection,
+                                       deliveryAnnotationsSection,
+                                       messageAnnotationsSection,
+                                       propertiesSection,
+                                       applicationPropertiesSection,
+                                       footerSection,
+                                       System.currentTimeMillis(),
+                                       contentSize);
+    }
+
+    private SectionDecoder getSectionDecoder()
+    {
+        return _sectionDecoder;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index 2595b1c..7fe9f32 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -78,10 +79,12 @@ public class NodeReceivingDestination implements ReceivingDestination
         return OUTCOMES;
     }
 
-    public Outcome send(final ServerMessage<?> message,
-                        final String routingAddress, ServerTransaction txn,
-                        final SecurityToken securityToken)
+    public <M extends ServerMessage<?>> Outcome send(final MessageFormat<M> messageFormat,
+                                                     final M message,
+                                                     final ServerTransaction txn,
+                                                     final SecurityToken securityToken)
     {
+        final String routingAddress = messageFormat.getRoutingAddress(message, _destination.getName(), null);
         _destination.authorisePublish(securityToken, Collections.singletonMap("routingKey", routingAddress));
 
         final InstanceProperties instanceProperties =
@@ -151,46 +154,6 @@ public class NodeReceivingDestination implements ReceivingDestination
         return _destination;
     }
 
-    @Override
-    public String getRoutingAddress(final Message_1_0 message)
-    {
-        String routingAddress;
-        MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
-        final String to = messageHeader.getTo();
-        if (to != null
-            && (_destination.getName() == null || _destination.getName().trim().equals("")))
-        {
-            routingAddress = to;
-        }
-        else if (to != null
-                 && to.startsWith(_destination.getName() + "/"))
-        {
-            routingAddress = to.substring(1 + _destination.getName().length());
-        }
-        else if (to != null && !to.equals(_destination.getName()))
-        {
-            routingAddress = to;
-        }
-        else if (messageHeader.getHeader("routing-key") instanceof String)
-        {
-            routingAddress = (String) messageHeader.getHeader("routing-key");
-        }
-        else if (messageHeader.getHeader("routing_key") instanceof String)
-        {
-            routingAddress = (String) messageHeader.getHeader("routing_key");
-        }
-        else if (messageHeader.getSubject() != null)
-        {
-            routingAddress = messageHeader.getSubject();
-        }
-        else
-        {
-            routingAddress = "";
-        }
-
-        return routingAddress;
-    }
-
     TerminusDurability getDurability()
     {
         return _durability;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
index e133206..0ae928f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.security.SecurityToken;
@@ -49,9 +50,10 @@ public class QueueDestination extends MessageSourceDestination implements Receiv
         return OUTCOMES;
     }
 
-    public Outcome send(final ServerMessage<?> message,
-                        final String routingAddress, ServerTransaction txn,
-                        final SecurityToken securityToken)
+    public <M extends ServerMessage<?>> Outcome send(final MessageFormat<M> messageFormat,
+                                                     final M message,
+                                                     final ServerTransaction txn,
+                                                     final SecurityToken securityToken)
     {
         getQueue().authorisePublish(securityToken, Collections.emptyMap());
 
@@ -94,12 +96,6 @@ public class QueueDestination extends MessageSourceDestination implements Receiv
     }
 
     @Override
-    public String getRoutingAddress(Message_1_0 message)
-    {
-        return "";
-    }
-
-    @Override
     public String getAddress()
     {
         return _address;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
index 6763f10..200c823 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;
 
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -37,15 +38,13 @@ public interface ReceivingDestination extends Destination
 
     Outcome[] getOutcomes();
 
-    Outcome send(ServerMessage<?> message,
-                 final String routingAddress,
-                 ServerTransaction txn,
-                 final SecurityToken securityToken);
+    <M extends ServerMessage<?>> Outcome send(final MessageFormat<M> messageFormat,
+                                              final M message,
+                                              final ServerTransaction txn,
+                                              final SecurityToken securityToken);
 
     int getCredit();
 
-    String getRoutingAddress(Message_1_0 message);
-
     String getAddress();
 
     MessageDestination getMessageDestination();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 199ecdf..4809f0c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -63,8 +63,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.server.store.MessageHandle;
-import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -200,45 +198,18 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         else
         {
             ServerMessage<?> serverMessage;
-            String routingAddress;
 
-            if(messageFormat == null || UnsignedInteger.ZERO.equals(messageFormat))
+            MessageFormat format = MessageFormatRegistry.getFormat(messageFormat == null ? 0 : messageFormat.intValue());
+            if(format != null)
             {
-                List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
-
-                MessageMetaData_1_0 mmd = createMessageMetaData(fragments, dataSections);
-                MessageHandle<MessageMetaData_1_0> handle = getAddressSpace().getMessageStore().addMessage(mmd);
-
-                for (EncodingRetainingSection<?> dataSection : dataSections)
-                {
-                    for (QpidByteBuffer buf : dataSection.getEncodedForm())
-                    {
-                        handle.addContent(buf);
-                        buf.dispose();
-                    }
-                    dataSection.dispose();
-                }
-                final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
-                Message_1_0 message = new Message_1_0(storedMessage, getSession().getConnection().getReference());
-
-                routingAddress = getReceivingDestination().getRoutingAddress(message);
-                serverMessage = message;
+                serverMessage = format.createMessage(fragments, getAddressSpace().getMessageStore(), getSession().getConnection().getReference());
             }
             else
             {
-                MessageFormat format = MessageFormatRegistry.getFormat(messageFormat.intValue());
-                if(format != null)
-                {
-                    serverMessage = format.createMessage(fragments, getAddressSpace().getMessageStore(), getSession().getConnection().getReference());
-                    routingAddress = format.getRoutingAddress(serverMessage, getReceivingDestination().getAddress());
-                }
-                else
-                {
-                    final Error err = new Error();
-                    err.setCondition(AmqpError.NOT_IMPLEMENTED);
-                    err.setDescription("Unknown message format: " + messageFormat);
-                    return err;
-                }
+                final Error err = new Error();
+                err.setCondition(AmqpError.NOT_IMPLEMENTED);
+                err.setDescription("Unknown message format: " + messageFormat);
+                return err;
             }
 
             for(QpidByteBuffer fragment: fragments)
@@ -277,7 +248,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                     session.getAMQPConnection()
                            .checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
 
-                    Outcome outcome = getReceivingDestination().send(serverMessage, routingAddress, transaction,
+                    Outcome outcome = getReceivingDestination().send(format, serverMessage, transaction,
                                                                      session.getSecurityToken());
                     Source source = getSource();
 
@@ -398,110 +369,6 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         }
     }
 
-
-    private MessageMetaData_1_0 createMessageMetaData(final List<QpidByteBuffer> fragments,
-                                                      final List<EncodingRetainingSection<?>> dataSections)
-    {
-
-        List<EncodingRetainingSection<?>> sections;
-        try
-        {
-            sections = getSectionDecoder().parseAll(fragments);
-        }
-        catch (AmqpErrorException e)
-        {
-            LOGGER.error("Decoding read section error", e);
-            // TODO - fix error handling
-            throw new IllegalArgumentException(e);
-        }
-
-        long contentSize = 0L;
-
-        HeaderSection headerSection = null;
-        PropertiesSection propertiesSection = null;
-        DeliveryAnnotationsSection deliveryAnnotationsSection = null;
-        MessageAnnotationsSection messageAnnotationsSection = null;
-        ApplicationPropertiesSection applicationPropertiesSection = null;
-        FooterSection footerSection = null;
-
-        Iterator<EncodingRetainingSection<?>> iter = sections.iterator();
-        EncodingRetainingSection<?> s = iter.hasNext() ? iter.next() : null;
-        if (s instanceof HeaderSection)
-        {
-            headerSection = (HeaderSection) s;
-            s = iter.hasNext() ? iter.next() : null;
-        }
-
-        if (s instanceof DeliveryAnnotationsSection)
-        {
-            deliveryAnnotationsSection = (DeliveryAnnotationsSection) s;
-            s = iter.hasNext() ? iter.next() : null;
-        }
-
-        if (s instanceof MessageAnnotationsSection)
-        {
-            messageAnnotationsSection = (MessageAnnotationsSection) s;
-            s = iter.hasNext() ? iter.next() : null;
-        }
-
-        if (s instanceof PropertiesSection)
-        {
-            propertiesSection = (PropertiesSection) s;
-            s = iter.hasNext() ? iter.next() : null;
-        }
-
-        if (s instanceof ApplicationPropertiesSection)
-        {
-            applicationPropertiesSection = (ApplicationPropertiesSection) s;
-            s = iter.hasNext() ? iter.next() : null;
-        }
-
-        if (s instanceof AmqpValueSection)
-        {
-            contentSize = s.getEncodedSize();
-            dataSections.add(s);
-            s = iter.hasNext() ? iter.next() : null;
-        }
-        else if (s instanceof DataSection)
-        {
-            do
-            {
-                contentSize += s.getEncodedSize();
-                dataSections.add(s);
-                s = iter.hasNext() ? iter.next() : null;
-            }
-            while (s instanceof DataSection);
-        }
-        else if (s instanceof AmqpSequenceSection)
-        {
-            do
-            {
-                contentSize += s.getEncodedSize();
-                dataSections.add(s);
-                s = iter.hasNext() ? iter.next() : null;
-            }
-            while (s instanceof AmqpSequenceSection);
-        }
-
-        if (s instanceof FooterSection)
-        {
-            footerSection = (FooterSection) s;
-            s = iter.hasNext() ? iter.next() : null;
-        }
-        if (s != null)
-        {
-            throw new ConnectionScopedRuntimeException(String.format("Encountered unexpected section '%s'", s.getClass().getSimpleName()));
-        }
-        return new MessageMetaData_1_0(headerSection,
-                                       deliveryAnnotationsSection,
-                                       messageAnnotationsSection,
-                                       propertiesSection,
-                                       applicationPropertiesSection,
-                                       footerSection,
-                                       System.currentTimeMillis(),
-                                       contentSize);
-    }
-
     @Override
     public void attachReceived(final Attach attach) throws AmqpErrorException
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org