You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/12/07 14:36:59 UTC
svn commit: r1773086 - in /qpid/java/trunk/broker-plugins:
amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/
amqp-msg-conv-0-10-to-1-0/src/main/jav...
Author: kwall
Date: Wed Dec 7 14:36:59 2016
New Revision: 1773086
URL: http://svn.apache.org/viewvc?rev=1773086&view=rev
Log:
QPID-7575 : [Java Broker] Refactor the AMQP 1.0 MessageMetaData
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpSequence.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpValue.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/ApplicationProperties.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Data.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/DeliveryAnnotations.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Footer.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/FooterSection.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Header.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/MessageAnnotations.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/NonEncodingRetainingSection.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Properties.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java Wed Dec 7 14:36:59 2016
@@ -24,7 +24,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -58,11 +57,9 @@ public class MessageConverter_Internal_t
@Override
protected MessageMetaData_1_0 convertMetaData(final InternalMessage serverMessage,
- final NonEncodingRetainingSection<?> bodySection,
- final SectionEncoder sectionEncoder,
- final List<EncodingRetainingSection<?>> bodySections)
+ final EncodingRetainingSection<?> bodySection,
+ final SectionEncoder sectionEncoder)
{
- List<NonEncodingRetainingSection<?>> sections = new ArrayList<>(3);
Header header = new Header();
header.setDurable(serverMessage.isPersistent());
@@ -72,8 +69,6 @@ public class MessageConverter_Internal_t
header.setTtl(UnsignedInteger.valueOf(serverMessage.getExpiration()-serverMessage.getArrivalTime()));
}
- sections.add(header);
-
Properties properties = new Properties();
properties.setCorrelationId(serverMessage.getMessageHeader().getCorrelationId());
properties.setCreationTime(new Date(serverMessage.getMessageHeader().getTimestamp()));
@@ -89,24 +84,27 @@ public class MessageConverter_Internal_t
}
properties.setReplyTo(serverMessage.getMessageHeader().getReplyTo());
- sections.add(properties);
-
+ ApplicationProperties applicationProperties = null;
if(!serverMessage.getMessageHeader().getHeaderNames().isEmpty())
{
- ApplicationProperties applicationProperties = new ApplicationProperties((Map)serverMessage.getMessageHeader().getHeaderMap() );
- sections.add(applicationProperties);
+ applicationProperties = new ApplicationProperties(serverMessage.getMessageHeader().getHeaderMap() );
}
- if(bodySection != null)
- {
- sections.add(bodySection);
- }
- return new MessageMetaData_1_0(sections, sectionEncoder, bodySections, serverMessage.getArrivalTime());
+
+ return new MessageMetaData_1_0(header.createEncodingRetainingSection(sectionEncoder),
+ null,
+ null,
+ properties.createEncodingRetainingSection(sectionEncoder),
+ applicationProperties == null ? null : applicationProperties.createEncodingRetainingSection(sectionEncoder),
+ null,
+ serverMessage.getArrivalTime(),
+ bodySection.getEncodedSize());
}
- protected NonEncodingRetainingSection<?> getBodySection(final InternalMessage serverMessage)
+ protected EncodingRetainingSection<?> getBodySection(final InternalMessage serverMessage,
+ final SectionEncoder encoder)
{
- return convertToBody(serverMessage.getMessageBody());
+ return convertToBody(serverMessage.getMessageBody()).createEncodingRetainingSection(encoder);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Wed Dec 7 14:36:59 2016
@@ -24,6 +24,7 @@ import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.ListIterator;
@@ -74,17 +75,15 @@ public abstract class MessageConverter_t
private StoredMessage<MessageMetaData_1_0> convertToStoredMessage(final M serverMessage, SectionEncoder sectionEncoder)
{
- NonEncodingRetainingSection<?> bodySection = getBodySection(serverMessage);
- final ArrayList<EncodingRetainingSection<?>> bodySections = new ArrayList<>();
+ EncodingRetainingSection<?> bodySection = getBodySection(serverMessage, sectionEncoder);
- final MessageMetaData_1_0 metaData = convertMetaData(serverMessage, bodySection, sectionEncoder, bodySections);
- return convertServerMessage(metaData, serverMessage, bodySections);
+ final MessageMetaData_1_0 metaData = convertMetaData(serverMessage, bodySection, sectionEncoder);
+ return convertServerMessage(metaData, serverMessage, bodySection);
}
abstract protected MessageMetaData_1_0 convertMetaData(final M serverMessage,
- final NonEncodingRetainingSection<?> bodySection,
- SectionEncoder sectionEncoder,
- final List<EncodingRetainingSection<?>> bodySections);
+ final EncodingRetainingSection<?> bodySection,
+ SectionEncoder sectionEncoder);
private static NonEncodingRetainingSection<?> convertMessageBody(String mimeType, byte[] data)
@@ -203,7 +202,7 @@ public abstract class MessageConverter_t
private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData,
final M serverMessage,
- final ArrayList<EncodingRetainingSection<?>> bodySections)
+ final EncodingRetainingSection<?> section)
{
return new StoredMessage<MessageMetaData_1_0>()
@@ -225,32 +224,29 @@ public abstract class MessageConverter_t
{
int position = 0;
List<QpidByteBuffer> content = new ArrayList<>();
- for(EncodingRetainingSection<?> section : bodySections)
+ for(QpidByteBuffer buf : section.getEncodedForm())
{
- for(QpidByteBuffer buf : section.getEncodedForm())
+ if(position < offset)
{
- if(position < offset)
+ if(offset - position < buf.remaining())
{
- if(offset - position < buf.remaining())
- {
- QpidByteBuffer view = buf.view(offset - position, Math.min(length, buf.remaining() - (offset-position)));
- content.add(view);
- position += view.remaining();
- }
- else
- {
- position += buf.remaining();
- }
- }
- else if(position <= offset+length)
- {
- QpidByteBuffer view = buf.view(0, Math.min(length - (position-offset), buf.remaining()));
+ QpidByteBuffer view = buf.view(offset - position, Math.min(length, buf.remaining() - (offset-position)));
content.add(view);
position += view.remaining();
}
-
- buf.dispose();
+ else
+ {
+ position += buf.remaining();
+ }
+ }
+ else if(position <= offset+length)
+ {
+ QpidByteBuffer view = buf.view(0, Math.min(length - (position-offset), buf.remaining()));
+ content.add(view);
+ position += view.remaining();
}
+
+ buf.dispose();
}
return content;
}
@@ -275,7 +271,7 @@ public abstract class MessageConverter_t
};
}
- protected NonEncodingRetainingSection<?> getBodySection(final M serverMessage)
+ protected EncodingRetainingSection<?> getBodySection(final M serverMessage, final SectionEncoder encoder)
{
final String mimeType = serverMessage.getMessageHeader().getMimeType();
byte[] data = new byte[(int) serverMessage.getSize()];
@@ -295,7 +291,7 @@ public abstract class MessageConverter_t
data = uncompressed;
}
- return convertMessageBody(mimeType, data);
+ return convertMessageBody(mimeType, data).createEncodingRetainingSection(encoder);
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Wed Dec 7 14:36:59 2016
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
@@ -36,16 +35,23 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.*;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationPropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -71,230 +77,55 @@ public class MessageMetaData_1_0 impleme
private FooterSection _footerSection;
- private MessageHeader_1_0 _messageHeader;
+ private final MessageHeader_1_0 _messageHeader = new MessageHeader_1_0();
private final int _version;
private final long _arrivalTime;
- public MessageMetaData_1_0(List<NonEncodingRetainingSection<?>> sections,
- SectionEncoder encoder,
- final List<EncodingRetainingSection<?>> bodySections,
- final long arrivalTime)
- {
- _version = VERSION_BYTE;
+ public MessageMetaData_1_0(final HeaderSection headerSection,
+ final DeliveryAnnotationsSection deliveryAnnotationsSection,
+ final MessageAnnotationsSection messageAnnotationsSection,
+ final PropertiesSection propertiesSection,
+ final ApplicationPropertiesSection applicationPropertiesSection,
+ final FooterSection footerSection,
+ final long arrivalTime,
+ final long contentSize)
+ {
+ this(headerSection,
+ deliveryAnnotationsSection,
+ messageAnnotationsSection,
+ propertiesSection,
+ applicationPropertiesSection,
+ footerSection,
+ arrivalTime,
+ contentSize,
+ VERSION_BYTE);
+ }
+
+ public MessageMetaData_1_0(final HeaderSection headerSection,
+ final DeliveryAnnotationsSection deliveryAnnotationsSection,
+ final MessageAnnotationsSection messageAnnotationsSection,
+ final PropertiesSection propertiesSection,
+ final ApplicationPropertiesSection applicationPropertiesSection,
+ final FooterSection footerSection,
+ final long arrivalTime,
+ final long contentSize,
+ final int version)
+ {
+ _headerSection = headerSection;
+ _deliveryAnnotationsSection = deliveryAnnotationsSection;
+ _messageAnnotationsSection = messageAnnotationsSection;
+ _propertiesSection = propertiesSection;
+ _applicationPropertiesSection = applicationPropertiesSection;
+ _footerSection = footerSection;
_arrivalTime = arrivalTime;
- Iterator<NonEncodingRetainingSection<?>> iter = sections.iterator();
- NonEncodingRetainingSection<?> s = iter.hasNext() ? iter.next() : null;
- long contentSize = 0L;
- if(s instanceof Header)
- {
- encoder.reset();
- encoder.encodeObject(s);
- Binary encodedOutput = encoder.getEncoding();
- final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
- _headerSection = new HeaderSection((Header)s, Collections.singletonList(buf), encoder.getRegistry());
- s = iter.hasNext() ? iter.next() : null;
- }
-
- if(s instanceof DeliveryAnnotations)
- {
- encoder.reset();
- encoder.encodeObject(s);
- Binary encodedOutput = encoder.getEncoding();
- final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
- _deliveryAnnotationsSection = new DeliveryAnnotationsSection((DeliveryAnnotations)s, Collections.singletonList(buf), encoder.getRegistry());
- s = iter.hasNext() ? iter.next() : null;
- }
-
- if(s instanceof MessageAnnotations)
- {
- encoder.reset();
- encoder.encodeObject(s);
- Binary encodedOutput = encoder.getEncoding();
- final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
- _messageAnnotationsSection = new MessageAnnotationsSection((MessageAnnotations)s, Collections.singletonList(buf), encoder.getRegistry());
- s = iter.hasNext() ? iter.next() : null;
- }
-
- if(s instanceof Properties)
- {
- encoder.reset();
- encoder.encodeObject(s);
- Binary encodedOutput = encoder.getEncoding();
- final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
- _propertiesSection = new PropertiesSection((Properties)s, Collections.singletonList(buf), encoder.getRegistry());
- s = iter.hasNext() ? iter.next() : null;
- }
-
- if(s instanceof ApplicationProperties)
- {
- encoder.reset();
- encoder.encodeObject(s);
- Binary encodedOutput = encoder.getEncoding();
- final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
- _applicationPropertiesSection = new ApplicationPropertiesSection((ApplicationProperties)s, Collections.singletonList(buf), encoder.getRegistry());
- s = iter.hasNext() ? iter.next() : null;
- }
-
- if(s instanceof AmqpValue)
- {
- encoder.reset();
- encoder.encodeObject(s);
- Binary encodedOutput = encoder.getEncoding();
- final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
- bodySections.add(new AmqpValueSection((AmqpValue)s, Collections.singletonList(buf), encoder.getRegistry()));
-
- contentSize = buf.remaining();
- s = iter.hasNext() ? iter.next() : null;
- }
- else if(s instanceof Data)
- {
- do
- {
- encoder.reset();
- encoder.encodeObject(s);
- Binary encodedOutput = encoder.getEncoding();
- final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
- bodySections.add(new DataSection((Data)s, Collections.singletonList(buf), encoder.getRegistry()));
-
- contentSize += buf.remaining();
-
- s = iter.hasNext() ? iter.next() : null;
- } while(s instanceof Data);
- }
- else if(s instanceof AmqpSequence)
- {
- do
- {
- encoder.reset();
- encoder.encodeObject(s);
- Binary encodedOutput = encoder.getEncoding();
- final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
- bodySections.add(new AmqpSequenceSection((AmqpSequence)s, Collections.singletonList(buf), encoder.getRegistry()));
-
- contentSize += buf.remaining();
- s = iter.hasNext() ? iter.next() : null;
- }
- while(s instanceof AmqpSequence);
- }
-
- if(s instanceof Footer)
- {
- encoder.reset();
- encoder.encodeObject(s);
- Binary encodedOutput = encoder.getEncoding();
- final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
- _footerSection = new FooterSection((Footer)s, Collections.singletonList(buf), encoder.getRegistry());
- }
_contentSize = contentSize;
-
- }
-
- public Properties getProperties()
- {
- return _propertiesSection == null ? null : _propertiesSection.getValue();
- }
-
-
- public PropertiesSection getPropertiesSection()
- {
- return _propertiesSection;
- }
-
- public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder, List<EncodingRetainingSection<?>> dataSections, long arrivalTime)
- {
- _version = VERSION_BYTE;
- _arrivalTime = arrivalTime;
- List<QpidByteBuffer> src = new ArrayList<>(fragments.length);
- for(QpidByteBuffer buf : fragments)
- {
- src.add(buf.duplicate());
- }
-
- try
- {
- EncodingRetainingSection<?> s = decoder.readSection(src);
- long contentSize = 0L;
- if(s instanceof HeaderSection)
- {
- _headerSection = (HeaderSection) s;
- s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
- }
-
- if(s instanceof DeliveryAnnotationsSection)
- {
- _deliveryAnnotationsSection = (DeliveryAnnotationsSection) s;
- s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
- }
-
- if(s instanceof MessageAnnotationsSection)
- {
- _messageAnnotationsSection = (MessageAnnotationsSection) s;
- s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
- }
-
- if(s instanceof PropertiesSection)
- {
- _propertiesSection = (PropertiesSection) s;
- s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
- }
-
- if(s instanceof ApplicationPropertiesSection)
- {
- _applicationPropertiesSection = (ApplicationPropertiesSection) s;
- s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
- }
-
- if(s instanceof AmqpValueSection)
- {
- contentSize = s.getEncodedSize();
- dataSections.add(s);
- s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
- }
- else if(s instanceof DataSection)
- {
- do
- {
- contentSize += s.getEncodedSize();
- dataSections.add(s);
- s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
- } while(s instanceof DataSection);
- }
- else if(s instanceof AmqpSequenceSection)
- {
- do
- {
- contentSize += s.getEncodedSize();
- dataSections.add(s);
- s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
- }
- while(s instanceof AmqpSequenceSection);
- }
-
- if(s instanceof FooterSection)
- {
- _footerSection = (FooterSection) s;
- }
- _contentSize = contentSize;
- }
- catch (AmqpErrorException e)
- {
- _logger.error("Decoding read section error", e);
- // TODO - fix error handling
- throw new IllegalArgumentException(e);
- }
- finally
- {
- for(QpidByteBuffer buf : src)
- {
- buf.dispose();
- }
- }
-
- _messageHeader = new MessageHeader_1_0();
-
+ _version = version;
}
- private MessageMetaData_1_0(List<EncodingRetainingSection<?>> sections, long contentSize, int version, long arrivalTime)
+ private MessageMetaData_1_0(List<EncodingRetainingSection<?>> sections,
+ long contentSize,
+ int version,
+ long arrivalTime)
{
_contentSize = contentSize;
_version = version;
@@ -303,44 +134,52 @@ public class MessageMetaData_1_0 impleme
Iterator<EncodingRetainingSection<?>> sectIter = sections.iterator();
EncodingRetainingSection<?> section = sectIter.hasNext() ? sectIter.next() : null;
- if(section instanceof HeaderSection)
+ if (section instanceof HeaderSection)
{
_headerSection = (HeaderSection) section;
section = sectIter.hasNext() ? sectIter.next() : null;
}
- if(section instanceof DeliveryAnnotationsSection)
+ if (section instanceof DeliveryAnnotationsSection)
{
_deliveryAnnotationsSection = (DeliveryAnnotationsSection) section;
section = sectIter.hasNext() ? sectIter.next() : null;
}
- if(section instanceof MessageAnnotationsSection)
+ if (section instanceof MessageAnnotationsSection)
{
_messageAnnotationsSection = (MessageAnnotationsSection) section;
section = sectIter.hasNext() ? sectIter.next() : null;
}
- if(section instanceof PropertiesSection)
+ if (section instanceof PropertiesSection)
{
_propertiesSection = ((PropertiesSection) section);
section = sectIter.hasNext() ? sectIter.next() : null;
}
- if(section instanceof ApplicationPropertiesSection)
+ if (section instanceof ApplicationPropertiesSection)
{
_applicationPropertiesSection = (ApplicationPropertiesSection) section;
section = sectIter.hasNext() ? sectIter.next() : null;
}
- if(section instanceof FooterSection)
+ if (section instanceof FooterSection)
{
_footerSection = (FooterSection) section;
section = sectIter.hasNext() ? sectIter.next() : null;
}
+ }
- _messageHeader = new MessageHeader_1_0();
+ public Properties getProperties()
+ {
+ return _propertiesSection == null ? null : _propertiesSection.getValue();
+ }
+
+ public PropertiesSection getPropertiesSection()
+ {
+ return _propertiesSection;
}
@@ -354,27 +193,27 @@ public class MessageMetaData_1_0 impleme
{
long size = 17L;
- if(_headerSection != null)
+ if (_headerSection != null)
{
size += _headerSection.getEncodedSize();
}
- if(_deliveryAnnotationsSection != null)
+ if (_deliveryAnnotationsSection != null)
{
size += _deliveryAnnotationsSection.getEncodedSize();
}
- if(_messageAnnotationsSection != null)
+ if (_messageAnnotationsSection != null)
{
size += _messageAnnotationsSection.getEncodedSize();
}
- if(_propertiesSection != null)
+ if (_propertiesSection != null)
{
size += _propertiesSection.getEncodedSize();
}
- if(_applicationPropertiesSection != null)
+ if (_applicationPropertiesSection != null)
{
size += _applicationPropertiesSection.getEncodedSize();
}
- if(_footerSection != null)
+ if (_footerSection != null)
{
size += _footerSection.getEncodedSize();
}
@@ -387,31 +226,30 @@ public class MessageMetaData_1_0 impleme
dest.put(VERSION_BYTE);
dest.putLong(_arrivalTime);
dest.putLong(_contentSize);
- if(_headerSection != null)
+ if (_headerSection != null)
{
_headerSection.writeTo(dest);
}
- if(_deliveryAnnotationsSection != null)
+ if (_deliveryAnnotationsSection != null)
{
_deliveryAnnotationsSection.writeTo(dest);
}
- if(_messageAnnotationsSection != null)
+ if (_messageAnnotationsSection != null)
{
_messageAnnotationsSection.writeTo(dest);
}
- if(_propertiesSection != null)
+ if (_propertiesSection != null)
{
_propertiesSection.writeTo(dest);
}
- if(_applicationPropertiesSection != null)
+ if (_applicationPropertiesSection != null)
{
_applicationPropertiesSection.writeTo(dest);
}
- if(_footerSection != null)
+ if (_footerSection != null)
{
_footerSection.writeTo(dest);
}
-
}
public int getContentSize()
@@ -430,34 +268,38 @@ public class MessageMetaData_1_0 impleme
}
@Override
- public synchronized void dispose()
+ public synchronized void dispose()
{
- if(_headerSection != null)
+ if (_headerSection != null)
{
_headerSection.dispose();
_headerSection = null;
}
- if(_deliveryAnnotationsSection != null)
+ if (_deliveryAnnotationsSection != null)
{
_deliveryAnnotationsSection.dispose();
_deliveryAnnotationsSection = null;
}
- if(_messageAnnotationsSection != null)
+ if (_messageAnnotationsSection != null)
{
_messageAnnotationsSection.dispose();
_deliveryAnnotationsSection = null;
}
- if(_propertiesSection != null)
+ if (_propertiesSection != null)
{
_propertiesSection.dispose();
_propertiesSection = null;
}
- if(_applicationPropertiesSection != null)
+ if (_applicationPropertiesSection != null)
{
_applicationPropertiesSection.dispose();
_applicationPropertiesSection = null;
}
-
+ if (_footerSection != null)
+ {
+ _footerSection.dispose();
+ _footerSection = null;
+ }
}
@Override
@@ -526,7 +368,6 @@ public class MessageMetaData_1_0 impleme
buf.get();
arrivalTime = buf.getLong();
contentSize = buf.getLong();
-
}
else
{
@@ -537,31 +378,30 @@ public class MessageMetaData_1_0 impleme
List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(Collections.singletonList(buf));
- if(versionByte == 0)
+ if (versionByte == 0)
{
Iterator<EncodingRetainingSection<?>> iter = sections.iterator();
- while(iter.hasNext())
+ while (iter.hasNext())
{
final EncodingRetainingSection<?> section = iter.next();
- if(section instanceof DataSection || section instanceof AmqpValueSection || section instanceof AmqpSequenceSection)
+ if (section instanceof DataSection
+ || section instanceof AmqpValueSection
+ || section instanceof AmqpSequenceSection)
{
contentSize += section.getEncodedSize();
iter.remove();
section.dispose();
}
}
-
}
return new MessageMetaData_1_0(sections, contentSize, (int) versionByte & 0xff, arrivalTime);
-
}
catch (AmqpErrorException e)
{
//TODO
throw new ConnectionScopedRuntimeException(e);
}
-
}
}
@@ -570,7 +410,7 @@ public class MessageMetaData_1_0 impleme
public String getCorrelationId()
{
- if(_propertiesSection == null || _propertiesSection.getValue().getCorrelationId() == null)
+ if (_propertiesSection == null || _propertiesSection.getValue().getCorrelationId() == null)
{
return null;
}
@@ -583,14 +423,16 @@ public class MessageMetaData_1_0 impleme
@Override
public long getExpiration()
{
- final Date absoluteExpiryTime = _propertiesSection == null ? null : _propertiesSection.getValue().getAbsoluteExpiryTime();
- if(absoluteExpiryTime != null)
+ final Date absoluteExpiryTime =
+ _propertiesSection == null ? null : _propertiesSection.getValue().getAbsoluteExpiryTime();
+ if (absoluteExpiryTime != null)
{
return absoluteExpiryTime.getTime();
}
else
{
- final Date creationTime = _propertiesSection == null ? null : _propertiesSection.getValue().getCreationTime();
+ final Date creationTime =
+ _propertiesSection == null ? null : _propertiesSection.getValue().getCreationTime();
final UnsignedInteger ttl = _headerSection == null ? null : _headerSection.getValue().getTtl();
return ttl == null || creationTime == null ? 0L : ttl.longValue() + creationTime.getTime();
}
@@ -598,7 +440,7 @@ public class MessageMetaData_1_0 impleme
public String getMessageId()
{
- if(_propertiesSection == null || _propertiesSection.getValue().getMessageId() == null)
+ if (_propertiesSection == null || _propertiesSection.getValue().getMessageId() == null)
{
return null;
}
@@ -611,7 +453,7 @@ public class MessageMetaData_1_0 impleme
public String getMimeType()
{
- if(_propertiesSection == null || _propertiesSection.getValue().getContentType() == null)
+ if (_propertiesSection == null || _propertiesSection.getValue().getContentType() == null)
{
return null;
}
@@ -628,7 +470,7 @@ public class MessageMetaData_1_0 impleme
public byte getPriority()
{
- if(_headerSection == null || _headerSection.getValue().getPriority() == null)
+ if (_headerSection == null || _headerSection.getValue().getPriority() == null)
{
return 4; //javax.jms.Message.DEFAULT_PRIORITY;
}
@@ -640,7 +482,7 @@ public class MessageMetaData_1_0 impleme
public long getTimestamp()
{
- if(_propertiesSection == null || _propertiesSection.getValue().getCreationTime() == null)
+ if (_propertiesSection == null || _propertiesSection.getValue().getCreationTime() == null)
{
return 0L;
}
@@ -648,7 +490,6 @@ public class MessageMetaData_1_0 impleme
{
return _propertiesSection.getValue().getCreationTime().getTime();
}
-
}
@@ -658,13 +499,15 @@ public class MessageMetaData_1_0 impleme
long notValidBefore;
Object annotation;
- if(_messageAnnotationsSection != null && (annotation = _messageAnnotationsSection.getValue().get(DELIVERY_TIME)) instanceof Number)
+ if (_messageAnnotationsSection != null && (annotation =
+ _messageAnnotationsSection.getValue().get(DELIVERY_TIME)) instanceof Number)
{
- notValidBefore = ((Number)annotation).longValue();
+ notValidBefore = ((Number) annotation).longValue();
}
- else if(_messageAnnotationsSection != null && (annotation = _messageAnnotationsSection.getValue().get(NOT_VALID_BEFORE)) instanceof Number)
+ else if (_messageAnnotationsSection != null && (annotation =
+ _messageAnnotationsSection.getValue().get(NOT_VALID_BEFORE)) instanceof Number)
{
- notValidBefore = ((Number)annotation).longValue();
+ notValidBefore = ((Number) annotation).longValue();
}
else
{
@@ -676,13 +519,13 @@ public class MessageMetaData_1_0 impleme
public String getType()
{
String subject = getSubject();
- if(subject != null)
+ if (subject != null)
{
return subject;
}
// Use legacy annotation if present and there was no subject
- if(_messageAnnotationsSection == null || _messageAnnotationsSection.getValue().get(JMS_TYPE) == null)
+ if (_messageAnnotationsSection == null || _messageAnnotationsSection.getValue().get(JMS_TYPE) == null)
{
return null;
}
@@ -694,7 +537,7 @@ public class MessageMetaData_1_0 impleme
public String getReplyTo()
{
- if(_propertiesSection == null || _propertiesSection.getValue().getReplyTo() == null)
+ if (_propertiesSection == null || _propertiesSection.getValue().getReplyTo() == null)
{
return null;
}
@@ -723,14 +566,14 @@ public class MessageMetaData_1_0 impleme
public boolean containsHeaders(final Set<String> names)
{
- if(_applicationPropertiesSection == null)
+ if (_applicationPropertiesSection == null)
{
return false;
}
- for(String key : names)
+ for (String key : names)
{
- if(!_applicationPropertiesSection.getValue().containsKey(key))
+ if (!_applicationPropertiesSection.getValue().containsKey(key))
{
return false;
}
@@ -741,7 +584,7 @@ public class MessageMetaData_1_0 impleme
@Override
public Collection<String> getHeaderNames()
{
- if(_applicationPropertiesSection == null)
+ if (_applicationPropertiesSection == null)
{
return Collections.emptySet();
}
@@ -765,9 +608,8 @@ public class MessageMetaData_1_0 impleme
public Map<String, Object> getHeadersAsMap()
{
- return _applicationPropertiesSection == null ? new HashMap<String,Object>() : new HashMap<>(
+ return _applicationPropertiesSection == null ? new HashMap<String, Object>() : new HashMap<>(
_applicationPropertiesSection.getValue());
}
}
-
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Wed Dec 7 14:36:59 2016
@@ -59,8 +59,7 @@ public class Message_1_0 extends Abstrac
.registerMessagingLayer()
.registerTransactionLayer()
.registerSecurityLayer();
- public static final MessageMetaData_1_0 DELETED_MESSAGE_METADATA = new MessageMetaData_1_0(Collections.<NonEncodingRetainingSection<?>>emptyList(), new SectionEncoderImpl(DESCRIBED_TYPE_REGISTRY),
- new ArrayList<EncodingRetainingSection<?>>(), 0L);
+ public static final MessageMetaData_1_0 DELETED_MESSAGE_METADATA = new MessageMetaData_1_0(null, null, null, null, null, null, 0L, 0L);
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java Wed Dec 7 14:36:59 2016
@@ -24,18 +24,32 @@ import java.security.AccessControlExcept
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationPropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
@@ -52,6 +66,7 @@ import org.apache.qpid.server.util.Serve
public class StandardReceivingLink_1_0 implements ReceivingLink_1_0
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLink_1_0.class);
private NamedAddressSpace _addressSpace;
private ReceivingDestination _destination;
@@ -84,8 +99,6 @@ public class StandardReceivingLink_1_0 i
public void messageTransfer(Transfer xfr)
{
- // TODO - cope with fragmented messages
-
List<QpidByteBuffer> fragments = null;
org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = xfr.getState();
@@ -145,12 +158,8 @@ public class StandardReceivingLink_1_0 i
}
else
{
- MessageMetaData_1_0 mmd = null;
List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
- mmd = new MessageMetaData_1_0(fragments.toArray(new QpidByteBuffer[fragments.size()]),
- _sectionDecoder,
- dataSections,
- System.currentTimeMillis());
+ MessageMetaData_1_0 mmd = createMessageMetaData(fragments, dataSections);
MessageHandle<MessageMetaData_1_0> handle = _addressSpace.getMessageStore().addMessage(mmd);
for(EncodingRetainingSection<?> dataSection : dataSections)
@@ -160,7 +169,6 @@ public class StandardReceivingLink_1_0 i
handle.addContent(buf);
buf.dispose();
}
-
}
final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
Message_1_0 message = new Message_1_0(storedMessage, getSession().getConnection().getReference());
@@ -269,6 +277,107 @@ public class StandardReceivingLink_1_0 i
}
}
+ private MessageMetaData_1_0 createMessageMetaData(final List<QpidByteBuffer> fragments,
+ final List<EncodingRetainingSection<?>> dataSections)
+ {
+
+ List<EncodingRetainingSection<?>> sections;
+ try
+ {
+ sections = _sectionDecoder.parseAll(fragments);
+ }
+ catch (AmqpErrorException e)
+ {
+ LOGGER.error("Decoding read section error", e);
+ // TODO - fix error handling
+ throw new IllegalArgumentException(e);
+ }
+
+ long contentSize = 0L;
+
+ HeaderSection headerSection = null;
+ PropertiesSection propertiesSection = null;
+ DeliveryAnnotationsSection deliveryAnnotationsSection = null;
+ MessageAnnotationsSection messageAnnotationsSection = null;
+ ApplicationPropertiesSection applicationPropertiesSection = null;
+ FooterSection footerSection = null;
+ Iterator<EncodingRetainingSection<?>> iter = sections.iterator();
+ EncodingRetainingSection<?> s = iter.hasNext() ? iter.next() : null;
+ if (s instanceof HeaderSection)
+ {
+ headerSection = (HeaderSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s instanceof DeliveryAnnotationsSection)
+ {
+ deliveryAnnotationsSection = (DeliveryAnnotationsSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s instanceof MessageAnnotationsSection)
+ {
+ messageAnnotationsSection = (MessageAnnotationsSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s instanceof PropertiesSection)
+ {
+ propertiesSection = (PropertiesSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s instanceof ApplicationPropertiesSection)
+ {
+ applicationPropertiesSection = (ApplicationPropertiesSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s instanceof AmqpValueSection)
+ {
+ contentSize = s.getEncodedSize();
+ dataSections.add(s);
+ s = iter.hasNext() ? iter.next() : null;
+ }
+ else if (s instanceof DataSection)
+ {
+ do
+ {
+ contentSize += s.getEncodedSize();
+ dataSections.add(s);
+ s = iter.hasNext() ? iter.next() : null;
+ } while (s instanceof DataSection);
+ }
+ else if (s instanceof AmqpSequenceSection)
+ {
+ do
+ {
+ contentSize += s.getEncodedSize();
+ dataSections.add(s);
+ s = iter.hasNext() ? iter.next() : null;
+ }
+ while (s instanceof AmqpSequenceSection);
+ }
+
+ if (s instanceof FooterSection)
+ {
+ footerSection = (FooterSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+ if (s != null)
+ {
+ // TODO error
+ }
+ return new MessageMetaData_1_0(headerSection,
+ deliveryAnnotationsSection,
+ messageAnnotationsSection,
+ propertiesSection,
+ applicationPropertiesSection,
+ footerSection,
+ System.currentTimeMillis(),
+ contentSize);
+ }
+
private ReceiverSettleMode getReceivingSettlementMode()
{
return _receivingSettlementMode;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpSequence.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpSequence.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpSequence.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpSequence.java Wed Dec 7 14:36:59 2016
@@ -24,8 +24,13 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
+import java.util.Collections;
import java.util.List;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+
public class AmqpSequence implements NonEncodingRetainingSection<List>
{
@@ -48,4 +53,13 @@ public class AmqpSequence implements Non
return "AmqpSequence{" + _value + '}';
}
+ @Override
+ public AmqpSequenceSection createEncodingRetainingSection(final SectionEncoder encoder)
+ {
+ encoder.reset();
+ encoder.encodeObject(this);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ return new AmqpSequenceSection(this, Collections.singletonList(buf), encoder.getRegistry());
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpValue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpValue.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpValue.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpValue.java Wed Dec 7 14:36:59 2016
@@ -24,6 +24,12 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
+import java.util.Collections;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+
public class AmqpValue implements NonEncodingRetainingSection<Object>
{
@@ -49,4 +55,13 @@ public class AmqpValue implements NonEnc
}
+ @Override
+ public AmqpValueSection createEncodingRetainingSection(final SectionEncoder encoder)
+ {
+ encoder.reset();
+ encoder.encodeObject(this);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ return new AmqpValueSection(this, Collections.singletonList(buf), encoder.getRegistry());
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/ApplicationProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/ApplicationProperties.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/ApplicationProperties.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/ApplicationProperties.java Wed Dec 7 14:36:59 2016
@@ -24,8 +24,13 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
+import java.util.Collections;
import java.util.Map;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+
public class ApplicationProperties implements NonEncodingRetainingSection<Map<String,Object>>
{
@@ -41,4 +46,13 @@ public class ApplicationProperties imple
return _value;
}
+ @Override
+ public ApplicationPropertiesSection createEncodingRetainingSection(final SectionEncoder encoder)
+ {
+ encoder.reset();
+ encoder.encodeObject(this);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ return new ApplicationPropertiesSection(this, Collections.singletonList(buf), encoder.getRegistry());
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Data.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Data.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Data.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Data.java Wed Dec 7 14:36:59 2016
@@ -21,6 +21,10 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
+import java.util.Collections;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
public class Data implements NonEncodingRetainingSection<Binary>
@@ -44,4 +48,14 @@ public class Data implements NonEncoding
{
return "Data{" + _value + '}';
}
+
+ @Override
+ public DataSection createEncodingRetainingSection(final SectionEncoder encoder)
+ {
+ encoder.reset();
+ encoder.encodeObject(this);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ return new DataSection(this, Collections.singletonList(buf), encoder.getRegistry());
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/DeliveryAnnotations.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/DeliveryAnnotations.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/DeliveryAnnotations.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/DeliveryAnnotations.java Wed Dec 7 14:36:59 2016
@@ -21,8 +21,12 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
+import java.util.Collections;
import java.util.Map;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
public class DeliveryAnnotations implements NonEncodingRetainingSection<Map<Symbol,Object>>
@@ -40,4 +44,13 @@ public class DeliveryAnnotations impleme
return _value;
}
+ @Override
+ public DeliveryAnnotationsSection createEncodingRetainingSection(final SectionEncoder encoder)
+ {
+ encoder.reset();
+ encoder.encodeObject(this);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ return new DeliveryAnnotationsSection(this, Collections.singletonList(buf), encoder.getRegistry());
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Footer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Footer.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Footer.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Footer.java Wed Dec 7 14:36:59 2016
@@ -21,8 +21,12 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
+import java.util.Collections;
import java.util.Map;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
public class Footer implements NonEncodingRetainingSection<Map<Symbol,Object>>
@@ -47,4 +51,13 @@ public class Footer implements NonEncodi
return "Footer{" + _value + '}';
}
+ @Override
+ public FooterSection createEncodingRetainingSection(final SectionEncoder encoder)
+ {
+ encoder.reset();
+ encoder.encodeObject(this);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ return new FooterSection(this, Collections.singletonList(buf), encoder.getRegistry());
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/FooterSection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/FooterSection.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/FooterSection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/FooterSection.java Wed Dec 7 14:36:59 2016
@@ -22,19 +22,21 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
import java.util.List;
+import java.util.Map;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.messaging.codec.FooterConstructor;
-public class FooterSection extends AbstractSection<Footer>
+public class FooterSection extends AbstractSection<Map<Symbol,Object>>
{
private final DescribedTypeConstructorRegistry _typeRegistry;
- private Footer _footer;
+ private Map<Symbol,Object> _value;
public FooterSection(final DescribedTypeConstructorRegistry describedTypeRegistry)
{
@@ -45,7 +47,7 @@ public class FooterSection extends Abstr
final List<QpidByteBuffer> encodedForm,
final DescribedTypeConstructorRegistry registry)
{
- _footer = footer;
+ _value = footer.getValue();
_typeRegistry = registry;
setEncodedForm(encodedForm);
}
@@ -57,13 +59,13 @@ public class FooterSection extends Abstr
}
@Override
- public synchronized Footer getValue()
+ public synchronized Map<Symbol,Object> getValue()
{
- if(_footer == null)
+ if(_value == null)
{
decode();
}
- return _footer;
+ return _value;
}
private void decode()
@@ -81,7 +83,7 @@ public class FooterSection extends Abstr
ValueHandler handler = new ValueHandler(_typeRegistry);
Object descriptor = handler.parse(input);
FooterConstructor constructor = new FooterConstructor();
- _footer = constructor.construct(descriptor, input, originalPositions, handler).construct(input, handler);
+ _value = constructor.construct(descriptor, input, originalPositions, handler).construct(input, handler).getValue();
for(int i = 0; i < input.size(); i++)
{
input.get(i).dispose();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Header.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Header.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Header.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Header.java Wed Dec 7 14:36:59 2016
@@ -24,6 +24,11 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
+import java.util.Collections;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
@@ -151,4 +156,13 @@ public class Header implements NonEncodi
return this;
}
+ @Override
+ public HeaderSection createEncodingRetainingSection(final SectionEncoder encoder)
+ {
+ encoder.reset();
+ encoder.encodeObject(this);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ return new HeaderSection(this, Collections.singletonList(buf), encoder.getRegistry());
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/MessageAnnotations.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/MessageAnnotations.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/MessageAnnotations.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/MessageAnnotations.java Wed Dec 7 14:36:59 2016
@@ -21,8 +21,12 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
+import java.util.Collections;
import java.util.Map;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
public class MessageAnnotations implements NonEncodingRetainingSection<Map<Symbol,Object>>
@@ -40,4 +44,13 @@ public class MessageAnnotations implemen
return _value;
}
+ @Override
+ public MessageAnnotationsSection createEncodingRetainingSection(final SectionEncoder encoder)
+ {
+ encoder.reset();
+ encoder.encodeObject(this);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ return new MessageAnnotationsSection(this, Collections.singletonList(buf), encoder.getRegistry());
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/NonEncodingRetainingSection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/NonEncodingRetainingSection.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/NonEncodingRetainingSection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/NonEncodingRetainingSection.java Wed Dec 7 14:36:59 2016
@@ -20,8 +20,10 @@
*/
package org.apache.qpid.server.protocol.v1_0.type.messaging;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
import org.apache.qpid.server.protocol.v1_0.type.Section;
public interface NonEncodingRetainingSection<T> extends Section<T>
{
+ EncodingRetainingSection<T> createEncodingRetainingSection(SectionEncoder encoder);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Properties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Properties.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Properties.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Properties.java Wed Dec 7 14:36:59 2016
@@ -21,8 +21,11 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
+import java.util.Collections;
import java.util.Date;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
@@ -318,4 +321,13 @@ public class Properties implements NonEn
return this;
}
+ @Override
+ public PropertiesSection createEncodingRetainingSection(final SectionEncoder encoder)
+ {
+ encoder.reset();
+ encoder.encodeObject(this);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ return new PropertiesSection(this, Collections.singletonList(buf), encoder.getRegistry());
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java Wed Dec 7 14:36:59 2016
@@ -21,9 +21,7 @@
package org.apache.qpid.server.protocol.converter.v0_10_v1_0;
-import java.util.ArrayList;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import org.apache.qpid.server.plugin.PluggableService;
@@ -39,7 +37,6 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.messaging.Data;
import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.NonEncodingRetainingSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageDeliveryMode;
@@ -58,11 +55,9 @@ public class MessageConverter_0_10_to_1_
@Override
protected MessageMetaData_1_0 convertMetaData(MessageTransferMessage serverMessage,
- final NonEncodingRetainingSection<?> bodySection,
- SectionEncoder sectionEncoder,
- final List<EncodingRetainingSection<?>> bodySections)
+ final EncodingRetainingSection<?> bodySection,
+ SectionEncoder sectionEncoder)
{
- List<NonEncodingRetainingSection<?>> sections = new ArrayList<>(3);
final MessageProperties msgProps = serverMessage.getHeader().getMessageProperties();
final DeliveryProperties deliveryProps = serverMessage.getHeader().getDeliveryProperties();
@@ -78,10 +73,11 @@ public class MessageConverter_0_10_to_1_
{
header.setTtl(UnsignedInteger.valueOf(deliveryProps.getTtl()));
}
- sections.add(header);
}
Properties props = new Properties();
+ ApplicationProperties applicationProperties = null;
+
/*
TODO: the current properties are not currently set:
@@ -134,27 +130,27 @@ public class MessageConverter_0_10_to_1_
props.setUserId(new Binary(msgProps.getUserId()));
}
- sections.add(props);
-
- Map<String, Object> applicationProperties = msgProps.getApplicationHeaders();
-
- if(applicationProperties != null)
+ Map<String, Object> applicationPropertiesMap = msgProps.getApplicationHeaders();
+ if(applicationPropertiesMap != null)
{
- if(applicationProperties.containsKey("qpid.subject"))
+ if(applicationPropertiesMap.containsKey("qpid.subject"))
{
- props.setSubject(String.valueOf(applicationProperties.get("qpid.subject")));
- applicationProperties = new LinkedHashMap<>(applicationProperties);
- applicationProperties.remove("qpid.subject");
+ props.setSubject(String.valueOf(applicationPropertiesMap.get("qpid.subject")));
+ applicationPropertiesMap = new LinkedHashMap<>(applicationPropertiesMap);
+ applicationPropertiesMap.remove("qpid.subject");
}
- sections.add(new ApplicationProperties((Map)applicationProperties));
+ applicationProperties = new ApplicationProperties(applicationPropertiesMap);
}
}
- if(bodySection != null)
- {
- sections.add(bodySection);
- }
- return new MessageMetaData_1_0(sections, sectionEncoder, bodySections, serverMessage.getArrivalTime());
+ return new MessageMetaData_1_0(header.createEncodingRetainingSection(sectionEncoder),
+ null,
+ null,
+ props.createEncodingRetainingSection(sectionEncoder),
+ applicationProperties == null ? null : applicationProperties.createEncodingRetainingSection(sectionEncoder),
+ null,
+ serverMessage.getArrivalTime(),
+ bodySection.getEncodedSize());
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java?rev=1773086&r1=1773085&r2=1773086&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java Wed Dec 7 14:36:59 2016
@@ -21,9 +21,7 @@
package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import org.apache.qpid.framing.AMQShortString;
@@ -42,7 +40,6 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.messaging.Data;
import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.NonEncodingRetainingSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.util.GZIPUtils;
@@ -57,13 +54,9 @@ public class MessageConverter_0_8_to_1_0
}
protected MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage,
- final NonEncodingRetainingSection<?> bodySection,
- SectionEncoder sectionEncoder,
- final List<EncodingRetainingSection<?>> bodySections)
+ final EncodingRetainingSection<?> bodySection,
+ SectionEncoder sectionEncoder)
{
-
- List<NonEncodingRetainingSection<?>> sections = new ArrayList<>(3);
-
Header header = new Header();
header.setDurable(serverMessage.isPersistent());
@@ -79,7 +72,6 @@ public class MessageConverter_0_8_to_1_0
{
header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime));
}
- sections.add(header);
Properties props = new Properties();
@@ -162,25 +154,27 @@ public class MessageConverter_0_8_to_1_0
props.setUserId(new Binary(contentHeader.getUserId().getBytes()));
}
- sections.add(props);
-
- Map<String, Object> applicationProperties = FieldTable.convertToMap(contentHeader.getHeaders());
+ Map<String, Object> applicationPropertiesMap = FieldTable.convertToMap(contentHeader.getHeaders());
- if(applicationProperties.containsKey("qpid.subject"))
+ if(applicationPropertiesMap.containsKey("qpid.subject"))
{
- props.setSubject(String.valueOf(applicationProperties.get("qpid.subject")));
- applicationProperties = new LinkedHashMap<>(applicationProperties);
- applicationProperties.remove("qpid.subject");
+ props.setSubject(String.valueOf(applicationPropertiesMap.get("qpid.subject")));
+ applicationPropertiesMap = new LinkedHashMap<>(applicationPropertiesMap);
+ applicationPropertiesMap.remove("qpid.subject");
}
// TODO: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-application
// Adhere to "the values are restricted to be of simple types only, that is, excluding map, list, and array types".
- // 0-8..0-91 for instance suppoorted field tables with maps as values.
- sections.add(new ApplicationProperties((Map)applicationProperties));
- if(bodySection != null)
- {
- sections.add(bodySection);
- }
- return new MessageMetaData_1_0(sections, sectionEncoder, bodySections, serverMessage.getArrivalTime());
+ // 0-8..0-91 for instance supported field tables with maps as values.
+ final ApplicationProperties applicationProperties = new ApplicationProperties(applicationPropertiesMap);
+
+ return new MessageMetaData_1_0(header.createEncodingRetainingSection(sectionEncoder),
+ null,
+ null,
+ props.createEncodingRetainingSection(sectionEncoder),
+ applicationProperties.createEncodingRetainingSection(sectionEncoder),
+ null,
+ serverMessage.getArrivalTime(),
+ bodySection.getEncodedSize());
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org