You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/09/13 23:43:04 UTC
svn commit: r575474 [1/2] - in /incubator/qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/message/
client/src/main/java/org/apache/qpidity/client/
client/src/main/java/org/apache/qpidity/cl...
Author: rhs
Date: Thu Sep 13 14:42:57 2007
New Revision: 575474
URL: http://svn.apache.org/viewvc?rev=575474&view=rev
Log:
* moved most of the classes in the org.apache.qpidity package to
org.apache.qpidity.transport
* factored out the network specific pieces into
org.apache.qpidity.transport.network
* moved the mina specific code to
org.apache.qpidity.transport.network.mina
* replaced the handler chain with Sender/Receiver chains that can
deal with close request/closed notifications
* moved from an anonymous struct[] to a real Header class
* removed an excess copy from message data transmit
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/AbstractDelegate.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Future.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Future.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Header.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Method.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolError.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Range.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Result.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Result.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Struct.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Functions.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java
- copied, changed from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SliceIterator.java
Removed:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/BodyHandler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ContentHandler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Delegator.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Event.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Functions.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Future.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Handler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Header.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/InputHandler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Method.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodHandler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolError.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Range.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Result.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Segment.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionResolver.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SliceIterator.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Struct.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Stub.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Switch.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/SegmentEncoder.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
incubator/qpid/trunk/qpid/java/common/generate
incubator/qpid/trunk/qpid/java/common/pom.xml
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java
incubator/qpid/trunk/qpid/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Sep 13 14:42:57 2007
@@ -27,8 +27,8 @@
import org.apache.qpidity.client.Session;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Option;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Sep 13 14:42:57 2007
@@ -28,7 +28,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Struct;
import javax.jms.JMSException;
import java.io.IOException;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Sep 13 14:42:57 2007
@@ -26,7 +26,7 @@
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpidity.jms.ExceptionHelper;
import org.apache.qpidity.client.util.ByteBufferMessage;
-import org.apache.qpidity.ReplyTo;
+import org.apache.qpidity.transport.ReplyTo;
import javax.jms.Message;
import javax.jms.JMSException;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Thu Sep 13 14:42:57 2007
@@ -27,9 +27,9 @@
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpidity.Struct;
-import org.apache.qpidity.MessageProperties;
-import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.transport.Struct;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Thu Sep 13 14:42:57 2007
@@ -27,7 +27,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Struct;
public interface MessageFactory
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Thu Sep 13 14:42:57 2007
@@ -30,9 +30,9 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpidity.Struct;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.transport.Struct;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java Thu Sep 13 14:42:57 2007
@@ -25,8 +25,8 @@
import java.util.List;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.Struct;
/**
* This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java Thu Sep 13 14:42:57 2007
@@ -6,16 +6,18 @@
import java.util.concurrent.locks.ReentrantLock;
import org.apache.qpidity.BrokerDetails;
-import org.apache.qpidity.Channel;
-import org.apache.qpidity.Connection;
-import org.apache.qpidity.ConnectionClose;
-import org.apache.qpidity.ConnectionDelegate;
import org.apache.qpidity.ErrorCode;
-import org.apache.qpidity.MinaHandler;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.SessionDelegate;
import org.apache.qpidity.client.impl.ClientSession;
import org.apache.qpidity.client.impl.ClientSessionDelegate;
+import org.apache.qpidity.transport.Channel;
+import org.apache.qpidity.transport.Connection;
+import org.apache.qpidity.transport.ConnectionClose;
+import org.apache.qpidity.transport.ConnectionDelegate;
+import org.apache.qpidity.transport.ConnectionEvent;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.SessionDelegate;
+import org.apache.qpidity.transport.network.mina.MinaHandler;
import org.apache.qpidity.url.QpidURL;
@@ -25,47 +27,48 @@
private Connection _conn;
private ExceptionListener _exceptionListner;
private final Lock _lock = new ReentrantLock();
-
+
/**
- *
+ *
* @return returns a new connection to the broker.
*/
public static org.apache.qpidity.client.Connection createConnection()
{
return new Client();
}
-
+
public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException
{
Condition negotiationComplete = _lock.newCondition();
_lock.lock();
-
+
ConnectionDelegate connectionDelegate = new ConnectionDelegate()
- {
+ {
public SessionDelegate getSessionDelegate()
{
return new ClientSessionDelegate();
}
-
- @Override public void connectionClose(Channel context, ConnectionClose connectionClose)
+
+ @Override public void connectionClose(Channel context, ConnectionClose connectionClose)
{
_exceptionListner.onException(
- new QpidException("Server closed the connection: Reason " +
+ new QpidException("Server closed the connection: Reason " +
connectionClose.getReplyText(),
ErrorCode.get(connectionClose.getReplyCode()),
null));
}
};
-
+
connectionDelegate.setCondition(_lock,negotiationComplete);
connectionDelegate.setUsername(username);
connectionDelegate.setPassword(password);
connectionDelegate.setVirtualHost(virtualHost);
-
+
_conn = MinaHandler.connect(host, port,connectionDelegate);
-
- _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer());
-
+
+ // XXX: hardcoded version numbers
+ _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10)));
+
try
{
negotiationComplete.await();
@@ -79,7 +82,7 @@
_lock.unlock();
}
}
-
+
/*
* Until the dust settles with the URL disucssion
* I am not going to implement this.
@@ -94,9 +97,9 @@
details.getUserName(),
details.getPassword());
}
-
+
public void close() throws QpidException
- {
+ {
Channel ch = _conn.getChannel(0);
ch.connectionClose(0, "client is closing", 0, 0);
//need to close the connection underneath as well
@@ -104,7 +107,7 @@
public Session createSession(long expiryInSeconds)
{
- Channel ch = _conn.getChannel(_channelNo.incrementAndGet());
+ Channel ch = _conn.getChannel(_channelNo.incrementAndGet());
ClientSession ssn = new ClientSession();
ssn.attach(ch);
ssn.sessionOpen(expiryInSeconds);
@@ -116,10 +119,10 @@
// TODO Auto-generated method stub
return null;
}
-
+
public void setExceptionListener(ExceptionListener exceptionListner)
{
- _exceptionListner = exceptionListner;
+ _exceptionListner = exceptionListner;
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java Thu Sep 13 14:42:57 2007
@@ -18,15 +18,15 @@
*/
package org.apache.qpidity.client;
-import org.apache.qpidity.DtxCoordinationCommitResult;
-import org.apache.qpidity.DtxCoordinationGetTimeoutResult;
-import org.apache.qpidity.DtxCoordinationPrepareResult;
-import org.apache.qpidity.DtxCoordinationRecoverResult;
-import org.apache.qpidity.DtxCoordinationRollbackResult;
-import org.apache.qpidity.DtxDemarcationEndResult;
-import org.apache.qpidity.DtxDemarcationStartResult;
-import org.apache.qpidity.Future;
-import org.apache.qpidity.Option;
+import org.apache.qpidity.transport.DtxCoordinationCommitResult;
+import org.apache.qpidity.transport.DtxCoordinationGetTimeoutResult;
+import org.apache.qpidity.transport.DtxCoordinationPrepareResult;
+import org.apache.qpidity.transport.DtxCoordinationRecoverResult;
+import org.apache.qpidity.transport.DtxCoordinationRollbackResult;
+import org.apache.qpidity.transport.DtxDemarcationEndResult;
+import org.apache.qpidity.transport.DtxDemarcationStartResult;
+import org.apache.qpidity.transport.Future;
+import org.apache.qpidity.transport.Option;
/**
* This session's resources are control under the scope of a distributed transaction.
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java Thu Sep 13 14:42:57 2007
@@ -19,7 +19,7 @@
import java.nio.ByteBuffer;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Header;
/**
* Assembles message parts.
@@ -47,7 +47,7 @@
*
* @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
*/
- public void messageHeaders(Struct... headers);
+ public void messageHeader(Header header);
/**
* Add the following byte array to the content of the message being received
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java Thu Sep 13 14:42:57 2007
@@ -22,9 +22,9 @@
import java.nio.ByteBuffer;
import java.util.Map;
-import org.apache.qpidity.Option;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Struct;
import org.apache.qpidity.api.Message;
/**
@@ -186,7 +186,7 @@
* @see org.apache.qpidity.DeliveryProperties
* @see org.apache.qpidity.MessageProperties
*/
- public void headers(Struct... headers);
+ public void header(Struct... headers);
/**
* Add the following byte array to the content of the message being sent.
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java Thu Sep 13 14:42:57 2007
@@ -5,10 +5,10 @@
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpidity.Option;
+import org.apache.qpidity.transport.Option;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Range;
-import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.transport.Range;
+import org.apache.qpidity.transport.RangeSet;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.ExceptionListener;
import org.apache.qpidity.client.MessagePartListener;
@@ -16,7 +16,7 @@
/**
* Implements a Qpid Sesion.
*/
-public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session
+public class ClientSession extends org.apache.qpidity.transport.Session implements org.apache.qpidity.client.Session
{
private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
private ExceptionListener _exceptionListner;
@@ -46,7 +46,7 @@
// The javadoc clearly says that this method is suitable for small messages
// therefore reading the content in one shot.
super.messageTransfer(destination, confirmMode, acquireMode);
- super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
+ super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
super.data(msg.readData());
super.endData();
}
@@ -54,7 +54,7 @@
public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
{
super.messageTransfer(destination, confirmMode, acquireMode);
- super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
+ super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
boolean b = true;
int count = 0;
while(b)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java Thu Sep 13 14:42:57 2007
@@ -3,18 +3,21 @@
import java.nio.ByteBuffer;
import org.apache.qpidity.ErrorCode;
-import org.apache.qpidity.Frame;
-import org.apache.qpidity.MessageAcquired;
-import org.apache.qpidity.MessageReject;
-import org.apache.qpidity.MessageTransfer;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Range;
-import org.apache.qpidity.Session;
-import org.apache.qpidity.SessionClosed;
-import org.apache.qpidity.SessionDelegate;
-import org.apache.qpidity.Struct;
+
import org.apache.qpidity.client.MessagePartListener;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.transport.Data;
+import org.apache.qpidity.transport.Header;
+import org.apache.qpidity.transport.MessageAcquired;
+import org.apache.qpidity.transport.MessageReject;
+import org.apache.qpidity.transport.MessageTransfer;
+import org.apache.qpidity.transport.Range;
+import org.apache.qpidity.transport.Session;
+import org.apache.qpidity.transport.SessionClosed;
+import org.apache.qpidity.transport.SessionDelegate;
+import org.apache.qpidity.transport.Struct;
+
public class ClientSessionDelegate extends SessionDelegate
{
@@ -29,22 +32,22 @@
// --------------------------------------------
// Message methods
// --------------------------------------------
- @Override public void data(Session ssn, Frame frame)
+ @Override public void data(Session ssn, Data data)
{
- for (ByteBuffer b : frame)
+ for (ByteBuffer b : data.getFragments())
{
_currentMessageListener.data(b);
}
- if (frame.isLastSegment() && frame.isLastFrame())
+ if (data.isLast())
{
_currentMessageListener.messageReceived();
}
}
- @Override public void headers(Session ssn, Struct... headers)
+ @Override public void header(Session ssn, Header header)
{
- _currentMessageListener.messageHeaders(headers);
+ _currentMessageListener.messageHeader(header);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java Thu Sep 13 14:42:57 2007
@@ -1,7 +1,5 @@
package org.apache.qpidity.client.impl;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.Client;
@@ -10,6 +8,8 @@
import org.apache.qpidity.client.Session;
import org.apache.qpidity.client.util.MessageListener;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
public class DemoClient
{
@@ -53,14 +53,14 @@
// queue
ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
- ssn.headers(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123"));
+ ssn.header(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123"));
ssn.data("this is the data");
ssn.endData();
//reject
ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
ssn.data("this should be rejected");
- ssn.headers(new DeliveryProperties().setRoutingKey("stocks"));
+ ssn.header(new DeliveryProperties().setRoutingKey("stocks"));
ssn.endData();
ssn.sync();
@@ -81,7 +81,7 @@
// topic
ssn.messageTransfer("amq.topic", (short) 0, (short) 1);
ssn.data("Topic message");
- ssn.headers(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456"));
+ ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456"));
ssn.endData();
ssn.sync();
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java Thu Sep 13 14:42:57 2007
@@ -2,8 +2,6 @@
import java.io.FileInputStream;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.Client;
@@ -13,6 +11,8 @@
import org.apache.qpidity.client.util.FileMessage;
import org.apache.qpidity.client.util.MessageListener;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
public class LargeMsgDemoClient
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java Thu Sep 13 14:42:57 2007
@@ -5,8 +5,8 @@
import java.util.LinkedList;
import java.util.Queue;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.api.Message;
/**
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java Thu Sep 13 14:42:57 2007
@@ -7,8 +7,8 @@
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.api.Message;
/**
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java Thu Sep 13 14:42:57 2007
@@ -3,16 +3,16 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.Header;
import org.apache.qpidity.client.MessagePartListener;
-/**
+/**
* This is a simple message assembler.
- * Will call onMessage method of the adaptee
+ * Will call onMessage method of the adaptee
* when all message data is read.
- *
+ *
* This is a good convenience utility for handling
* small messages
*/
@@ -20,17 +20,17 @@
{
MessageListener _adaptee;
ByteBufferMessage _currentMsg;
-
+
public MessagePartListenerAdapter(MessageListener listener)
{
- _adaptee = listener;
+ _adaptee = listener;
}
-
+
public void messageTransfer(long transferId)
{
_currentMsg = new ByteBufferMessage(transferId);
}
-
+
public void data(ByteBuffer src)
{
try
@@ -40,28 +40,20 @@
catch(IOException e)
{
// A chance for IO exception
- // doesn't occur as we are using
+ // doesn't occur as we are using
// a ByteBuffer
}
}
- public void messageHeaders(Struct... headers)
- {
- for(Struct struct: headers)
- {
- if(struct instanceof DeliveryProperties)
- {
- _currentMsg.setDeliveryProperties((DeliveryProperties)struct);
- }
- else if (struct instanceof MessageProperties)
- {
- _currentMsg.setMessageProperties((MessageProperties)struct);
- }
- }
- }
-
- public void messageReceived()
- {
+ public void messageHeader(Header header)
+ {
+ _currentMsg.setDeliveryProperties(header.get(DeliveryProperties.class));
+ _currentMsg.setMessageProperties(header.get(MessageProperties.class));
+ }
+
+ public void messageReceived()
+ {
_adaptee.onMessage(_currentMsg);
- }
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java Thu Sep 13 14:42:57 2007
@@ -2,8 +2,8 @@
import java.nio.ByteBuffer;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.api.Message;
public abstract class ReadOnlyMessage implements Message
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java Thu Sep 13 14:42:57 2007
@@ -5,8 +5,8 @@
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.api.Message;
public class StreamingMessage extends ReadOnlyMessage implements Message
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java Thu Sep 13 14:42:57 2007
@@ -26,9 +26,7 @@
import javax.jms.MessageListener;
import javax.jms.Queue;
-import org.apache.qpidity.Option;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.RangeSet;
import org.apache.qpidity.client.MessagePartListener;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
import org.apache.qpidity.exchange.ExchangeDefaults;
@@ -36,6 +34,8 @@
import org.apache.qpidity.filter.MessageFilter;
import org.apache.qpidity.jms.message.MessageFactory;
import org.apache.qpidity.jms.message.QpidMessage;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.transport.RangeSet;
/**
* Implementation of JMS message consumer
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
package org.apache.qpidity.jms;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Option;
+import org.apache.qpidity.transport.Option;
import org.apache.qpidity.url.BindingURL;
import org.apache.qpidity.exchange.ExchangeDefaults;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java Thu Sep 13 14:42:57 2007
@@ -21,7 +21,7 @@
import org.slf4j.LoggerFactory;
import org.apache.qpidity.jms.message.*;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.transport.RangeSet;
import javax.jms.*;
import javax.jms.IllegalStateException;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java Thu Sep 13 14:42:57 2007
@@ -18,8 +18,8 @@
package org.apache.qpidity.jms;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Option;
import org.apache.qpidity.exchange.ExchangeDefaults;
+import org.apache.qpidity.transport.Option;
import org.apache.qpidity.url.BindingURL;
import javax.jms.Topic;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java Thu Sep 13 14:42:57 2007
@@ -21,8 +21,10 @@
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
-import org.apache.qpidity.*;
+import org.apache.qpidity.QpidException;
import org.apache.qpidity.dtx.XidImpl;
+import org.apache.qpidity.transport.*;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java Thu Sep 13 14:42:57 2007
@@ -29,8 +29,8 @@
import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.ReplyTo;
import org.apache.qpidity.client.util.ByteBufferMessage;
+import org.apache.qpidity.transport.ReplyTo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified: incubator/qpid/trunk/qpid/java/common/generate
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/generate?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/generate (original)
+++ incubator/qpid/trunk/qpid/java/common/generate Thu Sep 13 14:42:57 2007
@@ -8,6 +8,7 @@
out_dir=sys.argv[1]
out_pkg = sys.argv[2]
spec_file = sys.argv[3]
+
spec = mllib.xml_parse(spec_file)
class Output:
@@ -27,6 +28,8 @@
self.line("import org.apache.qpidity.codec.Encodable;")
self.line("import org.apache.qpidity.codec.Encoder;")
self.line()
+ self.line("import org.apache.qpidity.transport.network.Frame;")
+ self.line()
self.line()
def line(self, l = ""):
@@ -363,7 +366,7 @@
fct.write()
dlg = Output(out_dir, out_pkg, "Delegate")
-dlg.line("public abstract class Delegate<C> {")
+dlg.line("public abstract class Delegate<C> extends AbstractDelegate<C> {")
for s in structs:
dlg.line(" public void %s(C context, %s struct) {}" %
(dromedary(s.name), s.name))
Modified: incubator/qpid/trunk/qpid/java/common/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/pom.xml?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/common/pom.xml Thu Sep 13 14:42:57 2007
@@ -6,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
-
+
http://www.apache.org/licenses/LICENSE-2.0
-
+
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -87,9 +87,11 @@
<param>-Dpython.path=${basedir}/jython-lib.jar/Lib${path.separator}${mllib.dir}</param>
<param>${basedir}/generate</param>
<param>${generated.path}</param>
- <param>org.apache.qpidity</param>
+ <param>org.apache.qpidity.transport</param>
<param>${specs.dir}/amqp.0-10-preview.xml</param>
</params>
+ <source>${specs.dir}/amqp.0-10-preview.xml</source>
+ <timestamp>${generated.path}/generated.timestamp</timestamp>
</configuration>
<goals>
<goal>jython</goal>
@@ -97,8 +99,8 @@
</execution>
</executions>
</plugin>
-
- <!-- Generates message selector grammar -->
+
+ <!-- Generates message selector grammar -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
@@ -145,13 +147,13 @@
</executions>
</plugin>
-
+
</plugins>
</build>
<dependencies>
- <dependency>
+ <dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</dependency>
@@ -166,14 +168,14 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.4.0</version>
+ <version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.4.0</version>
- <scope>test</scope>
+ <scope>test</scope>
</dependency>
<dependency>
@@ -204,7 +206,7 @@
<scope>provided</scope>
</dependency>
<!--- This is used by filter -->
- <dependency>
+ <dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</dependency>
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java Thu Sep 13 14:42:57 2007
@@ -22,7 +22,9 @@
import java.nio.ByteBuffer;
-import static org.apache.qpidity.Functions.*;
+import org.apache.qpidity.transport.Sender;
+
+import static org.apache.qpidity.transport.util.Functions.*;
/**
@@ -31,12 +33,17 @@
* @author Rafael H. Schloming
*/
-class ConsoleOutput implements Handler<ByteBuffer>
+public class ConsoleOutput implements Sender<ByteBuffer>
{
- public void handle(ByteBuffer buf)
+ public void send(ByteBuffer buf)
{
System.out.println(str(buf));
+ }
+
+ public void close()
+ {
+ System.out.println("CLOSED");
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java Thu Sep 13 14:42:57 2007
@@ -20,7 +20,10 @@
*/
package org.apache.qpidity;
-import static org.apache.qpidity.Functions.str;
+import org.apache.qpidity.transport.*;
+import org.apache.qpidity.transport.network.mina.MinaHandler;
+
+import static org.apache.qpidity.transport.util.Functions.str;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -44,10 +47,10 @@
private ToyExchange exchange;
private MessageTransfer xfr = null;
private DeliveryProperties props = null;
- private Struct[] headers = null;
- private List<Frame> frames = null;
+ private Header header = null;
+ private List<Data> body = null;
private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>();
-
+
public ToyBroker(ToyExchange exchange)
{
this.exchange = exchange;
@@ -58,7 +61,7 @@
exchange.createQueue(qd.getQueue());
System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n");
}
-
+
@Override public void queueBind(Session ssn, QueueBind qb)
{
exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue());
@@ -70,22 +73,22 @@
QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue());
ssn.executionResult(qq.getId(), result);
}
-
+
@Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
{
Consumer c = new Consumer();
c._queueName = ms.getQueue();
consumers.put(ms.getDestination(),c);
- System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n");
- }
-
+ System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n");
+ }
+
@Override public void messageFlow(Session ssn,MessageFlow struct)
{
Consumer c = consumers.get(struct.getDestination());
c._credit = struct.getValue();
System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n");
}
-
+
@Override public void messageFlush(Session ssn,MessageFlush struct)
{
System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n");
@@ -95,47 +98,44 @@
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
this.xfr = xfr;
- frames = new ArrayList<Frame>();
+ body = new ArrayList<Data>();
System.out.println("received transfer " + xfr.getDestination());
}
- public void headers(Session ssn, Struct ... headers)
+ @Override public void header(Session ssn, Header header)
{
- if (xfr == null || frames == null)
+ if (xfr == null || body == null)
{
ssn.connectionClose(503, "no method segment", 0, 0);
- // XXX: close at our end
+ ssn.close();
return;
}
- for (Struct hdr : headers)
+ props = header.get(DeliveryProperties.class);
+ if (props != null)
{
- if (hdr instanceof DeliveryProperties)
- {
- props = (DeliveryProperties) hdr;
- System.out.println("received headers routing_key " + props.getRoutingKey());
- }
+ System.out.println("received headers routing_key " + props.getRoutingKey());
}
-
- this.headers = headers;
+
+ this.header = header;
}
- public void data(Session ssn, Frame frame)
+ @Override public void data(Session ssn, Data data)
{
- if (xfr == null || frames == null)
+ if (xfr == null || body == null)
{
ssn.connectionClose(503, "no method segment", 0, 0);
- // XXX: close at our end
+ ssn.close();
return;
}
- frames.add(frame);
+ body.add(data);
- if (frame.isLastSegment() && frame.isLastFrame())
+ if (data.isLast())
{
String dest = xfr.getDestination();
- Message m = new Message(headers, frames);
-
+ Message m = new Message(header, body);
+
if (exchange.route(dest,props.getRoutingKey(),m))
{
System.out.println("queued " + m);
@@ -143,12 +143,12 @@
}
else
{
-
+
reject(ssn);
}
ssn.processed(xfr);
xfr = null;
- frames = null;
+ body = null;
}
}
@@ -165,22 +165,22 @@
ssn.messageReject(ranges, 0, "no such destination");
}
}
-
+
private void transferMessageToPeer(Session ssn,String dest, Message m)
{
System.out.println("\n==================> Transfering message to: " +dest + "\n");
ssn.messageTransfer(dest, (short)0, (short)0);
- ssn.headers(m.headers);
- for (Frame f : m.frames)
+ ssn.header(m.header);
+ for (Data d : m.body)
{
- for (ByteBuffer b : f)
+ for (ByteBuffer b : d.getFragments())
{
ssn.data(b);
}
}
ssn.endData();
}
-
+
private void dispatchMessages(Session ssn)
{
for (String dest: consumers.keySet())
@@ -188,8 +188,8 @@
checkAndSendMessagesToConsumer(ssn,dest);
}
}
-
- private void checkAndSendMessagesToConsumer(Session ssn,String dest)
+
+ private void checkAndSendMessagesToConsumer(Session ssn,String dest)
{
Consumer c = consumers.get(dest);
LinkedBlockingQueue<Message> queue = exchange.getQueue(c._queueName);
@@ -204,33 +204,33 @@
class Message
{
- private final Struct[] headers;
- private final List<Frame> frames;
+ private final Header header;
+ private final List<Data> body;
- public Message(Struct[] headers, List<Frame> frames)
+ public Message(Header header, List<Data> body)
{
- this.headers = headers;
- this.frames = frames;
+ this.header = header;
+ this.body = body;
}
public String toString()
{
StringBuilder sb = new StringBuilder();
- if (headers != null)
+ if (header != null)
{
boolean first = true;
- for (Struct hdr : headers)
+ for (Struct st : header.getStructs())
{
if (first) { first = false; }
else { sb.append(" "); }
- sb.append(hdr);
+ sb.append(st);
}
}
- for (Frame f : frames)
+ for (Data d : body)
{
- for (ByteBuffer b : f)
+ for (ByteBuffer b : d.getFragments())
{
sb.append(" | ");
sb.append(str(b));
@@ -241,7 +241,7 @@
}
}
-
+
// ugly, but who cares :)
// assumes unit is always no of messages, not bytes
// assumes it's credit mode and not window
@@ -253,7 +253,7 @@
public static final void main(String[] args) throws IOException
{
- final ToyExchange exchange = new ToyExchange();
+ final ToyExchange exchange = new ToyExchange();
ConnectionDelegate delegate = new ConnectionDelegate()
{
public SessionDelegate getSessionDelegate()
@@ -261,11 +261,11 @@
return new ToyBroker(exchange);
}
};
-
+
//hack
delegate.setUsername("guest");
delegate.setPassword("guest");
-
+
MinaHandler.accept("0.0.0.0", 5672, delegate);
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java Thu Sep 13 14:42:57 2007
@@ -20,6 +20,9 @@
*/
package org.apache.qpidity;
+import org.apache.qpidity.transport.*;
+import org.apache.qpidity.transport.network.mina.MinaHandler;
+
/**
* ToyClient
@@ -42,17 +45,17 @@
}
}
- public void headers(Session ssn, Struct ... headers)
+ @Override public void header(Session ssn, Header header)
{
- for (Struct hdr : headers)
+ for (Struct st : header.getStructs())
{
- System.out.println("header: " + hdr);
+ System.out.println("header: " + st);
}
}
- public void data(Session ssn, Frame frame)
+ @Override public void data(Session ssn, Data data)
{
- System.out.println("got data: " + frame);
+ System.out.println("got data: " + data);
}
public static final void main(String[] args)
@@ -65,7 +68,7 @@
return new ToyClient();
}
});
- conn.getOutputHandler().handle(conn.getHeader().toByteBuffer());
+ conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10)));
Channel ch = conn.getChannel(0);
Session ssn = new Session();
@@ -76,8 +79,8 @@
ssn.sync();
ssn.messageTransfer("asdf", (short) 0, (short) 1);
- ssn.headers(new DeliveryProperties(),
- new MessageProperties());
+ ssn.header(new DeliveryProperties(),
+ new MessageProperties());
ssn.data("this is the data");
ssn.endData();
@@ -88,6 +91,8 @@
Future<QueueQueryResult> future = ssn.queueQuery("asdf");
System.out.println(future.get().getQueue());
+ ssn.close();
+ conn.close();
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java Thu Sep 13 14:42:57 2007
@@ -3,8 +3,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.qpidity.MessageProperties;
-import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -14,9 +14,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,13 +30,13 @@
public MessageProperties getMessageProperties();
public DeliveryProperties getDeliveryProperties();
-
+
/**
* This will abstract the underlying message data.
* The Message implementation may not hold all message
* data in memory (especially in the case of large messages)
- *
- * The appendData function might write data to
+ *
+ * The appendData function might write data to
* <ul>
* <li> Memory (Ex: ByteBuffer)
* <li> To Disk
@@ -50,54 +50,54 @@
* This will abstract the underlying message data.
* The Message implementation may not hold all message
* data in memory (especially in the case of large messages)
- *
- * The appendData function might write data to
+ *
+ * The appendData function might write data to
* <ul>
* <li> Memory (Ex: ByteBuffer)
* <li> To Disk
* <li> To Socket (Stream)
* </ul>
* @param src - the data to append
- */
+ */
public void appendData(ByteBuffer src) throws IOException;
-
+
/**
* This will abstract the underlying message data.
* The Message implementation may not hold all message
* data in memory (especially in the case of large messages)
- *
+ *
* The read function might copy data from
* <ul>
* <li> From memory (Ex: ByteBuffer)
* <li> From Disk
* <li> From Socket as and when it gets streamed
* </ul>
- * @param target The target byte[] which the data gets copied to
+ * @param target The target byte[] which the data gets copied to
*/
- public void readData(byte[] target) throws IOException;
-
+ public void readData(byte[] target) throws IOException;
+
/**
* This will abstract the underlying message data.
* The Message implementation may not hold all message
* data in memory (especially in the case of large messages)
- *
+ *
* The read function might copy data from
* <ul>
* <li> From memory (Ex: ByteBuffer)
* <li> From Disk
* <li> From Socket as and when it gets streamed
* </ul>
- *
+ *
* @return A ByteBuffer containing data
* @throws IOException
*/
- public ByteBuffer readData() throws IOException;
-
+ public ByteBuffer readData() throws IOException;
+
/**
* This should clear the body of the message.
*/
public void clearData();
-
+
/**
* This provides access to the command Id assigned to the
* message transfer.
@@ -107,15 +107,14 @@
* you could use this id to acquire it.
* <li>For releasing a message. You can use this id to release an acquired
* message
- * <li>For Acknowledging a message - You need to pass this ID, in order to
+ * <li>For Acknowledging a message - You need to pass this ID, in order to
* acknowledge the message
* <li>For Rejecting a message - You need to pass this ID, in order to reject
- * the message.
+ * the message.
* </ul>
- *
+ *
* @return the message transfer id.
*/
public long getMessageTransferId();
-
-}
+}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java Thu Sep 13 14:42:57 2007
@@ -23,10 +23,10 @@
import java.util.Map;
import java.util.UUID;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Struct;
-import static org.apache.qpidity.Functions.*;
+import static org.apache.qpidity.transport.util.Functions.*;
/**
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java Thu Sep 13 14:42:57 2007
@@ -25,11 +25,11 @@
import java.util.Map;
import java.util.UUID;
-import org.apache.qpidity.Range;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Range;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Struct;
-import static org.apache.qpidity.Functions.*;
+import static org.apache.qpidity.transport.util.Functions.*;
/**
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java Thu Sep 13 14:42:57 2007
@@ -23,8 +23,8 @@
import java.util.Map;
import java.util.UUID;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Struct;
/**
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java Thu Sep 13 14:42:57 2007
@@ -23,8 +23,8 @@
import java.util.Map;
import java.util.UUID;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Struct;
/**
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/AbstractDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/AbstractDelegate.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/AbstractDelegate.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/AbstractDelegate.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * AbstractDelegate
+ *
+ */
+
+class AbstractDelegate<C>
+{
+
+ public void init(C context, ProtocolHeader header) {}
+
+ public void error(C context, ProtocolError error) {}
+
+ public void header(C context, Header header) {}
+
+ public void data(C context, Data data) {}
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/AbstractDelegate.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java Thu Sep 13 14:42:57 2007
@@ -18,18 +18,15 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.ArrayList;
-import org.apache.qpidity.codec.SegmentEncoder;
-import org.apache.qpidity.codec.SizeEncoder;
-
-import static org.apache.qpidity.Frame.*;
-import static org.apache.qpidity.Functions.*;
+import static org.apache.qpidity.transport.network.Frame.*;
+import static org.apache.qpidity.transport.util.Functions.*;
/**
@@ -38,20 +35,18 @@
* @author Rafael H. Schloming
*/
-public class Channel extends Invoker implements Handler<Frame>
+public class Channel extends Invoker implements Receiver<ProtocolEvent>
{
final private Connection connection;
final private int channel;
- final private TrackSwitch<Channel> tracks;
final private Delegate<Channel> delegate;
final private SessionDelegate sessionDelegate;
// session may be null
private Session session;
- private Method method = null;
- private List<ByteBuffer> data = null;
- private int dataSize;
+ private boolean first = true;
+ private ByteBuffer data = null;
public Channel(Connection connection, int channel, SessionDelegate delegate)
{
@@ -59,27 +54,57 @@
this.channel = channel;
this.delegate = new ChannelDelegate();
this.sessionDelegate = delegate;
+ }
+
+ public Connection getConnection()
+ {
+ return connection;
+ }
- tracks = new TrackSwitch<Channel>();
- tracks.map(L1, new MethodHandler<Channel>
- (getMajor(), getMinor(), connection.getConnectionDelegate()));
- tracks.map(L2, new MethodHandler<Channel>
- (getMajor(), getMinor(), this.delegate));
- tracks.map(L3, new SessionResolver<Frame>
- (new MethodHandler<Session>
- (getMajor(), getMinor(), delegate)));
- tracks.map(L4, new SessionResolver<Frame>
- (new ContentHandler(getMajor(), getMinor(), delegate)));
+ public void received(ProtocolEvent event)
+ {
+ switch (event.getEncodedTrack())
+ {
+ case L1:
+ event.delegate(this, connection.getConnectionDelegate());
+ break;
+ case L2:
+ event.delegate(this, delegate);
+ break;
+ case L3:
+ event.delegate(session, sessionDelegate);
+ break;
+ case L4:
+ // XXX
+ if (event instanceof Method)
+ {
+ Method method = (Method) event;
+ method.setId(session.nextCommandId());
+ method.delegate(session, sessionDelegate);
+ if (!method.hasPayload())
+ {
+ session.processed(method);
+ }
+ }
+ else
+ {
+ event.delegate(session, sessionDelegate);
+ }
+ break;
+ default:
+ throw new IllegalStateException
+ ("unknown track: " + event.getEncodedTrack());
+ }
}
- public byte getMajor()
+ public void closed()
{
- return connection.getMajor();
+ System.out.println("channel closed: " + this);
}
- public byte getMinor()
+ public void close()
{
- return connection.getMinor();
+ connection.removeChannel(channel);
}
public int getEncodedChannel() {
@@ -96,90 +121,35 @@
this.session = session;
}
- public void handle(Frame frame)
+ private void emit(ProtocolEvent event)
{
- tracks.handle(new Event<Channel,Frame>(this, frame));
- }
-
- private SegmentEncoder newEncoder(byte flags, byte track, byte type, int size)
- {
- return new SegmentEncoder(getMajor(),
- getMinor(),
- connection.getOutputHandler(),
- connection.getMaxFrame(),
- (byte) (flags | VERSION),
- track,
- type,
- channel,
- size);
+ connection.send(new ConnectionEvent(channel, event));
}
public void method(Method m)
{
- SizeEncoder sizer = new SizeEncoder(getMajor(), getMinor());
- sizer.writeLong(m.getEncodedType());
- m.write(sizer, getMajor(), getMinor());
- sizer.flush();
- int size = sizer.getSize();
-
- byte flags = FIRST_SEG;
-
- if (!m.hasPayload())
- {
- flags |= LAST_SEG;
- }
+ emit(m);
- SegmentEncoder enc = newEncoder(flags, m.getEncodedTrack(),
- m.getSegmentType(), size);
- enc.writeLong(m.getEncodedType());
- m.write(enc, getMajor(), getMinor());
- enc.flush();
-
- if (m.hasPayload())
- {
- method = m;
- }
-
- if (m.getEncodedTrack() != Frame.L4)
+ if (m.getEncodedTrack() != L4)
{
System.out.println("sent control " + m.getClass().getName());
}
}
- public void headers(Struct ... headers)
+ public void header(Header header)
{
- if (method == null)
- {
- throw new IllegalStateException("cannot write headers without method");
- }
-
- SizeEncoder sizer = new SizeEncoder(getMajor(), getMinor());
- for (Struct hdr : headers)
- {
- sizer.writeLongStruct(hdr);
- }
-
- SegmentEncoder enc = newEncoder((byte) 0x0,
- method.getEncodedTrack(),
- HEADER,
- sizer.getSize());
- for (Struct hdr : headers)
- {
- enc.writeLongStruct(hdr);
- enc.flush();
- System.out.println("sent " + hdr);
- }
+ emit(header);
}
public void data(ByteBuffer buf)
{
- if (data == null)
+ if (data != null)
{
- data = new ArrayList<ByteBuffer>();
- dataSize = 0;
+ emit(new Data(data, first, false));
+ first = false;
}
- data.add(buf);
- dataSize += buf.remaining();
+
+ data = buf;
}
public void data(String str)
@@ -194,17 +164,9 @@
public void end()
{
- byte flags = LAST_SEG;
- SegmentEncoder enc = newEncoder(flags, method.getEncodedTrack(),
- BODY, dataSize);
- for (ByteBuffer buf : data)
- {
- enc.put(buf);
- System.out.println("sent " + str(buf));
- }
- enc.flush();
+ emit(new Data(data, first, true));
+ first = true;
data = null;
- dataSize = 0;
}
protected void invoke(Method m)
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
import java.util.UUID;