You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/09/10 17:00:47 UTC
svn commit: r1167529 [1/5] - in
/qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ broker/ broker/bin/
broker/src/main/java/org/apache/qpid/server/exchange/headers/
broker/src/main/java/org/apache/qpid/server/logging/actors/
broker/src/main/java/org/apac...
Author: rgodfrey
Date: Sat Sep 10 15:00:43 2011
New Revision: 1167529
URL: http://svn.apache.org/viewvc?rev=1167529&view=rev
Log:
NO-JIRA: 1.0 sandbox merge from trunk
Added:
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java
- copied unchanged from r1167527, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferOutputStream.java
- copied unchanged from r1167527, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferOutputStream.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java
- copied unchanged from r1167527, qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java
- copied unchanged from r1167527, qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java
- copied unchanged from r1167527, qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/python_tests/
- copied from r1167527, qpid/trunk/qpid/java/test-profiles/python_tests/
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/python_tests/Java010PythonExcludes
- copied unchanged from r1167527, qpid/trunk/qpid/java/test-profiles/python_tests/Java010PythonExcludes
Modified:
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/ (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/ (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/bin/ (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ManagementConsole_logmessages.properties
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/templates/method/version/MethodBodyClass.vm
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/templates/model/MethodRegistryClass.vm
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/templates/model/version/MethodRegistryClass.vm
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/lib/mina-core-1.1.7.jar
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/lib/mina-filter-ssl-1.1.7.jar
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/ (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/ (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/CPPExcludes (contents, props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/Excludes (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/JavaExcludes (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/JavaPre010Excludes (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/JavaTransientExcludes (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/XAExcludes (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/clean-dir (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/cpp.async.testprofile (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/cpp.cluster.testprofile (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/cpp.noprefetch.testprofile (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/cpp.ssl.excludes (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/cpp.ssl.testprofile (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/cpp.testprofile (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/java-dby.0-9-1.testprofile (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/java-mms-spawn.0-10.testprofile (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/java-mms-spawn.0-9-1.testprofile (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/java-mms.0-9-1.testprofile (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/log4j-test.xml (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/test-provider.properties (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/test_resources/ (props changed)
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/testprofile.defaults (props changed)
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Sep 10 15:00:43 2011
@@ -4,4 +4,4 @@
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
/qpid/trunk/qpid:796646-796653,1080001-1085000
-/qpid/trunk/qpid/java:1073294-1157765,1160415-1162726,1162729-1166086
+/qpid/trunk/qpid/java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1167527
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Sep 10 15:00:43 2011
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/broker:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker:787599
/qpid/branches/qpid-2935/qpid/java/broker:1061302-1072333
-/qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086
+/qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1167527
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/bin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Sep 10 15:00:43 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/broker/bin:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker/bin:805429-821809
/qpid/branches/qpid-2935/qpid/java/broker/bin:1061302-1072333
-/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086
+/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1167527
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java Sat Sep 10 15:00:43 2011
@@ -274,132 +274,6 @@ public class HeadersParser
}
- public static void main(String[] args) throws AMQFrameDecodingException
- {
-
- FieldTable bindingTable = new FieldTable();
-
- bindingTable.setString(new AMQShortString("x-match"),"all");
- bindingTable.setInteger("a",1);
- bindingTable.setVoid(new AMQShortString("b"));
- bindingTable.setString("c","");
- bindingTable.setInteger("d",4);
- bindingTable.setInteger("e",1);
-
-
-
- FieldTable bindingTable2 = new FieldTable();
- bindingTable2.setString(new AMQShortString("x-match"),"all");
- bindingTable2.setInteger("a",1);
- bindingTable2.setVoid(new AMQShortString("b"));
- bindingTable2.setString("c","");
- bindingTable2.setInteger("d",4);
- bindingTable2.setInteger("e",1);
- bindingTable2.setInteger("f",1);
-
-
- FieldTable table = new FieldTable();
- table.setInteger("a",1);
- table.setInteger("b",2);
- table.setString("c","");
- table.setInteger("d",4);
- table.setInteger("e",1);
- table.setInteger("f",1);
- table.setInteger("h",1);
- table.setInteger("i",1);
- table.setInteger("j",1);
- table.setInteger("k",1);
- table.setInteger("l",1);
-
- org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.allocate( (int) table.getEncodedSize());
- EncodingUtils.writeFieldTableBytes(buffer, table);
- buffer.flip();
-
- FieldTable table2 = EncodingUtils.readFieldTable(buffer);
-
-
-
- FieldTable bindingTable3 = new FieldTable();
- bindingTable3.setString(new AMQShortString("x-match"),"any");
- bindingTable3.setInteger("a",1);
- bindingTable3.setInteger("b",3);
-
-
- FieldTable bindingTable4 = new FieldTable();
- bindingTable4.setString(new AMQShortString("x-match"),"any");
- bindingTable4.setVoid(new AMQShortString("a"));
-
-
- FieldTable bindingTable5 = new FieldTable();
- bindingTable5.setString(new AMQShortString("x-match"),"all");
- bindingTable5.setString(new AMQShortString("h"),"hello");
-
- for(int i = 0; i < 100; i++)
- {
- printMatches(new FieldTable[] {bindingTable5} , table2);
- }
-
-
-
- }
-
-
-
- private static void printMatches(final FieldTable[] bindingKeys, final FieldTable routingKey)
- {
- HeadersMatcherDFAState sm = null;
- Map<HeaderMatcherResult, String> resultMap = new HashMap<HeaderMatcherResult, String>();
-
- HeadersParser parser = new HeadersParser();
-
- for(int i = 0; i < bindingKeys.length; i++)
- {
- HeaderMatcherResult r = new HeaderMatcherResult();
- resultMap.put(r, bindingKeys[i].toString());
-
-
- if(i==0)
- {
- sm = parser.createStateMachine(bindingKeys[i], r);
- }
- else
- {
- sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeys[i], r));
- }
- }
-
- Collection<HeaderMatcherResult> results = null;
- long beforeTime = System.currentTimeMillis();
- for(int i = 0; i < 1000000; i++)
- {
- routingKey.size();
-
- assert sm != null;
- results = sm.match(routingKey);
-
- }
- long elapsed = System.currentTimeMillis() - beforeTime;
- System.out.println("1000000 Iterations took: " + elapsed);
- Collection<String> resultStrings = new ArrayList<String>();
-
- assert results != null;
- for(HeaderMatcherResult result : results)
- {
- resultStrings.add(resultMap.get(result));
- }
-
- final ArrayList<String> nonMatches = new ArrayList<String>();
- for(FieldTable key : bindingKeys)
- {
- nonMatches.add(key.toString());
- }
- nonMatches.removeAll(resultStrings);
- System.out.println("\""+routingKey+"\" matched with " + resultStrings + " DID NOT MATCH with " + nonMatches);
-
-
- }
-
-
public final static class KeyValuePair
{
public final HeaderKey _key;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java Sat Sep 10 15:00:43 2011
@@ -20,11 +20,15 @@
*/
package org.apache.qpid.server.logging.actors;
-import org.apache.qpid.server.logging.LogMessage;
-import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.RootMessageLogger;
+import java.security.AccessController;
+import java.security.Principal;
import java.text.MessageFormat;
+import java.util.Set;
+
+import javax.management.remote.JMXPrincipal;
+import javax.security.auth.Subject;
/**
* NOTE: This actor is not thread safe.
@@ -40,16 +44,23 @@ import java.text.MessageFormat;
*/
public class ManagementActor extends AbstractActor
{
+ /**
+ * Holds the principal name to display when principal subject is not available.
+ * <p>
+ * This is useful for cases when users invoke JMX operation over JConsole
+ * attached to the local JVM.
+ */
+ private static final String UNKNOWN_PRINCIPAL = "N/A";
+
String _lastThreadName = null;
/**
* LOG FORMAT for the ManagementActor,
- * Uses a MessageFormat call to insert the requried values according to
- * these indicies:
+ * Uses a MessageFormat call to insert the required values according to
+ * these indices:
*
- * 0 - Connection ID
- * 1 - User ID
- * 2 - IP
+ * 0 - User ID
+ * 1 - IP
*/
public static final String MANAGEMENT_FORMAT = "mng:{0}({1})";
@@ -75,19 +86,20 @@ public class ManagementActor extends Abs
_lastThreadName = currentName;
// Management Thread names have this format.
- //RMI TCP Connection(2)-169.24.29.116
+ // RMI TCP Connection(2)-169.24.29.116
// This is true for both LocalAPI and JMX Connections
// However to be defensive lets test.
String[] split = currentName.split("\\(");
if (split.length == 2)
{
- String connectionID = split[1].split("\\)")[0];
String ip = currentName.split("-")[1];
-
- actor = MessageFormat.format(MANAGEMENT_FORMAT,
- connectionID,
- ip);
+ String principalName = getPrincipalName();
+ if (principalName == null)
+ {
+ principalName = UNKNOWN_PRINCIPAL;
+ }
+ actor = MessageFormat.format(MANAGEMENT_FORMAT, principalName, ip);
}
else
{
@@ -105,6 +117,30 @@ public class ManagementActor extends Abs
}
}
+ /**
+ * Returns current JMX principal name.
+ *
+ * @return principal name or null if principal can not be found
+ */
+ protected String getPrincipalName()
+ {
+ String identity = null;
+
+ // retrieve Subject from current AccessControlContext
+ final Subject subject = Subject.getSubject(AccessController.getContext());
+ if (subject != null)
+ {
+ // retrieve JMXPrincipal from Subject
+ final Set<JMXPrincipal> principals = subject.getPrincipals(JMXPrincipal.class);
+ if (principals != null && !principals.isEmpty())
+ {
+ final Principal principal = principals.iterator().next();
+ identity = principal.getName();
+ }
+ }
+ return identity;
+ }
+
public String getLogMessage()
{
updateLogString();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ManagementConsole_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ManagementConsole_logmessages.properties?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ManagementConsole_logmessages.properties (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ManagementConsole_logmessages.properties Sat Sep 10 15:00:43 2011
@@ -30,4 +30,4 @@ STOPPED = MNG-1005 : Stopped
# 0 - Path
SSL_KEYSTORE = MNG-1006 : Using SSL Keystore : {0}
OPEN = MNG-1007 : Open : User {0}
-CLOSE = MNG-1008 : Close
\ No newline at end of file
+CLOSE = MNG-1008 : Close : User {0}
\ No newline at end of file
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Sep 10 15:00:43 2011
@@ -4,4 +4,4 @@
/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/management:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/management:787599
/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/management:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1167527
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java Sat Sep 10 15:00:43 2011
@@ -313,7 +313,7 @@ public class MBeanInvocationHandlerImpl
else if (notification.getType().equals(JMXConnectionNotification.CLOSED) ||
notification.getType().equals(JMXConnectionNotification.FAILED))
{
- _logActor.message(ManagementConsoleMessages.CLOSE());
+ _logActor.message(ManagementConsoleMessages.CLOSE(user));
}
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java Sat Sep 10 15:00:43 2011
@@ -29,7 +29,10 @@ import org.apache.qpid.framing.abstracti
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.MessageMetaDataType;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.util.ByteBufferInputStream;
+import org.apache.qpid.server.util.ByteBufferOutputStream;
+import java.io.*;
import java.nio.ByteBuffer;
import java.util.Set;
@@ -120,38 +123,38 @@ public class MessageMetaData implements
return size;
}
+
public int writeToBuffer(int offset, ByteBuffer dest)
{
- ByteBuffer src = ByteBuffer.allocate((int)getStorableSize());
-
- org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(src);
- EncodingUtils.writeInteger(minaSrc, _contentHeaderBody.getSize());
- _contentHeaderBody.writePayload(minaSrc);
- EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getExchange());
- EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getRoutingKey());
- byte flags = 0;
- if(_messagePublishInfo.isMandatory())
- {
- flags |= MANDATORY_FLAG;
- }
- if(_messagePublishInfo.isImmediate())
+ int oldPosition = dest.position();
+ try
{
- flags |= IMMEDIATE_FLAG;
+
+ DataOutputStream dataOutputStream = new DataOutputStream(new ByteBufferOutputStream(dest));
+ EncodingUtils.writeInteger(dataOutputStream, _contentHeaderBody.getSize());
+ _contentHeaderBody.writePayload(dataOutputStream);
+ EncodingUtils.writeShortStringBytes(dataOutputStream, _messagePublishInfo.getExchange());
+ EncodingUtils.writeShortStringBytes(dataOutputStream, _messagePublishInfo.getRoutingKey());
+ byte flags = 0;
+ if(_messagePublishInfo.isMandatory())
+ {
+ flags |= MANDATORY_FLAG;
+ }
+ if(_messagePublishInfo.isImmediate())
+ {
+ flags |= IMMEDIATE_FLAG;
+ }
+ dest.put(flags);
+ dest.putLong(_arrivalTime);
+
}
- EncodingUtils.writeByte(minaSrc, flags);
- EncodingUtils.writeLong(minaSrc,_arrivalTime);
- src.position(minaSrc.position());
- src.flip();
- src.position(offset);
- src = src.slice();
- if(dest.remaining() < src.limit())
+ catch (IOException e)
{
- src.limit(dest.remaining());
+ // This shouldn't happen as we are not actually using anything that can throw an IO Exception
+ throw new RuntimeException(e);
}
- dest.put(src);
-
- return src.limit();
+ return dest.position()-oldPosition;
}
public int getContentSize()
@@ -173,14 +176,15 @@ public class MessageMetaData implements
{
try
{
- org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(buf);
- int size = EncodingUtils.readInteger(minaSrc);
- ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(minaSrc, size);
- final AMQShortString exchange = EncodingUtils.readAMQShortString(minaSrc);
- final AMQShortString routingKey = EncodingUtils.readAMQShortString(minaSrc);
+ ByteBufferInputStream bbis = new ByteBufferInputStream(buf);
+ DataInputStream dais = new DataInputStream(bbis);
+ int size = EncodingUtils.readInteger(dais);
+ ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(dais, size);
+ final AMQShortString exchange = EncodingUtils.readAMQShortString(dais);
+ final AMQShortString routingKey = EncodingUtils.readAMQShortString(dais);
- final byte flags = EncodingUtils.readByte(minaSrc);
- long arrivalTime = EncodingUtils.readLong(minaSrc);
+ final byte flags = EncodingUtils.readByte(dais);
+ long arrivalTime = EncodingUtils.readLong(dais);
MessagePublishInfo publishBody =
new MessagePublishInfo()
@@ -216,6 +220,10 @@ public class MessageMetaData implements
{
throw new RuntimeException(e);
}
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
};
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Sat Sep 10 15:00:43 2011
@@ -26,6 +26,7 @@
*/
package org.apache.qpid.server.output.amqp0_8;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry;
@@ -34,22 +35,18 @@ import org.apache.qpid.server.output.Hea
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.DeliveryProperties;
-import java.nio.ByteBuffer;
+import java.io.DataOutputStream;
+import java.io.IOException;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
- private static final ProtocolVersionMethodConverter PROTOCOL_CONVERTER =
- METHOD_REGISTRY.getProtocolVersionMethodConverter();
-
public static Factory getInstanceFactory()
{
return new Factory()
@@ -62,6 +59,7 @@ public class ProtocolOutputConverterImpl
};
}
+
private final AMQProtocolSession _protocolSession;
private ProtocolOutputConverterImpl(AMQProtocolSession session)
@@ -78,10 +76,11 @@ public class ProtocolOutputConverterImpl
public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQDataBlock deliver = createEncodedDeliverFrame(entry, channelId, deliveryTag, consumerTag);
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
}
+
private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
throws AMQException
{
@@ -93,65 +92,120 @@ public class ProtocolOutputConverterImpl
{
final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
- ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
- chb.bodySize = message.getSize();
+ ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
return chb;
}
}
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
{
- AMQDataBlock deliver = createEncodedGetOkFrame(entry, channelId, deliveryTag, queueSize);
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
}
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody chb, int channelId, AMQDataBlock deliver)
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
throws AMQException
{
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, chb);
-
+ int bodySize = (int) message.getSize();
- final int bodySize = (int) message.getSize();
if(bodySize == 0)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
+
writeFrame(compositeBlock);
}
else
{
int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
- final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- ByteBuffer buf = ByteBuffer.allocate(capacity);
- int writtenSize = 0;
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+ int writtenSize = capacity;
+
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf));
- AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
while(writtenSize < bodySize)
{
- buf = java.nio.ByteBuffer.allocate(capacity);
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
+
+ writeFrame(new AMQFrame(channelId, body));
}
+ }
+ }
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
}
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
}
- private AMQDataBlock createEncodedDeliverFrame(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
+
+
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
throws AMQException
{
+
final AMQShortString exchangeName;
final AMQShortString routingKey;
@@ -172,21 +226,58 @@ public class ProtocolOutputConverterImpl
final boolean isRedelivered = entry.isRedelivered();
+ final AMQBody returnBlock = new AMQBody()
+ {
+
+ public AMQBody _underlyingBody;
- BasicDeliverBody deliverBody =
- METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
+ public AMQBody createAMQBody()
+ {
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
- return deliverFrame;
+
+
+ }
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
+ };
+ return returnBlock;
}
- private AMQDataBlock createEncodedGetOkFrame(QueueEntry entry, int channelId, long deliveryTag, int queueSize)
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
throws AMQException
{
final AMQShortString exchangeName;
@@ -215,9 +306,8 @@ public class ProtocolOutputConverterImpl
exchangeName,
routingKey,
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame;
+ return getOkBody;
}
public byte getProtocolMinorVersion()
@@ -230,31 +320,28 @@ public class ProtocolOutputConverterImpl
return getProtocolSession().getProtocolMajorVersion();
}
- private AMQDataBlock createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
{
+
BasicReturnBody basicReturnBody =
METHOD_REGISTRY.createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
+ replyText,
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
- return returnFrame;
+
+ return basicReturnBody;
}
- public void writeReturn(MessagePublishInfo messagePublishInfo,
- ContentHeaderBody header,
- MessageContentSource content,
- int channelId,
- int replyCode,
- AMQShortString replyText)
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- AMQDataBlock returnFrame = createEncodedReturnFrame(messagePublishInfo, channelId, replyCode, replyText);
-
- writeMessageDelivery(content, header, channelId, returnFrame);
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+ writeMessageDelivery(message, header, channelId, returnFrame);
}
@@ -266,8 +353,68 @@ public class ProtocolOutputConverterImpl
public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
+
BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Sat Sep 10 15:00:43 2011
@@ -20,9 +20,6 @@ package org.apache.qpid.server.output.am
*
*/
-
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.HeaderPropertiesConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -38,11 +35,13 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- private static final ProtocolVersionMethodConverter
- PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
@@ -121,15 +120,12 @@ public class ProtocolOutputConverterImpl
int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
- final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- int writtenSize = 0;
+ int writtenSize = capacity;
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
CompositeAMQBodyBlock
compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
@@ -137,15 +133,55 @@ public class ProtocolOutputConverterImpl
while(writtenSize < bodySize)
{
- buf = java.nio.ByteBuffer.allocate(capacity);
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ writeFrame(new AMQFrame(channelId, body));
}
}
}
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
@@ -221,7 +257,7 @@ public class ProtocolOutputConverterImpl
return _underlyingBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
if(_underlyingBody == null)
{
@@ -346,7 +382,7 @@ public class ProtocolOutputConverterImpl
return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
}
@@ -374,7 +410,7 @@ public class ProtocolOutputConverterImpl
return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java Sat Sep 10 15:00:43 2011
@@ -20,9 +20,6 @@ package org.apache.qpid.server.output.am
*
*/
-
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.HeaderPropertiesConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -33,17 +30,16 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
- private static final ProtocolVersionMethodConverter
- PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
-
public static Factory getInstanceFactory()
{
@@ -121,15 +117,11 @@ public class ProtocolOutputConverterImpl
int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
- final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- int writtenSize = 0;
+ int writtenSize = capacity;
-
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
CompositeAMQBodyBlock
compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
@@ -137,15 +129,54 @@ public class ProtocolOutputConverterImpl
while(writtenSize < bodySize)
{
- buf = java.nio.ByteBuffer.allocate(capacity);
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ writeFrame(new AMQFrame(channelId, body));
}
}
}
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
@@ -221,7 +252,7 @@ public class ProtocolOutputConverterImpl
return _underlyingBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
if(_underlyingBody == null)
{
@@ -346,7 +377,7 @@ public class ProtocolOutputConverterImpl
return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
}
@@ -374,7 +405,7 @@ public class ProtocolOutputConverterImpl
return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Sat Sep 10 15:00:43 2011
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -348,7 +350,7 @@ public class AMQProtocolEngine implement
private void protocolInitiationReceived(ProtocolInitiation pi)
{
// this ensures the codec never checks for a PI message again
- ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ (_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
try
{
// Log incomming protocol negotiation request
@@ -368,7 +370,7 @@ public class AMQProtocolEngine implement
null,
mechanisms.getBytes(),
locales.getBytes());
- _sender.send(responseBody.generateFrame(0).toNioByteBuffer());
+ _sender.send(asByteBuffer(responseBody.generateFrame(0)));
_sender.flush();
}
@@ -376,11 +378,43 @@ public class AMQProtocolEngine implement
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
- _sender.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
+ _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())));
_sender.flush();
}
}
+ private ByteBuffer asByteBuffer(AMQDataBlock block)
+ {
+ final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+
+ try
+ {
+ block.writePayload(new DataOutputStream(new OutputStream()
+ {
+
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ buf.put((byte) b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ buf.put(b, off, len);
+ }
+ }));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ buf.flip();
+ return buf;
+ }
+
public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
{
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
@@ -491,7 +525,7 @@ public class AMQProtocolEngine implement
public synchronized void writeFrame(AMQDataBlock frame)
{
_lastSent = frame;
- final ByteBuffer buf = frame.toNioByteBuffer();
+ final ByteBuffer buf = asByteBuffer(frame);
_lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
_sender.send(buf);
@@ -1020,7 +1054,7 @@ public class AMQProtocolEngine implement
public void writerIdle()
{
- _sender.send(HeartbeatBody.FRAME.toNioByteBuffer());
+ _sender.send(asByteBuffer(HeartbeatBody.FRAME));
}
public void exception(Throwable throwable)
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Sep 10 15:00:43 2011
@@ -4,4 +4,4 @@
/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1167527
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Sat Sep 10 15:00:43 2011
@@ -139,7 +139,7 @@ public class IncomingMessage implements
public int addContentBodyFrame(final ContentChunk contentChunk)
throws AMQException
{
- _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf());
+ _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
@@ -263,7 +263,7 @@ public class IncomingMessage implements
int written = 0;
for(ContentChunk cb : _contentChunks)
{
- ByteBuffer data = cb.getData().buf();
+ ByteBuffer data = ByteBuffer.wrap(cb.getData());
if(offset+written >= pos && offset < pos + data.limit())
{
ByteBuffer src = data.duplicate();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java Sat Sep 10 15:00:43 2011
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.security.auth.sasl.amqplain;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import javax.security.auth.callback.Callback;
@@ -31,7 +33,6 @@ import javax.security.sasl.AuthorizeCall
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
@@ -60,7 +61,7 @@ public class AmqPlainSaslServer implemen
{
try
{
- final FieldTable ft = FieldTableFactory.newFieldTable(ByteBuffer.wrap(response), response.length);
+ final FieldTable ft = FieldTableFactory.newFieldTable(new DataInputStream(new ByteArrayInputStream(response)), response.length);
String username = (String) ft.getString("LOGIN");
// we do not care about the prompt but it throws if null
NameCallback nameCb = new NameCallback("prompt", username);
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java Sat Sep 10 15:00:43 2011
@@ -20,21 +20,9 @@
*/
package org.apache.qpid.server.security.auth.sasl.anonymous;
-import java.io.IOException;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
public class AnonymousSaslServer implements SaslServer
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Sat Sep 10 15:00:43 2011
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store;
import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
@@ -479,9 +480,15 @@ public class DerbyMessageStore implement
FieldTable arguments;
if(dataAsBytes.length > 0)
{
- org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.wrap(dataAsBytes);
- arguments = new FieldTable(buffer,buffer.limit());
+ try
+ {
+ arguments = new FieldTable(new DataInputStream(new ByteArrayInputStream(dataAsBytes)),dataAsBytes.length);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("IO Exception should not be thrown",e);
+ }
}
else
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Sat Sep 10 15:00:43 2011
@@ -189,7 +189,7 @@ public class Subscription_0_10 implement
public boolean isSuspended()
{
- return !isActive() || _deleted.get(); // TODO check for Session suspension
+ return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension
}
public boolean hasInterest(QueueEntry entry)
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Sat Sep 10 15:00:43 2011
@@ -677,4 +677,18 @@ public class ServerSession extends Sessi
getChannel())
+ "] ";
}
+
+ @Override
+ public void close()
+ {
+ // unregister subscriptions in order to prevent sending of new messages
+ // to subscriptions with closing session
+ final Collection<Subscription_0_10> subscriptions = getSubscriptions();
+ for (Subscription_0_10 subscription_0_10 : subscriptions)
+ {
+ unregister(subscription_0_10);
+ }
+
+ super.close();
+ }
}
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Sep 10 15:00:43 2011
@@ -1,3 +1,3 @@
/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:930288
/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1073294-1157765,1160415-1162726,1162729-1166086
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1167527
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Sat Sep 10 15:00:43 2011
@@ -43,7 +43,10 @@ import org.apache.qpid.framing.FieldTabl
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.util.ByteBufferInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -236,7 +239,14 @@ public class VirtualHostConfigRecoveryHa
FieldTable argumentsFT = null;
if(buf != null)
{
- argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit());
+ try
+ {
+ argumentsFT = new FieldTable(new DataInputStream(new ByteBufferInputStream(buf)),buf.limit());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("IOException should not be thrown here", e);
+ }
}
BindingFactory bf = _virtualHost.getBindingFactory();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1167529&r1=1167528&r2=1167529&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Sat Sep 10 15:00:43 2011
@@ -52,6 +52,7 @@ import org.apache.qpid.server.store.Stor
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -570,8 +571,8 @@ public class AbstractHeadersExchangeTest
int pos = 0;
for(ContentBody body : bodies)
{
- storedMessage.addContent(pos, body.payload.duplicate().buf());
- pos += body.payload.limit();
+ storedMessage.addContent(pos, ByteBuffer.wrap(body._payload));
+ pos += body._payload.length;
}
_incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org