You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/06 14:23:00 UTC
svn commit: r1772901 [1/5] - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpi...
Author: rgodfrey
Date: Tue Dec 6 14:22:59 2016
New Revision: 1772901
URL: http://svn.apache.org/viewvc?rev=1772901&view=rev
Log:
QPID-7575: Retain the encoded form when parsing AMQP 1.0 messages; store only the message body in content and all other sections in metadata
Added:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java
- copied, changed from r1772899, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SectionDecoderRegistry.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpSequenceSection.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpValueSection.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/ApplicationPropertiesSection.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/DataSection.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/DeliveryAnnotationsSection.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/FooterSection.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/HeaderSection.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/MessageAnnotationsSection.java
- copied, changed from r1772546, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/MessageAnnotations.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/PropertiesSection.java
- copied, changed from r1772546, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Properties.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AbstractEncodingRetainingConstructor.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AmqpSequenceSectionConstructor.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AmqpValueSectionConstructor.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ApplicationPropertiesSectionConstructor.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DataSectionConstructor.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DeliveryAnnotationsSectionConstructor.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DescribedListSectionConstructor.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DescribedMapSectionConstructor.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/EncodingRetaining.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/FooterSectionConstructor.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/HeaderSectionConstructor.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/MessageAnnotationsSectionConstructor.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/PropertiesSectionConstructor.java
- copied, changed from r1772546, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/PropertiesConstructor.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryString.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeAssembler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DefaultDescribedTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructorRegistry.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/NullTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SymbolTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/TimestampTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/TypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UByteTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UIntTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ULongTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UShortTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UUIDTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/VariableWidthTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ZeroListConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ZeroUIntConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ZeroULongConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoder.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionEncoder.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionEncoderImpl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Section.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/codec/AMQPDescribedTypeRegistry.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpSequence.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpValue.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/ApplicationProperties.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Data.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/DeliveryAnnotations.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Footer.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Header.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/MessageAnnotations.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Properties.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AcceptedConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AmqpSequenceConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AmqpValueConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ApplicationPropertiesConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DataConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DeleteOnCloseConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DeleteOnNoLinksConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DeleteOnNoLinksOrMessagesConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DeleteOnNoMessagesConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DeliveryAnnotationsConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ExactSubjectFilterConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/FooterConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/HeaderConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/JMSSelectorFilterConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/MatchingSubjectFilterConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/MessageAnnotationsConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ModifiedConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/NoLocalFilterConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/PropertiesConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ReceivedConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/RejectedConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ReleasedConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/SourceConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/TargetConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/security/codec/SaslChallengeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/security/codec/SaslInitConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/security/codec/SaslMechanismsConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/security/codec/SaslOutcomeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/security/codec/SaslResponseConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/codec/CoordinatorConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/codec/DeclareConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/codec/DeclaredConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/codec/DischargeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/codec/TransactionalStateConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Transfer.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/codec/AttachConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/codec/BeginConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/codec/CloseConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/codec/DetachConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/codec/DispositionConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/codec/EndConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/codec/ErrorConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/codec/FlowConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/codec/OpenConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/codec/TransferConstructor.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/QueueRestTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Dec 6 14:22:59 2016
@@ -203,6 +203,8 @@ public abstract class AbstractQueue<X ex
private volatile long _estimatedAverageMessageHeaderSize;
+ private volatile long _estimatedMessageMemoryOverhead;
+ private volatile long _estimatedMinimumMemoryFootprint;
private AtomicBoolean _stopped = new AtomicBoolean(false);
@@ -275,6 +277,7 @@ public abstract class AbstractQueue<X ex
protected AbstractQueue(Map<String, Object> attributes, QueueManagingVirtualHost<?> virtualHost)
{
super(parentsMap(virtualHost), attributes);
+ _queueConsumerManager = new QueueConsumerManagerImpl(this);
_virtualHost = virtualHost;
}
@@ -342,8 +345,11 @@ public abstract class AbstractQueue<X ex
_arguments = Collections.synchronizedMap(arguments);
- _queueConsumerManager = new QueueConsumerManagerImpl(this);
_logSubject = new QueueLogSubject(this);
+
+ _estimatedMinimumMemoryFootprint = getContextValue(Long.class, QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT);
+ _estimatedMessageMemoryOverhead = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD);
+
_queueHouseKeepingTask = new AdvanceConsumersTask();
Subject activeSubject = Subject.getSubject(AccessController.getContext());
Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
@@ -2208,8 +2214,9 @@ public abstract class AbstractQueue<X ex
@Override
public long getPotentialMemoryFootprint()
{
- return Math.max(getContextValue(Long.class,QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT),
- getQueueDepthBytes() + getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages());
+
+ return Math.max(_estimatedMinimumMemoryFootprint,
+ getQueueDepthBytes() + _estimatedMessageMemoryOverhead * getQueueDepthMessages());
}
public long getAlertRepeatGap()
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Tue Dec 6 14:22:59 2016
@@ -43,9 +43,6 @@ import java.util.concurrent.ConcurrentLi
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,6 +62,7 @@ import org.apache.qpid.server.protocol.C
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
+import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
@@ -79,6 +77,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
@@ -181,7 +180,6 @@ public class AMQPConnection_1_0 extends
private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.wrap(new byte[0]);
private static final int DEFAULT_CHANNEL_MAX = Math.min(Integer.getInteger("amqp.channel_max", 255), 0xFFFF);
- private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15);
private AmqpPort<?> _port;
private SubjectCreator _subjectCreator;
@@ -418,6 +416,13 @@ public class AMQPConnection_1_0 extends
return _describedTypeRegistry;
}
+ public SectionDecoderRegistry getSectionDecoderRegistry()
+ {
+ return _describedTypeRegistry.getSectionDecoderRegistry();
+ }
+
+
+
private boolean closedForOutput()
{
@@ -444,11 +449,6 @@ public class AMQPConnection_1_0 extends
}
}
- public int send(final short channel, final FrameBody body, final QpidByteBuffer payload)
- {
- return sendFrame(channel, body, payload);
- }
-
private void inputClosed()
{
if (!_closedForInput)
@@ -705,7 +705,9 @@ public class AMQPConnection_1_0 extends
_receivingSessions = new Session_1_0[_channelMax + 1];
_sendingSessions = new Session_1_0[_channelMax + 1];
}
- _maxFrameSize = open.getMaxFrameSize() == null ? DEFAULT_MAX_FRAME : open.getMaxFrameSize().intValue();
+ _maxFrameSize = open.getMaxFrameSize() == null
+ ? getBroker().getNetworkBufferSize()
+ : Math.min(open.getMaxFrameSize().intValue(), getBroker().getNetworkBufferSize());
_remoteContainerId = open.getContainerId();
_localHostname = open.getHostname();
if (open.getIdleTimeOut() != null)
@@ -1027,44 +1029,62 @@ public class AMQPConnection_1_0 extends
}
}
- int sendFrame(final short channel, final FrameBody body, final QpidByteBuffer payload)
+ int sendFrame(final short channel, final FrameBody body, final List<QpidByteBuffer> payload)
{
if (!_closedForOutput)
{
ValueWriter<FrameBody> writer = _describedTypeRegistry.getValueWriter(body);
- int size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
- QpidByteBuffer payloadDup = payload == null ? null : payload.duplicate();
- int payloadSent = _maxFrameSize - (size + 9);
- try
+ if (payload == null)
{
- if (payload != null && payloadSent < payload.remaining())
+ send(AMQFrame.createAMQFrame(channel, body));
+ return 0;
+ }
+ else
+ {
+ int size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
+ int maxPayloadSize = _maxFrameSize - (size + 9);
+ long payloadLength = QpidByteBufferUtils.remaining(payload);
+ if(payloadLength <= maxPayloadSize)
+ {
+ send(AMQFrame.createAMQFrame(channel, body, payload));
+ return (int)payloadLength;
+ }
+ else
{
+ ((Transfer) body).setMore(Boolean.TRUE);
- if (body instanceof Transfer)
+ writer = _describedTypeRegistry.getValueWriter(body);
+ size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
+ maxPayloadSize = _maxFrameSize - (size + 9);
+
+ List<QpidByteBuffer> payloadDup = new ArrayList<>(payload.size());
+ int payloadSize = 0;
+ for(QpidByteBuffer buf : payload)
{
- ((Transfer) body).setMore(Boolean.TRUE);
+ if(payloadSize + buf.remaining() < maxPayloadSize)
+ {
+ payloadSize += buf.remaining();
+ payloadDup.add(buf.duplicate());
+ }
+ else
+ {
+ QpidByteBuffer dup = buf.slice();
+ dup.limit(maxPayloadSize-payloadSize);
+ payloadDup.add(dup);
+ break;
+ }
}
- writer = _describedTypeRegistry.getValueWriter(body);
- size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
- payloadSent = _maxFrameSize - (size + 9);
+ QpidByteBufferUtils.skip(payload, maxPayloadSize);
+ send(AMQFrame.createAMQFrame(channel, body, payloadDup));
+ for(QpidByteBuffer buf : payloadDup)
+ {
+ buf.dispose();
+ }
- payloadDup.limit(payloadDup.position() + payloadSent);
- }
- else
- {
- payloadSent = payload == null ? 0 : payload.remaining();
- }
- send(AMQFrame.createAMQFrame(channel, body, payloadDup));
- }
- finally
- {
- if (payloadDup != null)
- {
- payloadDup.dispose();
+ return maxPayloadSize;
}
}
- return payloadSent;
}
else
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Tue Dec 6 14:22:59 2016
@@ -21,7 +21,10 @@
package org.apache.qpid.server.protocol.v1_0;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,15 +39,16 @@ import org.apache.qpid.server.protocol.M
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
-import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Target;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AbstractSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
@@ -53,7 +57,6 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
{
@@ -118,58 +121,19 @@ class ConsumerTarget_1_0 extends Abstrac
Transfer transfer = new Transfer();
try
{
- QpidByteBuffer payload = null;
//TODO
- Collection<QpidByteBuffer> fragments = message.getFragments();
- if (fragments.size() == 1)
- {
- payload = fragments.iterator().next();
- }
- else
- {
- int size = 0;
- for (QpidByteBuffer fragment : fragments)
- {
- size += fragment.remaining();
- }
-
- payload = QpidByteBuffer.allocateDirect(size);
-
- for (QpidByteBuffer fragment : fragments)
- {
- payload.put(fragment);
- fragment.dispose();
- }
-
- payload.flip();
- }
+ Collection<QpidByteBuffer> bodyContent = message.getContent(0, (int) message.getSize());
+ HeaderSection headerSection = message.getHeaderSection();
if (entry.getDeliveryCount() != 0)
{
ValueHandler valueHandler = new ValueHandler(_typeRegistry);
- Header oldHeader = null;
- try
- {
- Object value = valueHandler.parse(payload);
- if (value instanceof Header)
- {
- oldHeader = (Header) value;
- }
- else
- {
- payload.position(0);
- }
- }
- catch (AmqpErrorException e)
- {
- //TODO
- throw new ConnectionScopedRuntimeException(e);
- }
Header header = new Header();
- if (oldHeader != null)
+ if (headerSection != null)
{
+ final Header oldHeader = headerSection.getValue();
header.setDurable(oldHeader.getDurable());
header.setPriority(oldHeader.getPriority());
header.setTtl(oldHeader.getTtl());
@@ -179,15 +143,52 @@ class ConsumerTarget_1_0 extends Abstrac
_sectionEncoder.encodeObject(header);
Binary encodedHeader = _sectionEncoder.getEncoding();
- QpidByteBuffer oldPayload = payload;
- payload = QpidByteBuffer.allocateDirect(oldPayload.remaining() + encodedHeader.getLength());
- payload.put(encodedHeader.getArray(), encodedHeader.getArrayOffset(), encodedHeader.getLength());
- payload.put(oldPayload);
- oldPayload.dispose();
- payload.flip();
+ QpidByteBuffer headerPayload = QpidByteBuffer.wrap(encodedHeader.getArray(), encodedHeader.getArrayOffset(), encodedHeader.getLength());
+
+ headerSection = new HeaderSection(_typeRegistry);
+ headerSection.setEncodedForm(Collections.singletonList(headerPayload));
+ }
+ List<QpidByteBuffer> payload = new ArrayList<>();
+ if(headerSection != null)
+ {
+ payload.addAll(headerSection.getEncodedForm());
+ }
+ AbstractSection<?> section;
+ if((section = message.getDeliveryAnnotationsSection()) != null)
+ {
+ payload.addAll(section.getEncodedForm());
+ }
+
+ if((section = message.getMessageAnnotationsSection()) != null)
+ {
+ payload.addAll(section.getEncodedForm());
+ }
+
+ if((section = message.getPropertiesSection()) != null)
+ {
+ payload.addAll(section.getEncodedForm());
+ }
+
+ if((section = message.getApplicationPropertiesSection()) != null)
+ {
+ payload.addAll(section.getEncodedForm());
}
+ payload.addAll(bodyContent);
+
+ if((section = message.getFooterSection()) != null)
+ {
+ payload.addAll(section.getEncodedForm());
+ }
+
+
transfer.setPayload(payload);
+
+ for(QpidByteBuffer buf : payload)
+ {
+ buf.dispose();
+ }
+
byte[] data = new byte[8];
ByteBuffer.wrap(data).putLong(_deliveryTag++);
final Binary tag = new Binary(data);
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java Tue Dec 6 14:22:59 2016
@@ -35,6 +35,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AbstractSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Data;
@@ -58,7 +59,9 @@ public class MessageConverter_Internal_t
@Override
protected MessageMetaData_1_0 convertMetaData(final InternalMessage serverMessage,
- final Section bodySection, final SectionEncoder sectionEncoder)
+ final Section bodySection,
+ final SectionEncoder sectionEncoder,
+ final ArrayList<AbstractSection<?>> bodySections)
{
List<Section> sections = new ArrayList<Section>(3);
Header header = new Header();
@@ -91,14 +94,14 @@ public class MessageConverter_Internal_t
if(!serverMessage.getMessageHeader().getHeaderNames().isEmpty())
{
- ApplicationProperties applicationProperties = new ApplicationProperties(serverMessage.getMessageHeader().getHeaderMap() );
+ ApplicationProperties applicationProperties = new ApplicationProperties((Map)serverMessage.getMessageHeader().getHeaderMap() );
sections.add(applicationProperties);
}
if(bodySection != null)
{
sections.add(bodySection);
}
- return new MessageMetaData_1_0(sections, sectionEncoder);
+ return new MessageMetaData_1_0(sections, sectionEncoder, bodySections);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java Tue Dec 6 14:22:59 2016
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -47,10 +48,11 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequence;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AbstractSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Data;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.codec.BBEncoder;
import org.apache.qpid.typedmessage.TypedBytesContentWriter;
@@ -62,34 +64,30 @@ public class MessageConverter_from_1_0
public static Object convertBodyToObject(final Message_1_0 serverMessage)
{
- byte[] data = new byte[(int) serverMessage.getSize()];
final Collection<QpidByteBuffer> allData = serverMessage.getContent(0, (int) serverMessage.getSize());
- int offset = 0;
- for(QpidByteBuffer buf : allData)
- {
- int len = buf.remaining();
- buf.get(data, offset, len);
- offset+=len;
- buf.dispose();
- }
- SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY);
+ SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY.getSectionDecoderRegistry());
Object bodyObject;
try
{
- List<Section> sections = sectionDecoder.parseAll(QpidByteBuffer.wrap(data));
- ListIterator<Section> iterator = sections.listIterator();
+ List<AbstractSection<?>> sections = sectionDecoder.parseAll(new ArrayList<>(allData));
+ for(QpidByteBuffer buf : allData)
+ {
+ buf.dispose();
+ }
+
+ ListIterator<AbstractSection<?>> iterator = sections.listIterator();
Section previousSection = null;
while(iterator.hasNext())
{
Section section = iterator.next();
- if(!(section instanceof AmqpValue || section instanceof Data || section instanceof AmqpSequence))
+ if(!(section instanceof AmqpValueSection || section instanceof DataSection || section instanceof AmqpSequenceSection))
{
iterator.remove();
}
else
{
- if(previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValue))
+ if(previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValueSection))
{
throw new ConnectionScopedRuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence");
}
@@ -109,16 +107,16 @@ public class MessageConverter_from_1_0
else
{
Section firstBodySection = sections.get(0);
- if(firstBodySection instanceof AmqpValue)
+ if(firstBodySection instanceof AmqpValueSection)
{
- bodyObject = convertValue(((AmqpValue)firstBodySection).getValue());
+ bodyObject = convertValue(firstBodySection.getValue());
}
- else if(firstBodySection instanceof Data)
+ else if(firstBodySection instanceof DataSection)
{
int totalSize = 0;
for(Section section : sections)
{
- totalSize += ((Data)section).getValue().getLength();
+ totalSize += ((DataSection)section).getValue().getLength();
}
byte[] bodyData = new byte[totalSize];
ByteBuffer buf = ByteBuffer.wrap(bodyData);
@@ -130,10 +128,10 @@ public class MessageConverter_from_1_0
}
else
{
- ArrayList totalSequence = new ArrayList();
+ ArrayList<Object> totalSequence = new ArrayList<>();
for(Section section : sections)
{
- totalSequence.addAll(((AmqpSequence)section).getValue());
+ totalSequence.addAll(((AmqpSequenceSection)section).getValue());
}
bodyObject = convertValue(totalSequence);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Tue Dec 6 14:22:59 2016
@@ -24,24 +24,24 @@ import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Section;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AbstractSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Data;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.codec.BBDecoder;
@@ -75,14 +75,16 @@ public abstract class MessageConverter_t
private StoredMessage<MessageMetaData_1_0> convertToStoredMessage(final M serverMessage, SectionEncoder sectionEncoder)
{
Section bodySection = getBodySection(serverMessage);
+ final ArrayList<AbstractSection<?>> bodySections = new ArrayList<>();
- final MessageMetaData_1_0 metaData = convertMetaData(serverMessage, bodySection, sectionEncoder);
- return convertServerMessage(metaData, serverMessage);
+ final MessageMetaData_1_0 metaData = convertMetaData(serverMessage, bodySection, sectionEncoder, bodySections);
+ return convertServerMessage(metaData, serverMessage, bodySections);
}
abstract protected MessageMetaData_1_0 convertMetaData(final M serverMessage,
final Section bodySection,
- SectionEncoder sectionEncoder);
+ SectionEncoder sectionEncoder,
+ final ArrayList<AbstractSection<?>> bodySections);
private static Section convertMessageBody(String mimeType, byte[] data)
@@ -209,17 +211,9 @@ public abstract class MessageConverter_t
}
private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData,
- final M serverMessage)
+ final M serverMessage,
+ final ArrayList<AbstractSection<?>> bodySections)
{
- final QpidByteBuffer allData = QpidByteBuffer.allocateDirect(metaData.getStorableSize());
- metaData.writeToBuffer(allData);
- allData.rewind();
-
- if(metaData.getPropertiesSection() != null)
- {
- metaData.getPropertiesSection().setContentEncoding(null);
- }
-
return new StoredMessage<MessageMetaData_1_0>()
{
@@ -238,7 +232,36 @@ public abstract class MessageConverter_t
@Override
public Collection<QpidByteBuffer> getContent(int offset, int length)
{
- return Collections.singleton(allData.view(offset, length));
+ int position = 0;
+ List<QpidByteBuffer> content = new ArrayList<>();
+ for(AbstractSection<?> section : bodySections)
+ {
+ for(QpidByteBuffer buf : section.getEncodedForm())
+ {
+ if(position < offset)
+ {
+ if(offset - position < buf.remaining())
+ {
+ QpidByteBuffer view = buf.view(offset - position, Math.min(length, buf.remaining() - (offset-position)));
+ content.add(view);
+ position += view.remaining();
+ }
+ else
+ {
+ position += buf.remaining();
+ }
+ }
+ else if(position <= offset+length)
+ {
+ QpidByteBuffer view = buf.view(0, Math.min(length - (position-offset), buf.remaining()));
+ content.add(view);
+ position += view.remaining();
+ }
+
+ buf.dispose();
+ }
+ }
+ return content;
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Tue Dec 6 14:22:59 2016
@@ -33,26 +33,20 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Section;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequence;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Data;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Footer;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.*;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -61,241 +55,285 @@ public class MessageMetaData_1_0 impleme
private static final Logger _logger = LoggerFactory.getLogger(MessageMetaData_1_0.class);
private static final MessageMetaDataType_1_0 TYPE = new MessageMetaDataType_1_0();
public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory();
+ private static final byte VERSION_BYTE = 1;
+
+ private long _contentSize;
// TODO move to somewhere more useful
private static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
private static final Symbol DELIVERY_TIME = Symbol.valueOf("x-opt-delivery-time");
private static final Symbol NOT_VALID_BEFORE = Symbol.valueOf("x-qpid-not-valid-before");
+ private HeaderSection _headerSection;
+ private PropertiesSection _propertiesSection;
+ private DeliveryAnnotationsSection _deliveryAnnotationsSection;
+ private MessageAnnotationsSection _messageAnnotationsSection;
+ private ApplicationPropertiesSection _applicationPropertiesSection;
+ private FooterSection _footerSection;
- private Header _header;
- private Properties _properties;
- private Map _deliveryAnnotations;
- private Map _messageAnnotations;
- private Map _appProperties;
- private Map _footer;
-
- private volatile List<QpidByteBuffer> _encodedSections = new ArrayList<>(3);
- private volatile QpidByteBuffer _encoded;
private MessageHeader_1_0 _messageHeader;
-
- public MessageMetaData_1_0(List<Section> sections, SectionEncoder encoder)
- {
- this(sections, encodeSections(sections, encoder));
- }
-
- public Properties getPropertiesSection()
- {
- return _properties;
- }
-
-
- public Header getHeaderSection()
- {
- return _header;
- }
-
- private static ArrayList<QpidByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder)
- {
- ArrayList<QpidByteBuffer> encodedSections = new ArrayList<QpidByteBuffer>(sections.size());
- for(Section section : sections)
+ public MessageMetaData_1_0(List<Section> sections,
+ SectionEncoder encoder,
+ final List<AbstractSection<?>> bodySections)
+ {
+ Iterator<Section> iter = sections.iterator();
+ Section s = iter.hasNext() ? iter.next() : null;
+ long contentSize = 0L;
+ if(s instanceof Header)
{
- encoder.encodeObject(section);
- encodedSections.add(QpidByteBuffer.wrap(encoder.getEncoding().asByteBuffer()));
encoder.reset();
+ encoder.encodeObject(s);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ _headerSection = new HeaderSection((Header)s, Collections.singletonList(buf), encoder.getRegistry());
+ s = iter.hasNext() ? iter.next() : null;
}
- return encodedSections;
- }
-
- public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder)
- {
- this(fragments, decoder, new ArrayList<QpidByteBuffer>(3));
- }
-
- public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder, List<QpidByteBuffer> immutableSections)
- {
- this(constructSections(fragments, decoder,immutableSections), immutableSections);
- }
-
- private MessageMetaData_1_0(List<Section> sections, List<QpidByteBuffer> encodedSections)
- {
- _encodedSections = encodedSections;
- Iterator<Section> sectIter = sections.iterator();
+ if(s instanceof DeliveryAnnotations)
+ {
+ encoder.reset();
+ encoder.encodeObject(s);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ _deliveryAnnotationsSection = new DeliveryAnnotationsSection((DeliveryAnnotations)s, Collections.singletonList(buf), encoder.getRegistry());
+ s = iter.hasNext() ? iter.next() : null;
+ }
- Section section = sectIter.hasNext() ? sectIter.next() : null;
- if(section instanceof Header)
+ if(s instanceof MessageAnnotations)
{
- _header = (Header) section;
- section = sectIter.hasNext() ? sectIter.next() : null;
+ encoder.reset();
+ encoder.encodeObject(s);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ _messageAnnotationsSection = new MessageAnnotationsSection((MessageAnnotations)s, Collections.singletonList(buf), encoder.getRegistry());
+ s = iter.hasNext() ? iter.next() : null;
}
- if(section instanceof DeliveryAnnotations)
+ if(s instanceof Properties)
{
- _deliveryAnnotations = ((DeliveryAnnotations) section).getValue();
- section = sectIter.hasNext() ? sectIter.next() : null;
+ encoder.reset();
+ encoder.encodeObject(s);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ _propertiesSection = new PropertiesSection((Properties)s, Collections.singletonList(buf), encoder.getRegistry());
+ s = iter.hasNext() ? iter.next() : null;
}
- if(section instanceof MessageAnnotations)
+ if(s instanceof ApplicationProperties)
{
- _messageAnnotations = ((MessageAnnotations) section).getValue();
- section = sectIter.hasNext() ? sectIter.next() : null;
+ encoder.reset();
+ encoder.encodeObject(s);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ _applicationPropertiesSection = new ApplicationPropertiesSection((ApplicationProperties)s, Collections.singletonList(buf), encoder.getRegistry());
+ s = iter.hasNext() ? iter.next() : null;
}
- if(section instanceof Properties)
+ if(s instanceof AmqpValue)
{
- _properties = (Properties) section;
- section = sectIter.hasNext() ? sectIter.next() : null;
+ encoder.reset();
+ encoder.encodeObject(s);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ bodySections.add(new AmqpValueSection((AmqpValue)s, Collections.singletonList(buf), encoder.getRegistry()));
+
+ contentSize = buf.remaining();
+ s = iter.hasNext() ? iter.next() : null;
}
+ else if(s instanceof Data)
+ {
+ do
+ {
+ encoder.reset();
+ encoder.encodeObject(s);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ bodySections.add(new DataSection((Data)s, Collections.singletonList(buf), encoder.getRegistry()));
- if(section instanceof ApplicationProperties)
+ contentSize += buf.remaining();
+
+ s = iter.hasNext() ? iter.next() : null;
+ } while(s instanceof Data);
+ }
+ else if(s instanceof AmqpSequence)
{
- _appProperties = ((ApplicationProperties) section).getValue();
- section = sectIter.hasNext() ? sectIter.next() : null;
+ do
+ {
+ encoder.reset();
+ encoder.encodeObject(s);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ bodySections.add(new AmqpSequenceSection((AmqpSequence)s, Collections.singletonList(buf), encoder.getRegistry()));
+
+ contentSize += buf.remaining();
+ s = iter.hasNext() ? iter.next() : null;
+ }
+ while(s instanceof AmqpSequence);
}
- if(section instanceof Footer)
+ if(s instanceof Footer)
{
- _footer = ((Footer) section).getValue();
- section = sectIter.hasNext() ? sectIter.next() : null;
+ encoder.reset();
+ encoder.encodeObject(s);
+ Binary encodedOutput = encoder.getEncoding();
+ final QpidByteBuffer buf = QpidByteBuffer.wrap(encodedOutput.asByteBuffer());
+ _footerSection = new FooterSection((Footer)s, Collections.singletonList(buf), encoder.getRegistry());
}
+ _contentSize = contentSize;
- _messageHeader = new MessageHeader_1_0();
+ }
+ public Properties getProperties()
+ {
+ return _propertiesSection == null ? null : _propertiesSection.getValue();
}
- private static List<Section> constructSections(final QpidByteBuffer[] fragments, final SectionDecoder decoder, List<QpidByteBuffer> encodedSections)
+
+ public PropertiesSection getPropertiesSection()
{
- List<Section> sections = new ArrayList<Section>(3);
+ return _propertiesSection;
+ }
- QpidByteBuffer src;
- if(fragments.length == 1)
- {
- src = fragments[0].duplicate();
- }
- else
- {
- int size = 0;
- for(QpidByteBuffer buf : fragments)
- {
- size += buf.remaining();
- }
- src = QpidByteBuffer.allocateDirect(size);
- for(QpidByteBuffer buf : fragments)
- {
- QpidByteBuffer duplicate = buf.duplicate();
- src.put(duplicate);
- duplicate.dispose();
- }
- src.flip();
+ public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder, List<AbstractSection<?>> dataSections)
+ {
+ List<QpidByteBuffer> src = new ArrayList<>(fragments.length);
+ for(QpidByteBuffer buf : fragments)
+ {
+ src.add(buf.duplicate());
}
try
{
- int startBarePos = -1;
- int lastPos = src.position();
- Section s = decoder.readSection(src);
-
-
-
- if(s instanceof Header)
+ AbstractSection<?> s = decoder.readSection(src);
+ long contentSize = 0L;
+ if(s instanceof HeaderSection)
{
- sections.add(s);
- lastPos = src.position();
- s = src.hasRemaining() ? decoder.readSection(src) : null;
+ _headerSection = (HeaderSection) s;
+ s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
}
- if(s instanceof DeliveryAnnotations)
+ if(s instanceof DeliveryAnnotationsSection)
{
- sections.add(s);
- lastPos = src.position();
- s = src.hasRemaining() ? decoder.readSection(src) : null;
+ _deliveryAnnotationsSection = (DeliveryAnnotationsSection) s;
+ s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
}
- if(s instanceof MessageAnnotations)
+ if(s instanceof MessageAnnotationsSection)
{
- sections.add(s);
- lastPos = src.position();
- s = src.hasRemaining() ? decoder.readSection(src) : null;
+ _messageAnnotationsSection = (MessageAnnotationsSection) s;
+ s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
}
- if(s instanceof Properties)
+ if(s instanceof PropertiesSection)
{
- sections.add(s);
- if(startBarePos == -1)
- {
- startBarePos = lastPos;
- }
- s = src.hasRemaining() ? decoder.readSection(src) : null;
+ _propertiesSection = (PropertiesSection) s;
+ s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
}
- if(s instanceof ApplicationProperties)
+ if(s instanceof ApplicationPropertiesSection)
{
- sections.add(s);
- if(startBarePos == -1)
- {
- startBarePos = lastPos;
- }
- s = src.hasRemaining() ? decoder.readSection(src) : null;
+ _applicationPropertiesSection = (ApplicationPropertiesSection) s;
+ s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
}
- if(s instanceof AmqpValue)
+ if(s instanceof AmqpValueSection)
{
- if(startBarePos == -1)
- {
- startBarePos = lastPos;
- }
- s = src.hasRemaining() ? decoder.readSection(src) : null;
+ contentSize = s.getEncodedSize();
+ dataSections.add(s);
+ s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
}
- else if(s instanceof Data)
+ else if(s instanceof DataSection)
{
- if(startBarePos == -1)
- {
- startBarePos = lastPos;
- }
do
{
- s = src.hasRemaining() ? decoder.readSection(src) : null;
- } while(s instanceof Data);
+ contentSize += s.getEncodedSize();
+ dataSections.add(s);
+ s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
+ } while(s instanceof DataSection);
}
- else if(s instanceof AmqpSequence)
+ else if(s instanceof AmqpSequenceSection)
{
- if(startBarePos == -1)
- {
- startBarePos = lastPos;
- }
do
{
- s = src.hasRemaining() ? decoder.readSection(src) : null;
+ contentSize += s.getEncodedSize();
+ dataSections.add(s);
+ s = QpidByteBufferUtils.hasRemaining(src) ? decoder.readSection(src) : null;
}
- while(s instanceof AmqpSequence);
- }
-
- if(s instanceof Footer)
- {
- sections.add(s);
+ while(s instanceof AmqpSequenceSection);
}
-
- for(QpidByteBuffer buf : fragments)
+ if(s instanceof FooterSection)
{
- encodedSections.add(buf.duplicate());
+ _footerSection = (FooterSection) s;
}
-
- return sections;
+ _contentSize = contentSize;
}
catch (AmqpErrorException e)
{
_logger.error("Decoding read section error", e);
+ // TODO - fix error handling
throw new IllegalArgumentException(e);
}
finally
{
- src.dispose();
+ for(QpidByteBuffer buf : src)
+ {
+ buf.dispose();
+ }
}
+
+ _messageHeader = new MessageHeader_1_0();
+
+ }
+
+ private MessageMetaData_1_0(List<AbstractSection<?>> sections, long contentSize)
+ {
+ _contentSize = contentSize;
+
+ Iterator<AbstractSection<?>> sectIter = sections.iterator();
+
+ Section section = sectIter.hasNext() ? sectIter.next() : null;
+ if(section instanceof HeaderSection)
+ {
+ _headerSection = (HeaderSection) section;
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ if(section instanceof DeliveryAnnotationsSection)
+ {
+ _deliveryAnnotationsSection = (DeliveryAnnotationsSection) section;
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ if(section instanceof MessageAnnotationsSection)
+ {
+ _messageAnnotationsSection = (MessageAnnotationsSection) section;
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ if(section instanceof PropertiesSection)
+ {
+ _propertiesSection = ((PropertiesSection) section);
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ if(section instanceof ApplicationPropertiesSection)
+ {
+ _applicationPropertiesSection = (ApplicationPropertiesSection) section;
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ if(section instanceof FooterSection)
+ {
+ _footerSection = (FooterSection) section;
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ _messageHeader = new MessageHeader_1_0();
+
}
@@ -307,70 +345,76 @@ public class MessageMetaData_1_0 impleme
public int getStorableSize()
{
- int size = 0;
- for(QpidByteBuffer bin : _encodedSections)
+ long size = 9L;
+ if(_headerSection != null)
{
- size += bin.limit();
+ size += _headerSection.getEncodedSize();
}
-
- return size;
- }
-
- private QpidByteBuffer encodeAsBuffer()
- {
- QpidByteBuffer buf = QpidByteBuffer.allocateDirect(getStorableSize());
-
- for(QpidByteBuffer bin : _encodedSections)
+ if(_deliveryAnnotationsSection != null)
+ {
+ size += _deliveryAnnotationsSection.getEncodedSize();
+ }
+ if(_messageAnnotationsSection != null)
{
- QpidByteBuffer duplicate = bin.duplicate();
- buf.put(duplicate);
- duplicate.dispose();
+ size += _messageAnnotationsSection.getEncodedSize();
+ }
+ if(_propertiesSection != null)
+ {
+ size += _propertiesSection.getEncodedSize();
+ }
+ if(_applicationPropertiesSection != null)
+ {
+ size += _applicationPropertiesSection.getEncodedSize();
+ }
+ if(_footerSection != null)
+ {
+ size += _footerSection.getEncodedSize();
}
- buf.flip();
- return buf;
+ return (int) size;
}
public int writeToBuffer(QpidByteBuffer dest)
{
- QpidByteBuffer buf = _encoded;
-
- if(buf == null)
+ dest.put(VERSION_BYTE);
+ dest.putLong(_contentSize);
+ if(_headerSection != null)
{
- buf = encodeAsBuffer();
- _encoded = buf;
+ _headerSection.writeTo(dest);
}
-
- buf = buf.duplicate();
-
- buf.position(0);
-
- if(dest.remaining() < buf.limit())
+ if(_deliveryAnnotationsSection != null)
+ {
+ _deliveryAnnotationsSection.writeTo(dest);
+ }
+ if(_messageAnnotationsSection != null)
+ {
+ _messageAnnotationsSection.writeTo(dest);
+ }
+ if(_propertiesSection != null)
{
- buf.limit(dest.remaining());
+ _propertiesSection.writeTo(dest);
}
- final int length = buf.limit();
- dest.putCopyOf(buf);
- buf.dispose();
- return length;
+ if(_applicationPropertiesSection != null)
+ {
+ _applicationPropertiesSection.writeTo(dest);
+ }
+ if(_footerSection != null)
+ {
+ _footerSection.writeTo(dest);
+ }
+
+ return getStorableSize();
}
public int getContentSize()
{
- QpidByteBuffer buf = _encoded;
-
- if(buf == null)
- {
- buf = encodeAsBuffer();
- _encoded = buf;
- }
- return buf.remaining();
+ return (int) _contentSize;
}
public boolean isPersistent()
{
- return _header != null && Boolean.TRUE.equals(_header.getDurable());
+ return _headerSection != null && Boolean.TRUE.equals(_headerSection.getValue().getDurable());
}
public MessageHeader_1_0 getMessageHeader()
@@ -379,21 +423,65 @@ public class MessageMetaData_1_0 impleme
}
@Override
- public void dispose()
+ public synchronized void dispose()
{
- for(QpidByteBuffer bin : _encodedSections)
+ if(_headerSection != null)
+ {
+ _headerSection.dispose();
+ _headerSection = null;
+ }
+ if(_deliveryAnnotationsSection != null)
+ {
+ _deliveryAnnotationsSection.dispose();
+ _deliveryAnnotationsSection = null;
+ }
+ if(_messageAnnotationsSection != null)
{
- bin.dispose();
+ _messageAnnotationsSection.dispose();
+ _messageAnnotationsSection = null;
}
- _encodedSections = null;
- _encoded.dispose();
- _encoded = null;
+ if(_propertiesSection != null)
+ {
+ _propertiesSection.dispose();
+ _propertiesSection = null;
+ }
+ if(_applicationPropertiesSection != null)
+ {
+ _applicationPropertiesSection.dispose();
+ _applicationPropertiesSection = null;
+ }
+
}
@Override
public void clearEncodedForm()
{
+ dispose();
+ }
+
+ public HeaderSection getHeaderSection()
+ {
+ return _headerSection;
+ }
+
+ public DeliveryAnnotationsSection getDeliveryAnnotationsSection()
+ {
+ return _deliveryAnnotationsSection;
+ }
+
+ public MessageAnnotationsSection getMessageAnnotationsSection()
+ {
+ return _messageAnnotationsSection;
+ }
+
+ public ApplicationPropertiesSection getApplicationPropertiesSection()
+ {
+ return _applicationPropertiesSection;
+ }
+ public FooterSection getFooterSection()
+ {
+ return _footerSection;
}
private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_1_0>
@@ -410,32 +498,22 @@ public class MessageMetaData_1_0 impleme
public MessageMetaData_1_0 createMetaData(QpidByteBuffer buf)
{
- ValueHandler valueHandler = new ValueHandler(_typeRegistry);
+ byte versionByte = buf.get();
+ long contentSize = buf.getLong();
+ SectionDecoder sectionDecoder = new SectionDecoderImpl(_typeRegistry.getSectionDecoderRegistry());
- ArrayList<Section> sections = new ArrayList<Section>(3);
- ArrayList<QpidByteBuffer> encodedSections = new ArrayList<>(3);
-
- while(buf.hasRemaining())
+ try
{
- try
- {
- int start = buf.position();
- QpidByteBuffer encodedBuf = buf.slice();
- Object parse = valueHandler.parse(buf);
- sections.add((Section) parse);
- encodedBuf.limit(buf.position()-start);
- encodedSections.add(encodedBuf);
-
- }
- catch (AmqpErrorException e)
- {
- //TODO
- throw new ConnectionScopedRuntimeException(e);
- }
+ List<AbstractSection<?>> sections = sectionDecoder.parseAll(Collections.singletonList(buf));
+ return new MessageMetaData_1_0(sections,contentSize);
}
+ catch (AmqpErrorException e)
+ {
+ //TODO
+ throw new ConnectionScopedRuntimeException(e);
+ }
- return new MessageMetaData_1_0(sections,encodedSections);
}
}
@@ -445,54 +523,54 @@ public class MessageMetaData_1_0 impleme
public String getCorrelationId()
{
- if(_properties == null || _properties.getCorrelationId() == null)
+ if(_propertiesSection == null || _propertiesSection.getValue().getCorrelationId() == null)
{
return null;
}
else
{
- return _properties.getCorrelationId().toString();
+ return _propertiesSection.getValue().getCorrelationId().toString();
}
}
@Override
public long getExpiration()
{
- final Date absoluteExpiryTime = _properties == null ? null : _properties.getAbsoluteExpiryTime();
+ final Date absoluteExpiryTime = _propertiesSection == null ? null : _propertiesSection.getValue().getAbsoluteExpiryTime();
if(absoluteExpiryTime != null)
{
return absoluteExpiryTime.getTime();
}
else
{
- final Date creationTime = _properties == null ? null : _properties.getCreationTime();
- final UnsignedInteger ttl = _header == null ? null : _header.getTtl();
+ final Date creationTime = _propertiesSection == null ? null : _propertiesSection.getValue().getCreationTime();
+ final UnsignedInteger ttl = _headerSection == null ? null : _headerSection.getValue().getTtl();
return ttl == null || creationTime == null ? 0L : ttl.longValue() + creationTime.getTime();
}
}
public String getMessageId()
{
- if(_properties == null || _properties.getMessageId() == null)
+ if(_propertiesSection == null || _propertiesSection.getValue().getMessageId() == null)
{
return null;
}
else
{
- return _properties.getMessageId().toString();
+ return _propertiesSection.getValue().getMessageId().toString();
}
}
public String getMimeType()
{
- if(_properties == null || _properties.getContentType() == null)
+ if(_propertiesSection == null || _propertiesSection.getValue().getContentType() == null)
{
return null;
}
else
{
- return _properties.getContentType().toString();
+ return _propertiesSection.getValue().getContentType().toString();
}
}
@@ -503,25 +581,25 @@ public class MessageMetaData_1_0 impleme
public byte getPriority()
{
- if(_header == null || _header.getPriority() == null)
+ if(_headerSection == null || _headerSection.getValue().getPriority() == null)
{
return 4; //javax.jms.Message.DEFAULT_PRIORITY;
}
else
{
- return _header.getPriority().byteValue();
+ return _headerSection.getValue().getPriority().byteValue();
}
}
public long getTimestamp()
{
- if(_properties == null || _properties.getCreationTime() == null)
+ if(_propertiesSection == null || _propertiesSection.getValue().getCreationTime() == null)
{
return 0L;
}
else
{
- return _properties.getCreationTime().getTime();
+ return _propertiesSection.getValue().getCreationTime().getTime();
}
}
@@ -533,11 +611,11 @@ public class MessageMetaData_1_0 impleme
long notValidBefore;
Object annotation;
- if(_messageAnnotations != null && (annotation = _messageAnnotations.get(DELIVERY_TIME)) instanceof Number)
+ if(_messageAnnotationsSection != null && (annotation = _messageAnnotationsSection.getValue().get(DELIVERY_TIME)) instanceof Number)
{
notValidBefore = ((Number)annotation).longValue();
}
- else if(_messageAnnotations != null && (annotation = _messageAnnotations.get(NOT_VALID_BEFORE)) instanceof Number)
+ else if(_messageAnnotationsSection != null && (annotation = _messageAnnotationsSection.getValue().get(NOT_VALID_BEFORE)) instanceof Number)
{
notValidBefore = ((Number)annotation).longValue();
}
@@ -557,25 +635,25 @@ public class MessageMetaData_1_0 impleme
}
// Use legacy annotation if present and there was no subject
- if(_messageAnnotations == null || _messageAnnotations.get(JMS_TYPE) == null)
+ if(_messageAnnotationsSection == null || _messageAnnotationsSection.getValue().get(JMS_TYPE) == null)
{
return null;
}
else
{
- return _messageAnnotations.get(JMS_TYPE).toString();
+ return _messageAnnotationsSection.getValue().get(JMS_TYPE).toString();
}
}
public String getReplyTo()
{
- if(_properties == null || _properties.getReplyTo() == null)
+ if(_propertiesSection == null || _propertiesSection.getValue().getReplyTo() == null)
{
return null;
}
else
{
- return _properties.getReplyTo();
+ return _propertiesSection.getValue().getReplyTo();
}
}
@@ -593,19 +671,19 @@ public class MessageMetaData_1_0 impleme
public Object getHeader(final String name)
{
- return _appProperties == null ? null : _appProperties.get(name);
+ return _applicationPropertiesSection == null ? null : _applicationPropertiesSection.getValue().get(name);
}
public boolean containsHeaders(final Set<String> names)
{
- if(_appProperties == null)
+ if(_applicationPropertiesSection == null)
{
return false;
}
for(String key : names)
{
- if(!_appProperties.containsKey(key))
+ if(!_applicationPropertiesSection.getValue().containsKey(key))
{
return false;
}
@@ -616,31 +694,32 @@ public class MessageMetaData_1_0 impleme
@Override
public Collection<String> getHeaderNames()
{
- if(_appProperties == null)
+ if(_applicationPropertiesSection == null)
{
return Collections.emptySet();
}
- return Collections.unmodifiableCollection(_appProperties.keySet());
+ return Collections.unmodifiableCollection(_applicationPropertiesSection.getValue().keySet());
}
public boolean containsHeader(final String name)
{
- return _appProperties != null && _appProperties.containsKey(name);
+ return _applicationPropertiesSection != null && _applicationPropertiesSection.getValue().containsKey(name);
}
public String getSubject()
{
- return _properties == null ? null : _properties.getSubject();
+ return _propertiesSection == null ? null : _propertiesSection.getValue().getSubject();
}
public String getTo()
{
- return _properties == null ? null : _properties.getTo();
+ return _propertiesSection == null ? null : _propertiesSection.getValue().getTo();
}
public Map<String, Object> getHeadersAsMap()
{
- return _appProperties == null ? new HashMap<String,Object>() : new HashMap<String,Object>(_appProperties);
+ return _applicationPropertiesSection == null ? new HashMap<String,Object>() : new HashMap<>(
+ _applicationPropertiesSection.getValue());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org