You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/12 14:34:54 UTC
svn commit: r1773795 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/plugin/
broker-core/src/main/java/org/apache/qpid/server/protocol/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
broke...
Author: rgodfrey
Date: Mon Dec 12 14:34:54 2016
New Revision: 1773795
URL: http://svn.apache.org/viewvc?rev=1773795&view=rev
Log:
QPID-7587 : Add support for enveloping messages from 0-x protocols using the 1.0 transport
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java
- copied, changed from r1773722, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageFormatRegistry.java
- copied, changed from r1773722, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageConverterRegistry.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java (with props)
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java (with props)
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/MessagePublishInfo.java
Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java (from r1773722, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java&r1=1773722&r2=1773795&rev=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java Mon Dec 12 14:34:54 2016
@@ -20,14 +20,17 @@
*/
package org.apache.qpid.server.plugin;
+import java.util.List;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.store.MessageStore;
-public interface MessageConverter<M extends ServerMessage, N extends ServerMessage> extends Pluggable
+public interface MessageFormat<M extends ServerMessage<?>> extends Pluggable
{
- Class<M> getInputClass();
- Class<N> getOutputClass();
-
- N convert(M message, NamedAddressSpace addressSpace);
- void dispose(N message);
+ int getSupportedFormat();
+ Class<M> getMessageClass();
+ List<QpidByteBuffer> convertToMessageFormat(M message);
+ M createMessage(List<QpidByteBuffer> buf, MessageStore store, final Object connectionReference);
+ String getRoutingAddress(M message, String destinationAddress);
}
Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageFormatRegistry.java (from r1773722, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageConverterRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageFormatRegistry.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageFormatRegistry.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageConverterRegistry.java&r1=1773722&r2=1773795&rev=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageConverterRegistry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageFormatRegistry.java Mon Dec 12 14:34:54 2016
@@ -22,38 +22,40 @@
package org.apache.qpid.server.protocol;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.plugin.MessageFormat;
import org.apache.qpid.server.plugin.QpidServiceLoader;
-public class MessageConverterRegistry
+public class MessageFormatRegistry
{
- private static Map<Class<? extends ServerMessage>, Map<Class<? extends ServerMessage>, MessageConverter>> _converters =
- new HashMap<Class<? extends ServerMessage>, Map<Class<? extends ServerMessage>, MessageConverter>>();
+ private static Map<Integer, MessageFormat<? extends ServerMessage<?>>> INPUT_CONVERTERS =
+ new HashMap<>();
+
+ private static Map<Class<? extends ServerMessage<?>>, MessageFormat<? extends ServerMessage<?>>> OUTPUT_CONVERTERS =
+ new HashMap<>();
+
static
{
-
- for(MessageConverter<? extends ServerMessage, ? extends ServerMessage> converter : (new QpidServiceLoader()).instancesOf(MessageConverter.class))
+ for(MessageFormat<? extends ServerMessage<?>> converter : (new QpidServiceLoader()).instancesOf(MessageFormat.class))
{
- Map<Class<? extends ServerMessage>, MessageConverter> map = _converters.get(converter.getInputClass());
- if(map == null)
- {
- map = new HashMap<Class<? extends ServerMessage>, MessageConverter>();
- _converters.put(converter.getInputClass(), map);
- }
- map.put(converter.getOutputClass(),converter);
+ INPUT_CONVERTERS.put(converter.getSupportedFormat(), converter);
+ OUTPUT_CONVERTERS.put(converter.getMessageClass(), converter);
}
}
- public static <M extends ServerMessage,N extends ServerMessage> MessageConverter<M, N> getConverter(Class<M> from, Class<N> to)
+ public static MessageFormat<? extends ServerMessage<?>> getFormat(int format)
{
- Map<Class<? extends ServerMessage>, MessageConverter> map = _converters.get(from);
- if(map == null)
- {
- map = _converters.get(ServerMessage.class);
- }
- return map == null ? null : map.get(to);
+ return INPUT_CONVERTERS.get(format);
}
+
+ public static MessageFormat<? extends ServerMessage<?>> getFormat(ServerMessage<?> message)
+ {
+ return OUTPUT_CONVERTERS.get(message.getClass());
+ }
+
}
Added: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java?rev=1773795&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java Mon Dec 12 14:34:54 2016
@@ -0,0 +1,153 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import java.nio.BufferUnderflowException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MessagePublishInfo;
+import org.apache.qpid.server.plugin.MessageFormat;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.Struct;
+
+public class MessageFormat_0_10 implements MessageFormat<MessageTransferMessage>
+{
+
+ public static final int AMQP_MESSAGE_FORMAT_0_10 = 100;
+
+ @Override
+ public String getType()
+ {
+ return "AMQP_0_10";
+ }
+
+ @Override
+ public int getSupportedFormat()
+ {
+ return AMQP_MESSAGE_FORMAT_0_10;
+ }
+
+ @Override
+ public Class<MessageTransferMessage> getMessageClass()
+ {
+ return MessageTransferMessage.class;
+ }
+
+ // format: <int header count> <headers> <body>
+
+ @Override
+ public List<QpidByteBuffer> convertToMessageFormat(final MessageTransferMessage message)
+ {
+ ServerEncoder encoder = new ServerEncoder(4096, true);
+ Struct[] structs = message.getHeader().getStructs();
+ encoder.writeInt32(structs.length);
+ for(Struct struct : structs)
+ {
+ encoder.writeStruct32(struct);
+ }
+ final QpidByteBuffer headerBuf = encoder.getBuffer();
+ List<QpidByteBuffer> bufs = new ArrayList<>();
+ bufs.add(headerBuf);
+ bufs.addAll(message.getContent(0, (int) message.getSize()));
+
+ return bufs;
+ }
+
+ @Override
+ public MessageTransferMessage createMessage(final List<QpidByteBuffer> buf,
+ final MessageStore store,
+ final Object connectionReference)
+ {
+ try
+ {
+ ServerDecoder serverDecoder = new ServerDecoder(buf);
+ int headerCount = serverDecoder.readInt32();
+ DeliveryProperties deliveryProperties = null;
+ MessageProperties messageProperties = null;
+ List<Struct> nonStandard = null;
+ for(int i = 0; i<headerCount; i++)
+ {
+
+ final Struct struct = serverDecoder.readStruct32();
+ switch(struct.getStructType())
+ {
+ case DeliveryProperties.TYPE:
+ deliveryProperties = (DeliveryProperties)struct;
+ break;
+ case MessageProperties.TYPE:
+ messageProperties = (MessageProperties)struct;
+ break;
+ default:
+ if(nonStandard == null)
+ {
+ nonStandard = new ArrayList<>();
+ }
+ nonStandard.add(struct);
+ }
+ }
+ Header header = new Header(deliveryProperties, messageProperties, nonStandard);
+ int bodySize = 0;
+ for(QpidByteBuffer content : buf)
+ {
+ bodySize += content.remaining();
+ }
+ MessageMetaData_0_10 metaData = new MessageMetaData_0_10(header, bodySize, System.currentTimeMillis());
+ final MessageHandle<MessageMetaData_0_10> handle = store.addMessage(metaData);
+ for (QpidByteBuffer content : buf)
+ {
+ if (content.hasRemaining())
+ {
+ handle.addContent(content);
+ }
+ }
+ final StoredMessage<MessageMetaData_0_10> storedMessage = handle.allContentAdded();
+ return new MessageTransferMessage(storedMessage, connectionReference);
+
+ }
+ catch (BufferUnderflowException e )
+ {
+ throw new ConnectionScopedRuntimeException("Error parsing AMQP 0-10 message format", e);
+ }
+ }
+
+
+ @Override
+ public String getRoutingAddress(final MessageTransferMessage message, final String destinationAddress)
+ {
+ String initialRoutingAddress = message.getInitialRoutingAddress();
+ if(initialRoutingAddress != null && destinationAddress != null && initialRoutingAddress.startsWith(destinationAddress+"/"))
+ {
+ initialRoutingAddress = initialRoutingAddress.substring(destinationAddress.length() + 1);
+ }
+ return initialRoutingAddress;
+ }
+}
Propchange: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Mon Dec 12 14:34:54 2016
@@ -1007,6 +1007,37 @@ public class ServerSessionDelegate exten
{
destination = addressSpace.getDefaultDestination();
}
+ else
+ {
+ Header header = xfr.getHeader();
+ DeliveryProperties delvProps;
+ if(header == null)
+ {
+ delvProps = new DeliveryProperties();
+ header = new Header(delvProps, null, null);
+ xfr.setHeader(header);
+ }
+ else if(header.getDeliveryProperties() == null)
+ {
+ delvProps = new DeliveryProperties();
+ header = new Header(delvProps, header.getMessageProperties(), header.getNonStandardProperties());
+ xfr.setHeader(header);
+ }
+ else
+ {
+ delvProps = header.getDeliveryProperties();
+ }
+ if(delvProps.getExchange() == null && !xfr.getDestination().equals(delvProps.getRoutingKey()))
+ {
+ delvProps.setExchange(xfr.getDestination());
+ }
+ }
+ }
+ else if(xfr.getHeader() != null
+ && xfr.getHeader().getDeliveryProperties() != null
+ && xfr.getHeader().getDeliveryProperties().getExchange() != null)
+ {
+ destination = addressSpace.getAttainedMessageDestination(xfr.getHeader().getDeliveryProperties().getExchange());
}
else
{
Added: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java?rev=1773795&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java Mon Dec 12 14:34:54 2016
@@ -0,0 +1,259 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import java.nio.BufferUnderflowException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.EncodingUtils;
+import org.apache.qpid.framing.MessagePublishInfo;
+import org.apache.qpid.server.plugin.MessageFormat;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+
+public class MessageFormat_0_9_1 implements MessageFormat<AMQMessage>
+{
+
+ public static final int AMQP_MESSAGE_FORMAT_0_9_1 = 91;
+ private static final byte MANDATORY_MASK = (byte)1;
+ private static final byte IMMEDIATE_MASK = (byte)2;
+
+
+ @Override
+ public String getType()
+ {
+ return "AMQP_0_9_1";
+ }
+
+ @Override
+ public int getSupportedFormat()
+ {
+ return AMQP_MESSAGE_FORMAT_0_9_1;
+ }
+
+ @Override
+ public Class<AMQMessage> getMessageClass()
+ {
+ return AMQMessage.class;
+ }
+
+ @Override
+ public List<QpidByteBuffer> convertToMessageFormat(final AMQMessage message)
+ {
+ final MessagePublishInfo messagePublishInfo = message.getMessagePublishInfo();
+ final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
+ AMQShortString exchange = messagePublishInfo.getExchange();
+ AMQShortString routingKey = messagePublishInfo.getRoutingKey();
+ int length = contentHeaderBody.getSize() + (exchange == null ? 0 : exchange.length()) + (routingKey == null ? 0 : routingKey.length()) + 3;
+ QpidByteBuffer headerBuf = QpidByteBuffer.allocateDirect(length);
+ EncodingUtils.writeShortStringBytes(headerBuf, exchange);
+ EncodingUtils.writeShortStringBytes(headerBuf, routingKey);
+ byte flags = messagePublishInfo.isMandatory() ? (byte)0 : MANDATORY_MASK;
+ if(messagePublishInfo.isImmediate())
+ {
+ flags |= IMMEDIATE_MASK;
+ }
+ headerBuf.put(flags);
+ headerBuf.flip();
+
+ contentHeaderBody.writePayload(headerBuf);
+ List<QpidByteBuffer> bufs = new ArrayList<>();
+ headerBuf.flip();
+ bufs.add(headerBuf);
+ bufs.addAll(message.getContent(0, (int) contentHeaderBody.getBodySize()));
+
+ return bufs;
+ }
+
+ @Override
+ public AMQMessage createMessage(final List<QpidByteBuffer> buf,
+ final MessageStore store,
+ final Object connectionReference)
+ {
+ try
+ {
+ AMQShortString exchange = readShortString(buf);
+ AMQShortString routingKey = readShortString(buf);
+ byte flags = readByte(buf);
+ final MessagePublishInfo publishBody = new MessagePublishInfo(exchange,
+ (flags & IMMEDIATE_MASK) != 0,
+ (flags & MANDATORY_MASK) != 0,
+ routingKey);
+ final ContentHeaderBody contentHeaderBody = readContentBody(buf);
+ MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody);
+
+ final MessageHandle<MessageMetaData> handle = store.addMessage(mmd);
+ for (QpidByteBuffer content : buf)
+ {
+ if (content.hasRemaining())
+ {
+ handle.addContent(content);
+ }
+ }
+ final StoredMessage<MessageMetaData> storedMessage = handle.allContentAdded();
+
+ return new AMQMessage(storedMessage, connectionReference);
+ }
+ catch (AMQFrameDecodingException | BufferUnderflowException e )
+ {
+ throw new ConnectionScopedRuntimeException("Error parsing AMQP 0-9-1 message format", e);
+ }
+ }
+
+ private ContentHeaderBody readContentBody(final List<QpidByteBuffer> buf) throws AMQFrameDecodingException
+ {
+ long size = ((long) readInt(buf)) & 0xffffffffL;
+ final QpidByteBuffer buffer = readByteBuffer(buf, size);
+ final ContentHeaderBody contentHeaderBody = new ContentHeaderBody(buffer, size);
+ buffer.dispose();
+ return contentHeaderBody;
+ }
+
+ private QpidByteBuffer readByteBuffer(final List<QpidByteBuffer> data, final long size)
+ {
+ QpidByteBuffer result = null;
+ for(QpidByteBuffer buf : data)
+ {
+ if(result == null && buf.remaining()>= size)
+ {
+ return buf.view(0, (int)size);
+ }
+ else if(buf.hasRemaining())
+ {
+ if(result == null)
+ {
+ result = QpidByteBuffer.allocateDirect((int)size);
+ }
+ if(buf.remaining()>result.remaining())
+ {
+ QpidByteBuffer dup = buf.view(0, result.remaining());
+ result.put(dup);
+ dup.dispose();
+ }
+ else
+ {
+ result.put(buf);
+ }
+ if(!result.hasRemaining())
+ {
+ result.flip();
+ return result;
+ }
+ }
+ }
+ throw new BufferUnderflowException();
+ }
+
+ private int readInt(final List<QpidByteBuffer> data)
+ {
+ int required = 4;
+ int result = 0;
+ for(QpidByteBuffer buf : data)
+ {
+ if(required == 4 && buf.remaining() >= 4)
+ {
+ return buf.getInt();
+ }
+ else
+ {
+ while(buf.remaining() > 0)
+ {
+ result <<= 8;
+ result |= ((int)buf.get()) & 0xff;
+ if(--required == 0)
+ {
+ return result;
+ }
+ }
+ }
+ }
+ throw new BufferUnderflowException();
+ }
+
+ private byte readByte(final List<QpidByteBuffer> data)
+ {
+ for(QpidByteBuffer buf : data)
+ {
+ if(buf.hasRemaining())
+ {
+ return buf.get();
+ }
+ }
+ throw new BufferUnderflowException();
+ }
+
+ private AMQShortString readShortString(final List<QpidByteBuffer> data)
+ {
+ for(QpidByteBuffer buf : data)
+ {
+ if(buf.hasRemaining())
+ {
+ int length = ((int)buf.get(buf.position())) & 0xff;
+ if(buf.remaining()>length)
+ {
+ return AMQShortString.readAMQShortString(buf);
+ }
+ }
+ }
+ int length = ((int)readByte(data)) & 0xff;
+ byte[] octets = new byte[length];
+ readByteArray(octets, data);
+ return new AMQShortString(octets);
+ }
+
+ private void readByteArray(final byte[] octets, final List<QpidByteBuffer> data)
+ {
+ int offset = 0;
+ for(QpidByteBuffer buf : data)
+ {
+ final int remaining = buf.remaining();
+ if(remaining >= octets.length-offset)
+ {
+ buf.get(octets, offset, octets.length-offset);
+ return;
+ }
+ else if(remaining > 0)
+ {
+ buf.get(octets, offset, remaining);
+ offset+=remaining;
+ }
+ }
+ throw new BufferUnderflowException();
+ }
+
+ @Override
+ public String getRoutingAddress(final AMQMessage message, final String destinationAddress)
+ {
+ String initialRoutingAddress = message.getInitialRoutingAddress();
+ if(initialRoutingAddress != null && destinationAddress != null && initialRoutingAddress.startsWith(destinationAddress+"/"))
+ {
+ initialRoutingAddress = initialRoutingAddress.substring(destinationAddress.length() + 1);
+ }
+ return initialRoutingAddress;
+ }
+}
Propchange: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Mon Dec 12 14:34:54 2016
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -76,7 +77,8 @@ public class ExchangeDestination impleme
return OUTCOMES;
}
- public Outcome send(final Message_1_0 message,
+ public Outcome send(final ServerMessage<?> message,
+ final String routingAddress,
ServerTransaction txn,
final Action<MessageInstance> action)
{
@@ -103,7 +105,6 @@ public class ExchangeDestination impleme
return null;
}};
- final String routingAddress = getRoutingAddress(message);
int enqueues = _exchange.send(message,
routingAddress,
instanceProperties,
@@ -133,10 +134,11 @@ public class ExchangeDestination impleme
}
@Override
- public void authorizePublish(final SecurityToken securityToken, final Message_1_0 message)
+ public void authorizePublish(final SecurityToken securityToken,
+ final String routingAddress)
{
_exchange.authorisePublish(securityToken,
- Collections.<String,Object>singletonMap("routingKey", getRoutingAddress(message)));
+ Collections.<String,Object>singletonMap("routingKey", routingAddress));
}
@@ -148,27 +150,38 @@ public class ExchangeDestination impleme
MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
if(_initialRoutingAddress == null)
{
- routingAddress = messageHeader.getSubject();
- if(routingAddress == null)
+ final String to = messageHeader.getTo();
+ if (to != null
+ && (_exchange.getName() == null || _exchange.getName().trim().equals("")))
{
- if (messageHeader.getHeader("routing-key") instanceof String)
- {
- routingAddress = (String) messageHeader.getHeader("routing-key");
- }
- else if (messageHeader.getHeader("routing_key") instanceof String)
- {
- routingAddress = (String) messageHeader.getHeader("routing_key");
- }
- else if (messageHeader.getTo() != null
- && messageHeader.getTo().startsWith(_exchange.getName() + "/"))
- {
- routingAddress = messageHeader.getTo().substring(1+_exchange.getName().length());
- }
- else
- {
- routingAddress = "";
- }
+ routingAddress = to;
}
+ else if (to != null
+ && to.startsWith(_exchange.getName() + "/"))
+ {
+ routingAddress = to.substring(1 + _exchange.getName().length());
+ }
+ else if (to != null && !to.equals(_exchange.getName()))
+ {
+ routingAddress = to;
+ }
+ else if (messageHeader.getHeader("routing-key") instanceof String)
+ {
+ routingAddress = (String) messageHeader.getHeader("routing-key");
+ }
+ else if (messageHeader.getHeader("routing_key") instanceof String)
+ {
+ routingAddress = (String) messageHeader.getHeader("routing_key");
+ }
+ else if (messageHeader.getSubject() != null)
+ {
+ routingAddress = messageHeader.getSubject();
+ }
+ else
+ {
+ routingAddress = "";
+ }
+
}
else
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Mon Dec 12 14:34:54 2016
@@ -28,6 +28,7 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -76,8 +77,8 @@ public class NodeReceivingDestination im
return OUTCOMES;
}
- public Outcome send(final Message_1_0 message,
- ServerTransaction txn,
+ public Outcome send(final ServerMessage<?> message,
+ final String routingAddress, ServerTransaction txn,
final Action<MessageInstance> action)
{
final InstanceProperties instanceProperties =
@@ -103,9 +104,6 @@ public class NodeReceivingDestination im
return null;
}};
- String routingAddress;
- routingAddress = getRoutingAddress(message);
-
int enqueues = _destination.send(message, routingAddress, instanceProperties, txn, action);
if(enqueues == 0)
@@ -113,12 +111,11 @@ public class NodeReceivingDestination im
_eventLogger.message(ExchangeMessages.DISCARDMSG(_destination.getName(), routingAddress));
}
- return enqueues == 0 && !_discardUnroutable ? createdRejectedOutcome(message) : ACCEPTED;
+ return enqueues == 0 && !_discardUnroutable ? createdRejectedOutcome(routingAddress) : ACCEPTED;
}
- private Outcome createdRejectedOutcome(final Message_1_0 message)
+ private Outcome createdRejectedOutcome(String routingAddress)
{
- String routingAddress = getRoutingAddress(message);
Rejected rejected = new Rejected();
final Error notFoundError = new Error(AmqpError.NOT_FOUND, "Unknown destination '" + routingAddress + '"');
rejected.setError(notFoundError);
@@ -132,10 +129,11 @@ public class NodeReceivingDestination im
}
@Override
- public void authorizePublish(final SecurityToken securityToken, final Message_1_0 message)
+ public void authorizePublish(final SecurityToken securityToken,
+ final String routingAddress)
{
_destination.authorisePublish(securityToken,
- Collections.<String, Object>singletonMap("routingKey", getRoutingAddress(message)));
+ Collections.<String, Object>singletonMap("routingKey", routingAddress));
}
@@ -143,32 +141,39 @@ public class NodeReceivingDestination im
public String getRoutingAddress(final Message_1_0 message)
{
MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
- String routingAddress = messageHeader.getSubject();
- if(routingAddress == null)
+ String routingAddress;
+ final String to = messageHeader.getTo();
+ if (to != null
+ && (_destination.getName() == null || _destination.getName().trim().equals("")))
{
- if (messageHeader.getHeader("routing-key") instanceof String)
- {
- routingAddress = (String) messageHeader.getHeader("routing-key");
- }
- else if (messageHeader.getHeader("routing_key") instanceof String)
- {
- routingAddress = (String) messageHeader.getHeader("routing_key");
- }
- else if (messageHeader.getTo() != null
- && messageHeader.getTo().startsWith(_destination.getName() + "/"))
- {
- routingAddress = messageHeader.getTo().substring(1+_destination.getName().length());
- }
- else if (messageHeader.getTo() != null
- && (_destination.getName() == null || _destination.getName().trim().equals("")))
- {
- routingAddress = messageHeader.getTo();
- }
- else
- {
- routingAddress = "";
- }
+ routingAddress = to;
}
+ else if (to != null
+ && to.startsWith(_destination.getName() + "/"))
+ {
+ routingAddress = to.substring(1 + _destination.getName().length());
+ }
+ else if (to != null && !to.equals(_destination.getName()))
+ {
+ routingAddress = to;
+ }
+ else if (messageHeader.getHeader("routing-key") instanceof String)
+ {
+ routingAddress = (String) messageHeader.getHeader("routing-key");
+ }
+ else if (messageHeader.getHeader("routing_key") instanceof String)
+ {
+ routingAddress = (String) messageHeader.getHeader("routing_key");
+ }
+ else if (messageHeader.getSubject() != null)
+ {
+ routingAddress = messageHeader.getSubject();
+ }
+ else
+ {
+ routingAddress = "";
+ }
+
return routingAddress;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Mon Dec 12 14:34:54 2016
@@ -27,6 +27,7 @@ import java.util.Set;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -56,8 +57,8 @@ public class QueueDestination extends Me
return OUTCOMES;
}
- public Outcome send(final Message_1_0 message,
- ServerTransaction txn,
+ public Outcome send(final ServerMessage<?> message,
+ final String routingAddress, ServerTransaction txn,
final Action<MessageInstance> action)
{
@@ -106,11 +107,12 @@ public class QueueDestination extends Me
}
@Override
- public void authorizePublish(final SecurityToken securityToken, final Message_1_0 message)
+ public void authorizePublish(final SecurityToken securityToken,
+ final String routingAddress)
{
_queue.authorisePublish(securityToken,
- Collections.<String,Object>singletonMap("routingKey", getRoutingAddress(message)));
+ Collections.<String,Object>emptyMap());
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java Mon Dec 12 14:34:54 2016
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v1_0;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -38,7 +39,10 @@ public interface ReceivingDestination ex
Outcome[] getOutcomes();
- Outcome send(Message_1_0 message, ServerTransaction txn, final Action<MessageInstance> postEnqueueAction);
+ Outcome send(ServerMessage<?> message,
+ final String routingAddress,
+ ServerTransaction txn,
+ final Action<MessageInstance> postEnqueueAction);
int getCredit();
@@ -46,5 +50,5 @@ public interface ReceivingDestination ex
String getAddress();
- void authorizePublish(SecurityToken securityToken, final Message_1_0 message);
+ void authorizePublish(SecurityToken securityToken, final String routingAddress);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java Mon Dec 12 14:34:54 2016
@@ -34,7 +34,10 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.plugin.MessageFormat;
+import org.apache.qpid.server.protocol.MessageFormatRegistry;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -106,10 +109,10 @@ public class StandardReceivingLink_1_0 i
org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = xfr.getState();
final Binary deliveryTag = xfr.getDeliveryTag();
-
+ UnsignedInteger messageFormat = null;
if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
{
- _incompleteMessage = new ArrayList<Transfer>();
+ _incompleteMessage = new ArrayList<>();
_incompleteMessage.add(xfr);
_resumedMessage = Boolean.TRUE.equals(xfr.getResume());
_messageDeliveryTag = deliveryTag;
@@ -128,6 +131,10 @@ public class StandardReceivingLink_1_0 i
for(Transfer t : _incompleteMessage)
{
+ if(t.getMessageFormat() != null && messageFormat == null)
+ {
+ messageFormat = t.getMessageFormat();
+ }
fragments.addAll(t.getPayload());
t.dispose();
}
@@ -139,6 +146,8 @@ public class StandardReceivingLink_1_0 i
_resumedMessage = Boolean.TRUE.equals(xfr.getResume());
_messageDeliveryTag = deliveryTag;
fragments = xfr.getPayload();
+ messageFormat = xfr.getMessageFormat();
+
xfr.dispose();
}
@@ -161,21 +170,45 @@ public class StandardReceivingLink_1_0 i
}
else
{
- List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
- MessageMetaData_1_0 mmd = createMessageMetaData(fragments, dataSections);
- MessageHandle<MessageMetaData_1_0> handle = _addressSpace.getMessageStore().addMessage(mmd);
+ ServerMessage<?> serverMessage;
+ String routingAddress;
- for(EncodingRetainingSection<?> dataSection : dataSections)
+ if(messageFormat == null || UnsignedInteger.ZERO.equals(messageFormat))
{
- for (QpidByteBuffer buf : dataSection.getEncodedForm())
+ List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
+
+ MessageMetaData_1_0 mmd = createMessageMetaData(fragments, dataSections);
+ MessageHandle<MessageMetaData_1_0> handle = _addressSpace.getMessageStore().addMessage(mmd);
+
+ for (EncodingRetainingSection<?> dataSection : dataSections)
{
- handle.addContent(buf);
- buf.dispose();
+ for (QpidByteBuffer buf : dataSection.getEncodedForm())
+ {
+ handle.addContent(buf);
+ buf.dispose();
+ }
+ }
+ final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
+ Message_1_0 message = new Message_1_0(storedMessage, getSession().getConnection().getReference());
+ routingAddress = _destination.getRoutingAddress(message);
+ serverMessage = message;
+ }
+ else
+ {
+ MessageFormat format = MessageFormatRegistry.getFormat(messageFormat.intValue());
+ if(format != null)
+ {
+ serverMessage = format.createMessage(fragments, _addressSpace.getMessageStore(), getSession().getConnection().getReference());
+ routingAddress = format.getRoutingAddress(serverMessage, _destination.getAddress());
+ }
+ else
+ {
+ final Error err = new Error();
+ err.setCondition(AmqpError.NOT_IMPLEMENTED);
+ err.setDescription("Unknown message format: " + messageFormat);
+ return err;
}
}
- final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
- Message_1_0 message = new Message_1_0(storedMessage, getSession().getConnection().getReference());
-
for(QpidByteBuffer fragment: fragments)
{
@@ -183,7 +216,7 @@ public class StandardReceivingLink_1_0 i
}
fragments = null;
- MessageReference<Message_1_0> reference = message.newReference();
+ MessageReference<?> reference = serverMessage.newReference();
try
{
Binary transactionId = null;
@@ -213,10 +246,10 @@ public class StandardReceivingLink_1_0 i
Session_1_0 session = getSession();
session.getAMQPConnection()
- .checkAuthorizedMessagePrincipal(message.getMessageHeader().getUserId());
- _destination.authorizePublish(session.getSecurityToken(), message);
+ .checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
+ _destination.authorizePublish(session.getSecurityToken(), routingAddress);
- Outcome outcome = _destination.send(message, transaction, session.getCapacityCheckAction());
+ Outcome outcome = _destination.send(serverMessage, routingAddress, transaction, session.getCapacityCheckAction());
Source source = (Source) getEndpoint().getSource();
DeliveryState resultantState;
@@ -261,7 +294,7 @@ public class StandardReceivingLink_1_0 i
getEndpoint().updateDisposition(deliveryTag, resultantState, settled);
getSession().getAMQPConnection()
- .registerMessageReceived(message.getSize(), message.getArrivalTime());
+ .registerMessageReceived(serverMessage.getSize(), serverMessage.getArrivalTime());
if (!(transaction instanceof AutoCommitTransaction))
{
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java Mon Dec 12 14:34:54 2016
@@ -40,16 +40,13 @@ public class ContentHeaderPropertiesFact
throws AMQFrameDecodingException
{
BasicContentHeaderProperties properties;
- // AMQP version change: "Hardwired" version to major=8, minor=0
- // TODO: Change so that the actual version is obtained from
- // the ProtocolInitiation object for this session.
if (classId == BasicConsumeBody.CLASS_ID)
{
properties = new BasicContentHeaderProperties();
}
else
{
- throw new AMQFrameDecodingException("Unsupport content header class id: " + classId, null);
+ throw new AMQFrameDecodingException("Unsupported content header class id: " + classId, null);
}
properties.populatePropertiesFromBuffer(buffer, propertyFlags, size);
return properties;
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/MessagePublishInfo.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/MessagePublishInfo.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/MessagePublishInfo.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/MessagePublishInfo.java Mon Dec 12 14:34:54 2016
@@ -55,9 +55,9 @@ public final class MessagePublishInfo
return _immediate;
}
- public void setImmediate(boolean immedate)
+ public void setImmediate(boolean immediate)
{
- _immediate = immedate;
+ _immediate = immediate;
}
public boolean isMandatory()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org