You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/06/09 13:03:54 UTC
qpid-broker-j git commit: QPID-7635: [Java Broker] move handling of
routing address into ReceivingDestination and MessageFormat
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 06dfe565f -> e6fa71f0d
QPID-7635: [Java Broker] move handling of routing address into ReceivingDestination and MessageFormat
This also fixes the QueueMessageDurabilityTest#testSendNonPersistentMessageToAll test.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e6fa71f0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e6fa71f0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e6fa71f0
Branch: refs/heads/master
Commit: e6fa71f0d44fc61b63a365268ad5f3df476103a6
Parents: 06dfe56
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri Jun 9 14:03:44 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Jun 9 14:03:44 2017 +0100
----------------------------------------------------------------------
.../qpid/server/plugin/MessageFormat.java | 4 +-
.../protocol/v0_10/MessageFormat_0_10.java | 6 +-
.../protocol/v0_8/MessageFormat_0_9_1.java | 6 +-
.../v1_0/AnonymousRelayDestination.java | 42 +--
.../protocol/v1_0/ExchangeDestination.java | 70 +----
.../server/protocol/v1_0/MessageFormat_1_0.java | 277 +++++++++++++++++++
.../protocol/v1_0/NodeReceivingDestination.java | 49 +---
.../server/protocol/v1_0/QueueDestination.java | 14 +-
.../protocol/v1_0/ReceivingDestination.java | 11 +-
.../v1_0/StandardReceivingLinkEndpoint.java | 149 +---------
10 files changed, 327 insertions(+), 301 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java b/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java
index 34bb4f2..d09c4b7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java
@@ -32,5 +32,7 @@ public interface MessageFormat<M extends ServerMessage<?>> extends Pluggable
Class<M> getMessageClass();
List<QpidByteBuffer> convertToMessageFormat(M message);
M createMessage(List<QpidByteBuffer> buf, MessageStore store, final Object connectionReference);
- String getRoutingAddress(M message, String destinationAddress);
+ String getRoutingAddress(M message,
+ String destinationAddress,
+ String initialDestinationRoutingAddress);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
index 022b815..f409a94 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.plugin.MessageFormat;
+import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
@@ -35,6 +36,7 @@ import org.apache.qpid.server.protocol.v0_10.transport.Header;
import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
import org.apache.qpid.server.protocol.v0_10.transport.Struct;
+@PluggableService
public class MessageFormat_0_10 implements MessageFormat<MessageTransferMessage>
{
@@ -137,7 +139,9 @@ public class MessageFormat_0_10 implements MessageFormat<MessageTransferMessage>
@Override
- public String getRoutingAddress(final MessageTransferMessage message, final String destinationAddress)
+ public String getRoutingAddress(final MessageTransferMessage message,
+ final String destinationAddress,
+ final String initialDestinationRoutingAddress)
{
String initialRoutingAddress = message.getInitialRoutingAddress();
if(initialRoutingAddress != null && destinationAddress != null && initialRoutingAddress.startsWith(destinationAddress+"/"))
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java
index 9223bc0..d542cb4 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
import org.apache.qpid.server.plugin.MessageFormat;
@@ -33,6 +34,7 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+@PluggableService
public class MessageFormat_0_9_1 implements MessageFormat<AMQMessage>
{
@@ -244,7 +246,9 @@ public class MessageFormat_0_9_1 implements MessageFormat<AMQMessage>
}
@Override
- public String getRoutingAddress(final AMQMessage message, final String destinationAddress)
+ public String getRoutingAddress(final AMQMessage message,
+ final String destinationAddress,
+ final String initialDestinationRoutingAddress)
{
String initialRoutingAddress = message.getInitialRoutingAddress();
if(initialRoutingAddress != null && destinationAddress != null && initialRoutingAddress.startsWith(destinationAddress+"/"))
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
index af07fec..bd8f214 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.MessageFormat;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -73,12 +74,13 @@ public class AnonymousRelayDestination implements ReceivingDestination
}
@Override
- public Outcome send(final ServerMessage<?> message,
- final String routingAddress,
- final ServerTransaction txn,
- final SecurityToken securityToken)
+ public <M extends ServerMessage<?>> Outcome send(final MessageFormat<M> messageFormat,
+ final M message,
+ final ServerTransaction txn,
+ final SecurityToken securityToken)
{
final ReceivingDestination destination;
+ final String routingAddress = messageFormat.getRoutingAddress(message, null, null);
if (!routingAddress.startsWith("/") && routingAddress.contains("/"))
{
String[] parts = routingAddress.split("/", 2);
@@ -140,7 +142,7 @@ public class AnonymousRelayDestination implements ReceivingDestination
}
else
{
- outcome = destination.send(message, routingAddress, txn, securityToken);
+ outcome = destination.send(messageFormat, message, txn, securityToken);
}
return outcome;
}
@@ -153,36 +155,6 @@ public class AnonymousRelayDestination implements ReceivingDestination
}
@Override
- public String getRoutingAddress(final Message_1_0 message)
- {
- String routingAddress;
- MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
- final String to = messageHeader.getTo();
- if (to != null)
- {
- routingAddress = to;
- }
- else if (messageHeader.getHeader("routing-key") instanceof String)
- {
- routingAddress = (String) messageHeader.getHeader("routing-key");
- }
- else if (messageHeader.getHeader("routing_key") instanceof String)
- {
- routingAddress = (String) messageHeader.getHeader("routing_key");
- }
- else if (messageHeader.getSubject() != null)
- {
- routingAddress = messageHeader.getSubject();
- }
- else
- {
- routingAddress = "";
- }
-
- return routingAddress;
- }
-
- @Override
public String getAddress()
{
return "";
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index 252a111..e751c0c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.MessageFormat;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -95,11 +96,12 @@ public class ExchangeDestination extends QueueDestination
return OUTCOMES;
}
- public Outcome send(final ServerMessage<?> message,
- final String routingAddress,
- ServerTransaction txn,
- final SecurityToken securityToken)
+ public <M extends ServerMessage<?>> Outcome send(final MessageFormat<M> messageFormat,
+ final M message,
+ final ServerTransaction txn,
+ final SecurityToken securityToken)
{
+ final String routingAddress = messageFormat.getRoutingAddress(message, _exchange.getName(), _initialRoutingAddress);
_exchange.authorisePublish(securityToken, Collections.singletonMap("routingKey", routingAddress));
final InstanceProperties instanceProperties =
@@ -152,61 +154,6 @@ public class ExchangeDestination extends QueueDestination
return _exchange;
}
- @Override
- public String getRoutingAddress(final Message_1_0 message)
- {
- String routingAddress;
- MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
- if(_initialRoutingAddress == null)
- {
- final String to = messageHeader.getTo();
- if (to != null
- && (_exchange.getName() == null || _exchange.getName().trim().equals("")))
- {
- routingAddress = to;
- }
- else if (to != null
- && to.startsWith(_exchange.getName() + "/"))
- {
- routingAddress = to.substring(1 + _exchange.getName().length());
- }
- else if (to != null && !to.equals(_exchange.getName()))
- {
- routingAddress = to;
- }
- else if (messageHeader.getHeader("routing-key") instanceof String)
- {
- routingAddress = (String) messageHeader.getHeader("routing-key");
- }
- else if (messageHeader.getHeader("routing_key") instanceof String)
- {
- routingAddress = (String) messageHeader.getHeader("routing_key");
- }
- else if (messageHeader.getSubject() != null)
- {
- routingAddress = messageHeader.getSubject();
- }
- else
- {
- routingAddress = "";
- }
-
- }
- else
- {
- if (messageHeader.getTo() != null
- && messageHeader.getTo().startsWith(_exchange.getName() + "/" + _initialRoutingAddress + "/"))
- {
- routingAddress = messageHeader.getTo().substring(2+_exchange.getName().length()+_initialRoutingAddress.length());
- }
- else
- {
- routingAddress = _initialRoutingAddress;
- }
- }
- return routingAddress;
- }
-
TerminusDurability getDurability()
{
return _durability;
@@ -228,11 +175,6 @@ public class ExchangeDestination extends QueueDestination
return _exchange;
}
- public String getInitialRoutingAddress()
- {
- return _initialRoutingAddress;
- }
-
@Override
public Symbol[] getCapabilities()
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java
new file mode 100644
index 0000000..38da574
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.plugin.MessageFormat;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationPropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+
+@SuppressWarnings("unused")
+@PluggableService
+public class MessageFormat_1_0 implements MessageFormat<Message_1_0>
+{
+ public static final int AMQP_MESSAGE_FORMAT_1_0 = 0;
+ private static final Logger LOGGER = LoggerFactory.getLogger(MessageFormat_1_0.class);
+ private final SectionDecoder _sectionDecoder = new SectionDecoderImpl(AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer()
+ .registerExtensionSoleconnLayer()
+ .getSectionDecoderRegistry());
+
+ @Override
+ public String getType()
+ {
+ return "AMQP_1_0";
+ }
+
+ @Override
+ public int getSupportedFormat()
+ {
+ return AMQP_MESSAGE_FORMAT_1_0;
+ }
+
+ @Override
+ public Class<Message_1_0> getMessageClass()
+ {
+ return Message_1_0.class;
+ }
+
+ @Override
+ public List<QpidByteBuffer> convertToMessageFormat(final Message_1_0 message)
+ {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ @Override
+ public Message_1_0 createMessage(final List<QpidByteBuffer> buf,
+ final MessageStore store,
+ final Object connectionReference)
+ {
+ List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
+
+ MessageMetaData_1_0 mmd = createMessageMetaData(buf, dataSections);
+ MessageHandle<MessageMetaData_1_0> handle = store.addMessage(mmd);
+
+ for (EncodingRetainingSection<?> dataSection : dataSections)
+ {
+ for (QpidByteBuffer buffer : dataSection.getEncodedForm())
+ {
+ handle.addContent(buffer);
+ buffer.dispose();
+ }
+ dataSection.dispose();
+ }
+ final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
+ Message_1_0 message = new Message_1_0(storedMessage, connectionReference);
+
+ return message;
+ }
+
+ @Override
+ public String getRoutingAddress(final Message_1_0 message,
+ final String destinationAddress,
+ final String initialDestinationRoutingAddress)
+ {
+ String routingAddress;
+ MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
+ if (initialDestinationRoutingAddress == null)
+ {
+ final String to = messageHeader.getTo();
+ if (to != null && (destinationAddress == null || destinationAddress.trim().equals("")))
+ {
+ routingAddress = to;
+ }
+ else if (to != null && to.startsWith(destinationAddress + "/"))
+ {
+ routingAddress = to.substring(1 + destinationAddress.length());
+ }
+ else if (to != null && !to.equals(destinationAddress))
+ {
+ routingAddress = to;
+ }
+ else if (messageHeader.getHeader("routing-key") instanceof String)
+ {
+ routingAddress = (String) messageHeader.getHeader("routing-key");
+ }
+ else if (messageHeader.getHeader("routing_key") instanceof String)
+ {
+ routingAddress = (String) messageHeader.getHeader("routing_key");
+ }
+ else if (messageHeader.getSubject() != null)
+ {
+ routingAddress = messageHeader.getSubject();
+ }
+ else
+ {
+ routingAddress = "";
+ }
+ }
+ else
+ {
+ if (messageHeader.getTo() != null
+ && messageHeader.getTo().startsWith(destinationAddress + "/" + initialDestinationRoutingAddress + "/"))
+ {
+ final int prefixLength = 2 + destinationAddress.length() + initialDestinationRoutingAddress.length();
+ routingAddress = messageHeader.getTo().substring(prefixLength);
+ }
+ else
+ {
+ routingAddress = initialDestinationRoutingAddress;
+ }
+ }
+ return routingAddress;
+ }
+
+ private MessageMetaData_1_0 createMessageMetaData(final List<QpidByteBuffer> fragments,
+ final List<EncodingRetainingSection<?>> dataSections)
+ {
+
+ List<EncodingRetainingSection<?>> sections;
+ try
+ {
+ sections = getSectionDecoder().parseAll(fragments);
+ }
+ catch (AmqpErrorException e)
+ {
+ LOGGER.error("Decoding read section error", e);
+ // TODO - fix error handling
+ throw new IllegalArgumentException(e);
+ }
+
+ long contentSize = 0L;
+
+ HeaderSection headerSection = null;
+ PropertiesSection propertiesSection = null;
+ DeliveryAnnotationsSection deliveryAnnotationsSection = null;
+ MessageAnnotationsSection messageAnnotationsSection = null;
+ ApplicationPropertiesSection applicationPropertiesSection = null;
+ FooterSection footerSection = null;
+
+ Iterator<EncodingRetainingSection<?>> iter = sections.iterator();
+ EncodingRetainingSection<?> s = iter.hasNext() ? iter.next() : null;
+ if (s instanceof HeaderSection)
+ {
+ headerSection = (HeaderSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s instanceof DeliveryAnnotationsSection)
+ {
+ deliveryAnnotationsSection = (DeliveryAnnotationsSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s instanceof MessageAnnotationsSection)
+ {
+ messageAnnotationsSection = (MessageAnnotationsSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s instanceof PropertiesSection)
+ {
+ propertiesSection = (PropertiesSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s instanceof ApplicationPropertiesSection)
+ {
+ applicationPropertiesSection = (ApplicationPropertiesSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s instanceof AmqpValueSection)
+ {
+ contentSize = s.getEncodedSize();
+ dataSections.add(s);
+ s = iter.hasNext() ? iter.next() : null;
+ }
+ else if (s instanceof DataSection)
+ {
+ do
+ {
+ contentSize += s.getEncodedSize();
+ dataSections.add(s);
+ s = iter.hasNext() ? iter.next() : null;
+ }
+ while (s instanceof DataSection);
+ }
+ else if (s instanceof AmqpSequenceSection)
+ {
+ do
+ {
+ contentSize += s.getEncodedSize();
+ dataSections.add(s);
+ s = iter.hasNext() ? iter.next() : null;
+ }
+ while (s instanceof AmqpSequenceSection);
+ }
+
+ if (s instanceof FooterSection)
+ {
+ footerSection = (FooterSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+ if (s != null)
+ {
+ throw new ConnectionScopedRuntimeException(String.format("Encountered unexpected section '%s'", s.getClass().getSimpleName()));
+ }
+ return new MessageMetaData_1_0(headerSection,
+ deliveryAnnotationsSection,
+ messageAnnotationsSection,
+ propertiesSection,
+ applicationPropertiesSection,
+ footerSection,
+ System.currentTimeMillis(),
+ contentSize);
+ }
+
+ private SectionDecoder getSectionDecoder()
+ {
+ return _sectionDecoder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index 2595b1c..7fe9f32 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.plugin.MessageFormat;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -78,10 +79,12 @@ public class NodeReceivingDestination implements ReceivingDestination
return OUTCOMES;
}
- public Outcome send(final ServerMessage<?> message,
- final String routingAddress, ServerTransaction txn,
- final SecurityToken securityToken)
+ public <M extends ServerMessage<?>> Outcome send(final MessageFormat<M> messageFormat,
+ final M message,
+ final ServerTransaction txn,
+ final SecurityToken securityToken)
{
+ final String routingAddress = messageFormat.getRoutingAddress(message, _destination.getName(), null);
_destination.authorisePublish(securityToken, Collections.singletonMap("routingKey", routingAddress));
final InstanceProperties instanceProperties =
@@ -151,46 +154,6 @@ public class NodeReceivingDestination implements ReceivingDestination
return _destination;
}
- @Override
- public String getRoutingAddress(final Message_1_0 message)
- {
- String routingAddress;
- MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
- final String to = messageHeader.getTo();
- if (to != null
- && (_destination.getName() == null || _destination.getName().trim().equals("")))
- {
- routingAddress = to;
- }
- else if (to != null
- && to.startsWith(_destination.getName() + "/"))
- {
- routingAddress = to.substring(1 + _destination.getName().length());
- }
- else if (to != null && !to.equals(_destination.getName()))
- {
- routingAddress = to;
- }
- else if (messageHeader.getHeader("routing-key") instanceof String)
- {
- routingAddress = (String) messageHeader.getHeader("routing-key");
- }
- else if (messageHeader.getHeader("routing_key") instanceof String)
- {
- routingAddress = (String) messageHeader.getHeader("routing_key");
- }
- else if (messageHeader.getSubject() != null)
- {
- routingAddress = messageHeader.getSubject();
- }
- else
- {
- routingAddress = "";
- }
-
- return routingAddress;
- }
-
TerminusDurability getDurability()
{
return _durability;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
index e133206..0ae928f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.MessageFormat;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.security.SecurityToken;
@@ -49,9 +50,10 @@ public class QueueDestination extends MessageSourceDestination implements Receiv
return OUTCOMES;
}
- public Outcome send(final ServerMessage<?> message,
- final String routingAddress, ServerTransaction txn,
- final SecurityToken securityToken)
+ public <M extends ServerMessage<?>> Outcome send(final MessageFormat<M> messageFormat,
+ final M message,
+ final ServerTransaction txn,
+ final SecurityToken securityToken)
{
getQueue().authorisePublish(securityToken, Collections.emptyMap());
@@ -94,12 +96,6 @@ public class QueueDestination extends MessageSourceDestination implements Receiv
}
@Override
- public String getRoutingAddress(Message_1_0 message)
- {
- return "";
- }
-
- @Override
public String getAddress()
{
return _address;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
index 6763f10..200c823 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageFormat;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -37,15 +38,13 @@ public interface ReceivingDestination extends Destination
Outcome[] getOutcomes();
- Outcome send(ServerMessage<?> message,
- final String routingAddress,
- ServerTransaction txn,
- final SecurityToken securityToken);
+ <M extends ServerMessage<?>> Outcome send(final MessageFormat<M> messageFormat,
+ final M message,
+ final ServerTransaction txn,
+ final SecurityToken securityToken);
int getCredit();
- String getRoutingAddress(Message_1_0 message);
-
String getAddress();
MessageDestination getMessageDestination();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e6fa71f0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 199ecdf..4809f0c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -63,8 +63,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.server.store.MessageHandle;
-import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -200,45 +198,18 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
else
{
ServerMessage<?> serverMessage;
- String routingAddress;
- if(messageFormat == null || UnsignedInteger.ZERO.equals(messageFormat))
+ MessageFormat format = MessageFormatRegistry.getFormat(messageFormat == null ? 0 : messageFormat.intValue());
+ if(format != null)
{
- List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
-
- MessageMetaData_1_0 mmd = createMessageMetaData(fragments, dataSections);
- MessageHandle<MessageMetaData_1_0> handle = getAddressSpace().getMessageStore().addMessage(mmd);
-
- for (EncodingRetainingSection<?> dataSection : dataSections)
- {
- for (QpidByteBuffer buf : dataSection.getEncodedForm())
- {
- handle.addContent(buf);
- buf.dispose();
- }
- dataSection.dispose();
- }
- final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
- Message_1_0 message = new Message_1_0(storedMessage, getSession().getConnection().getReference());
-
- routingAddress = getReceivingDestination().getRoutingAddress(message);
- serverMessage = message;
+ serverMessage = format.createMessage(fragments, getAddressSpace().getMessageStore(), getSession().getConnection().getReference());
}
else
{
- MessageFormat format = MessageFormatRegistry.getFormat(messageFormat.intValue());
- if(format != null)
- {
- serverMessage = format.createMessage(fragments, getAddressSpace().getMessageStore(), getSession().getConnection().getReference());
- routingAddress = format.getRoutingAddress(serverMessage, getReceivingDestination().getAddress());
- }
- else
- {
- final Error err = new Error();
- err.setCondition(AmqpError.NOT_IMPLEMENTED);
- err.setDescription("Unknown message format: " + messageFormat);
- return err;
- }
+ final Error err = new Error();
+ err.setCondition(AmqpError.NOT_IMPLEMENTED);
+ err.setDescription("Unknown message format: " + messageFormat);
+ return err;
}
for(QpidByteBuffer fragment: fragments)
@@ -277,7 +248,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
session.getAMQPConnection()
.checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
- Outcome outcome = getReceivingDestination().send(serverMessage, routingAddress, transaction,
+ Outcome outcome = getReceivingDestination().send(format, serverMessage, transaction,
session.getSecurityToken());
Source source = getSource();
@@ -398,110 +369,6 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
}
}
-
- private MessageMetaData_1_0 createMessageMetaData(final List<QpidByteBuffer> fragments,
- final List<EncodingRetainingSection<?>> dataSections)
- {
-
- List<EncodingRetainingSection<?>> sections;
- try
- {
- sections = getSectionDecoder().parseAll(fragments);
- }
- catch (AmqpErrorException e)
- {
- LOGGER.error("Decoding read section error", e);
- // TODO - fix error handling
- throw new IllegalArgumentException(e);
- }
-
- long contentSize = 0L;
-
- HeaderSection headerSection = null;
- PropertiesSection propertiesSection = null;
- DeliveryAnnotationsSection deliveryAnnotationsSection = null;
- MessageAnnotationsSection messageAnnotationsSection = null;
- ApplicationPropertiesSection applicationPropertiesSection = null;
- FooterSection footerSection = null;
-
- Iterator<EncodingRetainingSection<?>> iter = sections.iterator();
- EncodingRetainingSection<?> s = iter.hasNext() ? iter.next() : null;
- if (s instanceof HeaderSection)
- {
- headerSection = (HeaderSection) s;
- s = iter.hasNext() ? iter.next() : null;
- }
-
- if (s instanceof DeliveryAnnotationsSection)
- {
- deliveryAnnotationsSection = (DeliveryAnnotationsSection) s;
- s = iter.hasNext() ? iter.next() : null;
- }
-
- if (s instanceof MessageAnnotationsSection)
- {
- messageAnnotationsSection = (MessageAnnotationsSection) s;
- s = iter.hasNext() ? iter.next() : null;
- }
-
- if (s instanceof PropertiesSection)
- {
- propertiesSection = (PropertiesSection) s;
- s = iter.hasNext() ? iter.next() : null;
- }
-
- if (s instanceof ApplicationPropertiesSection)
- {
- applicationPropertiesSection = (ApplicationPropertiesSection) s;
- s = iter.hasNext() ? iter.next() : null;
- }
-
- if (s instanceof AmqpValueSection)
- {
- contentSize = s.getEncodedSize();
- dataSections.add(s);
- s = iter.hasNext() ? iter.next() : null;
- }
- else if (s instanceof DataSection)
- {
- do
- {
- contentSize += s.getEncodedSize();
- dataSections.add(s);
- s = iter.hasNext() ? iter.next() : null;
- }
- while (s instanceof DataSection);
- }
- else if (s instanceof AmqpSequenceSection)
- {
- do
- {
- contentSize += s.getEncodedSize();
- dataSections.add(s);
- s = iter.hasNext() ? iter.next() : null;
- }
- while (s instanceof AmqpSequenceSection);
- }
-
- if (s instanceof FooterSection)
- {
- footerSection = (FooterSection) s;
- s = iter.hasNext() ? iter.next() : null;
- }
- if (s != null)
- {
- throw new ConnectionScopedRuntimeException(String.format("Encountered unexpected section '%s'", s.getClass().getSimpleName()));
- }
- return new MessageMetaData_1_0(headerSection,
- deliveryAnnotationsSection,
- messageAnnotationsSection,
- propertiesSection,
- applicationPropertiesSection,
- footerSection,
- System.currentTimeMillis(),
- contentSize);
- }
-
@Override
public void attachReceived(final Attach attach) throws AmqpErrorException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org