You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/05 21:33:12 UTC
svn commit: r682887 [1/2] - in /incubator/qpid/trunk/qpid/java:
client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/
client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/
client/example/src/main/java/org/apache/qp...
Author: rhs
Date: Tue Aug 5 12:33:11 2008
New Revision: 682887
URL: http://svn.apache.org/viewvc?rev=682887&view=rev
Log:
Profiling driven changes:
- made AMQShortString cache the toString() value
- added static initializer to IoTransport to disable use of pooled
byte buffers
- modified IoSender to permit buffering
- removed OutputHandler and eliminated intermediate Frame generation
between Disassembler and Sender<ByteBuffer> (IoSender)
- made Disassembler take advantage of IoSender's buffering
- removed Header and Data as distinct protocol events, added Header
and Body members to MessageTransfer
- modified Assembler and Disassembler to decode/encode Header and
Data directly to/from MessageTransfer
- modified Disassembler to only write data if encoding of headers is
successful
- added Strings.toUTF8(String) -> byte[] to do proper UTF-8 encoding
that is also fast for 7-bit ascii
- modified JMSTextMessage to use the Strings.toUTF8
- modified QpidBench to only generate 7-bit ascii when using
TextMessage
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java (with props)
Removed:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Data.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutputHandler.java
Modified:
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
incubator/qpid/trunk/qpid/java/common/Composite.tpl
incubator/qpid/trunk/qpid/java/common/Invoker.tpl
incubator/qpid/trunk/qpid/java/common/genutil.py
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
incubator/qpid/trunk/qpid/java/tools/bin/qpid-bench
incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
Modified: incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java Tue Aug 5 12:33:11 2008
@@ -8,6 +8,7 @@
import org.apache.qpid.nclient.Session;
import org.apache.qpid.nclient.util.MessageListener;
import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
@@ -67,16 +68,14 @@
for (int i=0; i<10; i++)
{
- session.messageTransfer("amq.direct", MessageAcceptMode.EXPLICIT,MessageAcquireMode.PRE_ACQUIRED);
- session.header(deliveryProps);
- session.data("Message " + i);
- session.endData();
+ session.messageTransfer("amq.direct", MessageAcceptMode.EXPLICIT,MessageAcquireMode.PRE_ACQUIRED,
+ new Header(deliveryProps),
+ "Message " + i);
}
- session.messageTransfer("amq.direct", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED);
- session.header(deliveryProps);
- session.data("That's all, folks!");
- session.endData();
+ session.messageTransfer("amq.direct", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(deliveryProps),
+ "That's all, folks!");
// confirm completion
session.sync();
Modified: incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java Tue Aug 5 12:33:11 2008
@@ -4,6 +4,7 @@
import org.apache.qpid.nclient.Connection;
import org.apache.qpid.nclient.Session;
import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
@@ -34,16 +35,13 @@
for (int i=0; i<10; i++)
{
- session.messageTransfer("amq.fanout", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED);
- session.header(deliveryProps);
- session.data("Message " + i);
- session.endData();
+ session.messageTransfer("amq.fanout", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(deliveryProps), "Message " + i);
}
- session.messageTransfer("amq.fanout", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED);
- session.header(deliveryProps);
- session.data("That's all, folks!");
- session.endData();
+ session.messageTransfer("amq.fanout", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(deliveryProps),
+ "That's all, folks!");
// confirm completion
session.sync();
Modified: incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java (original)
+++ incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java Tue Aug 5 12:33:11 2008
@@ -4,6 +4,7 @@
import org.apache.qpid.nclient.Connection;
import org.apache.qpid.nclient.Session;
import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
@@ -18,20 +19,17 @@
deliveryProps.setRoutingKey(routing_key);
for (int i=0; i<5; i++) {
- session.messageTransfer("amq.topic", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED);
- session.header(deliveryProps);
- session.data("Message " + i);
- session.endData();
+ session.messageTransfer("amq.topic", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(deliveryProps), "Message " + i);
}
}
public void noMoreMessages(Session session)
{
- session.messageTransfer("amq.topic", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED);
- session.header(new DeliveryProperties().setRoutingKey("control"));
- session.data("That's all, folks!");
- session.endData();
+ session.messageTransfer("amq.topic", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties().setRoutingKey("control")),
+ "That's all, folks!");
}
public static void main(String[] args)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Tue Aug 5 12:33:11 2008
@@ -31,6 +31,7 @@
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.util.Strings;
public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage
{
@@ -111,20 +112,17 @@
try
{
if (text != null)
- {
- _data = ByteBuffer.allocate(text.length());
- _data.limit(text.length()) ;
- //_data.sweep();
- _data.setAutoExpand(true);
+ {
final String encoding = getContentHeaderProperties().getEncodingAsString();
- if (encoding == null)
+ if (encoding == null || encoding.equalsIgnoreCase("UTF-8"))
{
- _data.put(text.getBytes(DEFAULT_CHARSET.name()));
+ _data = ByteBuffer.wrap(Strings.toUTF8(text));
}
else
{
- _data.put(text.getBytes(encoding));
+ _data = ByteBuffer.wrap(text.getBytes(encoding));
}
+ _data.position(_data.limit());
_changedData=true;
}
_decodedValue = text;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java Tue Aug 5 12:33:11 2008
@@ -109,42 +109,6 @@
/**
- * <p>This transfer streams a complete message using a single method.
- * It uses pull-semantics instead of doing a push.</p>
- * <p>Data is pulled from a Message object using read()
- * and pushed using messageTransfer() and headers() followed by data() and endData().
- * <br><b><i>This method should only be used by large messages</b></i><br>
- * There are two convenience Message classes to do this.
- * <ul>
- * <li> <code>{@link org.apache.qpid.nclient.util.FileMessage}</code>
- * <li> <code>{@link org.apache.qpid.nclient.util.StreamingMessage}</code>
- * </ul>
- * You can also implement a <code>Message</code> interface to wrap any
- * data stream.
- * </p>
- *
- * @param destination The exchange the message is being sent to.
- * @param msg The Message to be sent.
- * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation
- * is not required. Once a message has been transferred in pre-acquire
- * mode (or once acquire has been sent in no-acquire mode) the message is considered
- * transferred.
- * <p/>
- * <li> on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message
- * is not considered transferred until the original
- * transfer is complete. A complete transfer is signaled by execution.complete.
- * </ul>
- * @param acquireMode <ul>
- * <li> no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message
- * must be explicitly acquired.
- * <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message
- * is acquired when the transfer starts.
- * </ul>
- * @throws java.io.IOException If transferring a message fails due to some internal communication error, an exception is thrown.
- */
- public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException;
-
- /**
* This command transfers a message between two peers.
*
* @param destination Specifies the destination to which the message is to be transferred.
@@ -154,46 +118,31 @@
* @param acquireMode Indicates whether or not the transferred message has been acquired.
*/
public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
- Option ... options);
-
- /**
- * Make a set of headers to be sent together with a message
- *
- * @param headers headers to be added
- * @see org.apache.qpid.transport.DeliveryProperties
- * @see org.apache.qpid.transport.MessageProperties
- * @return The added headers.
- */
- public Header header(Struct... headers);
+ Header header, ByteBuffer body, Option ... options);
/**
- * Add a byte array to the content of the message being sent.
- *
- * @param data Data to be added.
- */
- public void data(byte[] data);
-
- /**
- * A Add a ByteBuffer to the content of the message being sent.
- * <p> Note that only the data between the buffer's current position and the
- * buffer limit is added.
- * It is therefore recommended to flip the buffer before adding it to the message,
+ * This command transfers a message between two peers.
*
- * @param buf Data to be added.
+ * @param destination Specifies the destination to which the message is to be transferred.
+ * @param acceptMode Indicates whether message.accept, session.complete,
+ * or nothing at all is required to indicate successful transfer of the message.
+ *
+ * @param acquireMode Indicates whether or not the transferred message has been acquired.
*/
- public void data(ByteBuffer buf);
+ public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
+ Header header, byte[] body, Option ... options);
/**
- * Add a string to the content of the message being sent.
+ * This command transfers a message between two peers.
*
- * @param str String to be added.
- */
- public void data(String str);
-
- /**
- * Signals the end of data for the message.
+ * @param destination Specifies the destination to which the message is to be transferred.
+ * @param acceptMode Indicates whether message.accept, session.complete,
+ * or nothing at all is required to indicate successful transfer of the message.
+ *
+ * @param acquireMode Indicates whether or not the transferred message has been acquired.
*/
- public void endData();
+ public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
+ Header header, String body, Option ... options);
//------------------------------------------------------
// Messaging methods
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java Tue Aug 5 12:33:11 2008
@@ -11,8 +11,11 @@
import org.apache.qpid.api.Message;
import org.apache.qpid.nclient.ClosedListener;
import org.apache.qpid.nclient.MessagePartListener;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
@@ -85,24 +88,29 @@
public void messageTransfer(String destination, Message msg, short acceptMode, short acquireMode) throws IOException
{
- // The javadoc clearly says that this method is suitable for small messages
- // therefore reading the content in one shot.
- ByteBuffer data = msg.readData();
- super.messageTransfer(destination, MessageAcceptMode.get(acceptMode),
- MessageAcquireMode.get(acquireMode));
- // super.header(msg.getDeliveryProperties(),msg.getMessageProperties() );
- if( msg.getHeader() == null || msg.getDeliveryProperties().isDirty() || msg.getMessageProperties().isDirty() )
- {
- msg.setHeader( super.header(msg.getDeliveryProperties(),msg.getMessageProperties()) );
- msg.getDeliveryProperties().setDirty(false);
- msg.getMessageProperties().setDirty(false);
+ DeliveryProperties dp = msg.getDeliveryProperties();
+ MessageProperties mp = msg.getMessageProperties();
+ Header header;
+ if (msg.getHeader() == null || dp.isDirty() || mp.isDirty())
+ {
+ header = new Header(dp, mp);
+ msg.setHeader(header);
+ dp.setDirty(false);
+ mp.setDirty(false);
}
else
{
- super.header(msg.getHeader());
+ header = msg.getHeader();
}
- data( data );
- endData();
+ // The javadoc clearly says that this method is suitable for small messages
+ // therefore reading the content in one shot.
+ ByteBuffer body = msg.readData();
+ int size = body.remaining();
+ super.messageTransfer
+ (destination, MessageAcceptMode.get(acceptMode),
+ MessageAcquireMode.get(acquireMode), header, body);
+ _currentDataSizeNotSynced += size;
+ _currentDataSizeNotFlushed += size;
}
public void sync()
@@ -111,65 +119,6 @@
_currentDataSizeNotSynced = 0;
}
- /* -------------------------
- * Data methods
- * ------------------------*/
-
- public void data(ByteBuffer buf)
- {
- _currentDataSizeNotSynced = _currentDataSizeNotSynced + buf.remaining();
- _currentDataSizeNotFlushed = _currentDataSizeNotFlushed + buf.remaining();
- super.data(buf);
- }
-
- public void data(String str)
- {
- _currentDataSizeNotSynced = _currentDataSizeNotSynced + str.getBytes().length;
- super.data(str);
- }
-
- public void data(byte[] bytes)
- {
- _currentDataSizeNotSynced = _currentDataSizeNotSynced + bytes.length;
- super.data(bytes);
- }
-
- public void messageStream(String destination, Message msg, short acceptMode, short acquireMode) throws IOException
- {
- super.messageTransfer(destination, MessageAcceptMode.get(acceptMode),
- MessageAcquireMode.get(acquireMode));
- super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
- boolean b = true;
- int count = 0;
- while(b)
- {
- try
- {
- System.out.println("count : " + count++);
- data(msg.readData());
- }
- catch(EOFException e)
- {
- b = false;
- }
- }
- endData();
- }
-
- public void endData()
- {
- super.endData();
- /* if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= MAX_NOT_SYNC_DATA_LENGH)
- {
- sync();
- }
- if( MAX_NOT_FLUSH_DATA_LENGH != -1 && _currentDataSizeNotFlushed >= MAX_NOT_FLUSH_DATA_LENGH)
- {
- executionFlush();
- _currentDataSizeNotFlushed = 0;
- }*/
- }
-
public RangeSet getRejectedMessages()
{
return _rejectedMessages;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java Tue Aug 5 12:33:11 2008
@@ -7,7 +7,6 @@
import org.apache.qpid.nclient.MessagePartListener;
import org.apache.qpid.QpidException;
-import org.apache.qpid.transport.Data;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageReject;
import org.apache.qpid.transport.MessageTransfer;
@@ -18,46 +17,27 @@
public class ClientSessionDelegate extends SessionDelegate
-{
- private MessageTransfer _currentTransfer;
- private MessagePartListener _currentMessageListener;
-
- @Override public void sessionDetached(Session ssn, SessionDetached dtc)
- {
- ((ClientSession)ssn).notifyException(new QpidException("", ErrorCode.get(dtc.getCode().getValue()),null));
- }
-
+{
+
// --------------------------------------------
// Message methods
// --------------------------------------------
- @Override public void data(Session ssn, Data data)
+ @Override public void messageTransfer(Session session, MessageTransfer xfr)
{
- _currentMessageListener.data(data.getData());
- if (data.isLast())
+ MessagePartListener listener = ((ClientSession)session).getMessageListeners()
+ .get(xfr.getDestination());
+ listener.messageTransfer(xfr.getId());
+ listener.messageHeader(xfr.getHeader());
+ ByteBuffer body = xfr.getBody();
+ if (body == null)
{
- _currentMessageListener.messageReceived();
+ body = ByteBuffer.allocate(0);
}
+ listener.data(body);
+ listener.messageReceived();
}
- @Override public void header(Session ssn, Header header)
- {
- _currentMessageListener.messageHeader(header);
- if( header.hasNoPayload())
- {
- _currentMessageListener.data(ByteBuffer.allocate(0));
- _currentMessageListener.messageReceived();
- }
- }
-
-
- @Override public void messageTransfer(Session session, MessageTransfer currentTransfer)
- {
- _currentTransfer = currentTransfer;
- _currentMessageListener = ((ClientSession)session).getMessageListeners().get(currentTransfer.getDestination());
- _currentMessageListener.messageTransfer(currentTransfer.getId());
- }
-
- @Override public void messageReject(Session session, MessageReject struct)
+ @Override public void messageReject(Session session, MessageReject struct)
{
for (Range range : struct.getTransfers())
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java Tue Aug 5 12:33:11 2008
@@ -9,10 +9,12 @@
import org.apache.qpid.nclient.util.MessageListener;
import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageProperties;
+import java.nio.ByteBuffer;
import java.util.UUID;
public class DemoClient
@@ -56,17 +58,15 @@
ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
// queue
- ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
- ssn.header(new DeliveryProperties().setRoutingKey("queue1"),
- new MessageProperties().setMessageId(UUID.randomUUID()));
- ssn.data("this is the data");
- ssn.endData();
+ ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties().setRoutingKey("queue1"),
+ new MessageProperties().setMessageId(UUID.randomUUID())),
+ ByteBuffer.wrap("this is the data".getBytes()));
//reject
- ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
- ssn.data("this should be rejected");
- ssn.header(new DeliveryProperties().setRoutingKey("stocks"));
- ssn.endData();
+ ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties().setRoutingKey("stocks")),
+ ByteBuffer.wrap("this should be rejected".getBytes()));
ssn.sync();
// topic subs
@@ -84,11 +84,10 @@
ssn.sync();
// topic
- ssn.messageTransfer("amq.topic", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
- ssn.data("Topic message");
- ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),
- new MessageProperties().setMessageId(UUID.randomUUID()));
- ssn.endData();
+ ssn.messageTransfer("amq.topic", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),
+ new MessageProperties().setMessageId(UUID.randomUUID())),
+ ByteBuffer.wrap("Topic message".getBytes()));
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java Tue Aug 5 12:33:11 2008
@@ -1,5 +1,6 @@
package org.apache.qpid.nclient.interop;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
@@ -13,6 +14,7 @@
import org.apache.qpid.nclient.util.MessageListener;
import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
@@ -77,18 +79,15 @@
public void testSendMessage(){
System.out.println("------- Sending a message --------");
- session.messageTransfer("test", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
-
Map<String,Object> props = new HashMap<String,Object>();
props.put("name", "rajith");
props.put("age", 10);
props.put("spf", 8.5);
- session.header(new DeliveryProperties().setRoutingKey("testKey"),new MessageProperties().setApplicationHeaders(props));
-
- //session.header(new DeliveryProperties().setRoutingKey("testKey"));
+ session.messageTransfer("test", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties().setRoutingKey("testKey"),
+ new MessageProperties().setApplicationHeaders(props)),
+ ByteBuffer.wrap("TestMessage".getBytes()));
- session.data("TestMessage");
- session.endData();
session.sync();
System.out.println("------- Message sent --------");
}
Modified: incubator/qpid/trunk/qpid/java/common/Composite.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Composite.tpl?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Composite.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/Composite.tpl Tue Aug 5 12:33:11 2008
@@ -1,5 +1,6 @@
package org.apache.qpid.transport;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -9,7 +10,6 @@
import org.apache.qpid.transport.codec.Decoder;
import org.apache.qpid.transport.codec.Encodable;
import org.apache.qpid.transport.codec.Encoder;
-import org.apache.qpid.transport.codec.Validator;
import org.apache.qpid.transport.network.Frame;
@@ -18,11 +18,13 @@
cls = klass(type)["@name"]
+segments = type["segments"]
+
if type.name in ("control", "command"):
base = "Method"
size = 0
pack = 2
- if type["segments"]:
+ if segments:
payload = "true"
else:
payload = "false"
@@ -86,6 +88,10 @@
for f in fields:
if not f.empty:
out(" private $(f.type) $(f.name);\n")
+
+if segments:
+ out(" private Header header;\n")
+ out(" private ByteBuffer body;\n")
}
${
@@ -99,6 +105,10 @@
if f.option: continue
out(" $(f.set)($(f.name));\n")
+if segments:
+ out(" setHeader(header);\n")
+ out(" setBody(body);\n")
+
if options or base == "Method":
out("""
for (int i=0; i < _options.length; i++) {
@@ -154,7 +164,6 @@
}
public final $name $(f.set)($(f.type) value) {
- $(f.check)
${
if not f.empty:
out(" this.$(f.name) = value;")
@@ -173,6 +182,44 @@
""")
}
+${
+if segments:
+ out(""" public final Header getHeader() {
+ return this.header;
+ }
+
+ public final void setHeader(Header header) {
+ this.header = header;
+ }
+
+ public final $name header(Header header) {
+ setHeader(header);
+ return this;
+ }
+
+ public final ByteBuffer getBody() {
+ if (this.body == null)
+ {
+ return null;
+ }
+ else
+ {
+ return this.body.slice();
+ }
+ }
+
+ public final void setBody(ByteBuffer body) {
+ this.body = body;
+ }
+
+ public final $name body(ByteBuffer body)
+ {
+ setBody(body);
+ return this;
+ }
+""")
+}
+
public void write(Encoder enc)
{
${
Modified: incubator/qpid/trunk/qpid/java/common/Invoker.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Invoker.tpl?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Invoker.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/Invoker.tpl Tue Aug 5 12:33:11 2008
@@ -1,5 +1,6 @@
package org.apache.qpid.transport;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -32,9 +33,9 @@
jclass = ""
out("""
- public final $jresult $(dromedary(name))($(", ".join(params))) {
- $(jreturn)invoke(new $name($(", ".join(args)))$jclass);
- }
+ public final $jresult $(dromedary(name))($(", ".join(params))) {
+ $(jreturn)invoke(new $name($(", ".join(args)))$jclass);
+ }
""")
}
Modified: incubator/qpid/trunk/qpid/java/common/genutil.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/genutil.py?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/genutil.py (original)
+++ incubator/qpid/trunk/qpid/java/common/genutil.py Tue Aug 5 12:33:11 2008
@@ -170,18 +170,15 @@
if self.type_node.name == "struct":
self.read = "(%s) dec.readStruct(%s.TYPE)" % (tname, tname)
self.write = "enc.writeStruct(%s.TYPE, check(struct).%s)" % (tname, self.name)
- self.check = ""
self.coder = "Struct"
elif self.type_node.name == "domain":
self.coder = camel(0, self.prim_type["@name"])
self.read = "%s.get(dec.read%s())" % (tname, self.coder)
self.write = "enc.write%s(check(struct).%s.getValue())" % (self.coder, self.name)
- self.check = ""
else:
self.coder = camel(0, self.type_node["@name"])
self.read = "dec.read%s()" % self.coder
self.write = "enc.write%s(check(struct).%s)" % (self.coder, self.name)
- self.check = "Validator.check%s(value);" % self.coder
self.type = jtype(self.type_node)
self.default = DEFAULTS.get(self.type, "null")
self.has = camel(1, "has", self.name)
@@ -214,6 +211,9 @@
options = True
else:
params.append("%s %s" % (f.type, f.name))
+ if type["segments"]:
+ params.append("Header header")
+ params.append("ByteBuffer body")
if options or type.name in ("control", "command"):
params.append("Option ... _options")
return params
@@ -226,6 +226,9 @@
options = True
else:
args.append(f.name)
+ if type["segments"]:
+ args.append("header")
+ args.append("body")
if options or type.name in ("control", "command"):
args.append("_options")
return args
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java Tue Aug 5 12:33:11 2008
@@ -45,10 +45,6 @@
{
private ToyExchange exchange;
- private MessageTransfer xfr = null;
- private DeliveryProperties props = null;
- private Header header = null;
- private List<Data> body = null;
private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>();
public ToyBroker(ToyExchange exchange)
@@ -103,22 +99,10 @@
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
- this.xfr = xfr;
- body = new ArrayList<Data>();
- System.out.println("received transfer " + xfr.getDestination());
- }
-
- @Override public void header(Session ssn, Header header)
- {
- if (xfr == null || body == null)
- {
- ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR,
- "no method segment");
- ssn.close();
- return;
- }
-
- props = header.get(DeliveryProperties.class);
+ String dest = xfr.getDestination();
+ System.out.println("received transfer " + dest);
+ Header header = xfr.getHeader();
+ DeliveryProperties props = header.get(DeliveryProperties.class);
if (props != null)
{
System.out.println("received headers routing_key " + props.getRoutingKey());
@@ -130,67 +114,31 @@
System.out.println(mp.getApplicationHeaders());
}
- this.header = header;
- }
-
- @Override public void data(Session ssn, Data data)
- {
- if (xfr == null || body == null)
+ if (exchange.route(dest,props.getRoutingKey(),xfr))
{
- ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, "no method segment");
- ssn.close();
- return;
+ System.out.println("queued " + xfr);
+ dispatchMessages(ssn);
}
-
- body.add(data);
-
- if (data.isLast())
+ else
{
- String dest = xfr.getDestination();
- Message m = new Message(header, body);
- if (exchange.route(dest,props.getRoutingKey(),m))
+ if (props == null || !props.getDiscardUnroutable())
{
- System.out.println("queued " + m);
- dispatchMessages(ssn);
+ RangeSet ranges = new RangeSet();
+ ranges.add(xfr.getId());
+ ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
+ "no such destination");
}
- else
- {
-
- reject(ssn);
- }
- ssn.processed(xfr);
- xfr = null;
- body = null;
- }
- }
-
- private void reject(Session ssn)
- {
- if (props != null && props.getDiscardUnroutable())
- {
- return;
- }
- else
- {
- RangeSet ranges = new RangeSet();
- ranges.add(xfr.getId());
- ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
- "no such destination");
}
+ ssn.processed(xfr);
}
- private void transferMessageToPeer(Session ssn,String dest, Message m)
+ private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m)
{
System.out.println("\n==================> Transfering message to: " +dest + "\n");
- ssn.messageTransfer(dest, MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED);
- ssn.header(m.header);
- for (Data d : m.body)
- {
- ssn.data(d.getData());
- }
- ssn.endData();
+ ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED,
+ m.getHeader(), m.getBody());
}
private void dispatchMessages(Session ssn)
@@ -204,8 +152,8 @@
private void checkAndSendMessagesToConsumer(Session ssn,String dest)
{
Consumer c = consumers.get(dest);
- LinkedBlockingQueue<Message> queue = exchange.getQueue(c._queueName);
- Message m = queue.poll();
+ LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName);
+ MessageTransfer m = queue.poll();
while (m != null && c._credit>0)
{
transferMessageToPeer(ssn,dest,m);
@@ -214,43 +162,6 @@
}
}
- class Message
- {
- private final Header header;
- private final List<Data> body;
-
- public Message(Header header, List<Data> body)
- {
- this.header = header;
- this.body = body;
- }
-
- public String toString()
- {
- StringBuilder sb = new StringBuilder();
-
- if (header != null)
- {
- boolean first = true;
- for (Struct st : header.getStructs())
- {
- if (first) { first = false; }
- else { sb.append(" "); }
- sb.append(st);
- }
- }
-
- for (Data d : body)
- {
- sb.append(" | ");
- sb.append(d);
- }
-
- return sb.toString();
- }
-
- }
-
// ugly, but who cares :)
// assumes unit is always no of messages, not bytes
// assumes it's credit mode and not window
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java Tue Aug 5 12:33:11 2008
@@ -20,6 +20,7 @@
*/
package org.apache.qpid;
+import java.nio.*;
import java.util.*;
import org.apache.qpid.transport.*;
@@ -47,17 +48,9 @@
}
}
- @Override public void header(Session ssn, Header header)
+ @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
- for (Struct st : header.getStructs())
- {
- System.out.println("header: " + st);
- }
- }
-
- @Override public void data(Session ssn, Data data)
- {
- System.out.println("got data: " + data);
+ System.out.println("msg: " + xfr);
}
public static final void main(String[] args)
@@ -111,16 +104,16 @@
map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED);
- ssn.header(new DeliveryProperties(),
- new MessageProperties().setApplicationHeaders(map));
- ssn.data("this is the data");
- ssn.endData();
+ MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties(),
+ new MessageProperties()
+ .setApplicationHeaders(map)),
+ "this is the data");
ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED);
- ssn.data("this should be rejected");
- ssn.endData();
+ MessageAcquireMode.PRE_ACQUIRED,
+ null,
+ "this should be rejected");
ssn.sync();
Future<QueueQueryResult> future = ssn.queueQuery("asdf");
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java Tue Aug 5 12:33:11 2008
@@ -9,42 +9,43 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.qpid.ToyBroker.Message;
+import org.apache.qpid.transport.MessageTransfer;
+
public class ToyExchange
{
final static String DIRECT = "amq.direct";
final static String TOPIC = "amq.topic";
- private Map<String,List<LinkedBlockingQueue<Message>>> directEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>();
- private Map<String,List<LinkedBlockingQueue<Message>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>();
- private Map<String,LinkedBlockingQueue<Message>> queues = new HashMap<String,LinkedBlockingQueue<Message>>();
+ private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> directEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>();
+ private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>();
+ private Map<String,LinkedBlockingQueue<MessageTransfer>> queues = new HashMap<String,LinkedBlockingQueue<MessageTransfer>>();
public void createQueue(String name)
{
- queues.put(name, new LinkedBlockingQueue<Message>());
+ queues.put(name, new LinkedBlockingQueue<MessageTransfer>());
}
- public LinkedBlockingQueue<Message> getQueue(String name)
+ public LinkedBlockingQueue<MessageTransfer> getQueue(String name)
{
return queues.get(name);
}
public void bindQueue(String type,String binding,String queueName)
{
- LinkedBlockingQueue<Message> queue = queues.get(queueName);
+ LinkedBlockingQueue<MessageTransfer> queue = queues.get(queueName);
binding = normalizeKey(binding);
if(DIRECT.equals(type))
{
if (directEx.containsKey(binding))
{
- List<LinkedBlockingQueue<Message>> list = directEx.get(binding);
+ List<LinkedBlockingQueue<MessageTransfer>> list = directEx.get(binding);
list.add(queue);
}
else
{
- List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>();
+ List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
list.add(queue);
directEx.put(binding,list);
}
@@ -53,21 +54,21 @@
{
if (topicEx.containsKey(binding))
{
- List<LinkedBlockingQueue<Message>> list = topicEx.get(binding);
+ List<LinkedBlockingQueue<MessageTransfer>> list = topicEx.get(binding);
list.add(queue);
}
else
{
- List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>();
+ List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
list.add(queue);
topicEx.put(binding,list);
}
}
}
- public boolean route(String dest,String routingKey,Message msg)
+ public boolean route(String dest, String routingKey, MessageTransfer msg)
{
- List<LinkedBlockingQueue<Message>> queues;
+ List<LinkedBlockingQueue<MessageTransfer>> queues;
if(DIRECT.equals(dest))
{
queues = directEx.get(routingKey);
@@ -101,9 +102,9 @@
}
}
- private List<LinkedBlockingQueue<Message>> matchWildCard(String routingKey)
+ private List<LinkedBlockingQueue<MessageTransfer>> matchWildCard(String routingKey)
{
- List<LinkedBlockingQueue<Message>> selected = new ArrayList<LinkedBlockingQueue<Message>>();
+ List<LinkedBlockingQueue<MessageTransfer>> selected = new ArrayList<LinkedBlockingQueue<MessageTransfer>>();
for(String key: topicEx.keySet())
{
@@ -111,7 +112,7 @@
Matcher m = p.matcher(routingKey);
if (m.find())
{
- for(LinkedBlockingQueue<Message> queue : topicEx.get(key))
+ for(LinkedBlockingQueue<MessageTransfer> queue : topicEx.get(key))
{
selected.add(queue);
}
@@ -121,9 +122,9 @@
return selected;
}
- private void storeMessage(Message msg,List<LinkedBlockingQueue<Message>> selected)
+ private void storeMessage(MessageTransfer msg,List<LinkedBlockingQueue<MessageTransfer>> selected)
{
- for(LinkedBlockingQueue<Message> queue : selected)
+ for(LinkedBlockingQueue<MessageTransfer> queue : selected)
{
queue.offer(msg);
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Tue Aug 5 12:33:11 2008
@@ -418,9 +418,15 @@
return chars;
}
+ private String str = null;
+
public String asString()
{
- return new String(asChars());
+ if (str == null)
+ {
+ str = new String(asChars());
+ }
+ return str;
}
public boolean equals(Object o)
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java Tue Aug 5 12:33:11 2008
@@ -53,11 +53,6 @@
// session may be null
private Session session;
- private Lock commandLock = new ReentrantLock();
- private boolean first = true;
- private ByteBuffer data = null;
- private boolean batch = false;
-
public Channel(Connection connection, int channel, SessionDelegate delegate)
{
this.connection = connection;
@@ -105,16 +100,6 @@
method.delegate(session, sessionDelegate);
}
- public void header(Void v, Header header)
- {
- header.delegate(session, sessionDelegate);
- }
-
- public void data(Void v, Data data)
- {
- data.delegate(session, sessionDelegate);
- }
-
public void error(Void v, ProtocolError error)
{
throw new RuntimeException(error.getMessage());
@@ -157,62 +142,12 @@
public void method(Method m)
{
- if (m.getEncodedTrack() == Frame.L4)
- {
- commandLock.lock();
- }
-
emit(m);
- if (!m.isBatch() && !m.hasPayload())
- {
- connection.flush();
- }
-
- batch = m.isBatch();
-
- if (m.getEncodedTrack() == Frame.L4 && !m.hasPayload())
- {
- commandLock.unlock();
- }
- }
-
- public void header(Header header)
- {
- emit(header);
- }
-
- public void data(ByteBuffer buf)
- {
- if (data != null)
- {
- emit(new Data(data, first, false));
- first = false;
- }
-
- data = buf;
- }
-
- public void data(String str)
- {
- data(str.getBytes());
- }
-
- public void data(byte[] bytes)
- {
- data(ByteBuffer.wrap(bytes));
- }
-
- public void end()
- {
- emit(new Data(data, first, true));
- first = true;
- data = null;
- if (!batch)
+ if (!m.isBatch())
{
connection.flush();
}
- commandLock.unlock();
}
protected void invoke(Method m)
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java Tue Aug 5 12:33:11 2008
@@ -40,22 +40,6 @@
{
this.xfr = xfr;
ssn.invoke(xfr);
- }
-
- public void header(Session ssn, Header hdr)
- {
- ssn.header(hdr);
- }
-
- public void data(Session ssn, Data data)
- {
- ssn.data(data.getData());
- if (data.isLast())
- {
- ssn.endData();
- }
-
- // XXX: should be able to get command-id from any segment
ssn.processed(xfr);
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java Tue Aug 5 12:33:11 2008
@@ -22,6 +22,7 @@
import org.apache.qpid.transport.network.Frame;
+import java.util.Arrays;
import java.util.List;
import java.nio.ByteBuffer;
@@ -32,33 +33,25 @@
* @author Rafael H. Schloming
*/
-public class Header implements ProtocolEvent {
+public class Header {
private final List<Struct> structs;
- private ByteBuffer _buf;
- private boolean _noPayload;
- private int channel;
- public Header(List<Struct> structs, boolean lastframe)
+ public Header(List<Struct> structs)
{
this.structs = structs;
- _noPayload= lastframe;
}
- public List<Struct> getStructs()
+ public Header(Struct ... structs)
{
- return structs;
+ this(Arrays.asList(structs));
}
- public void setBuf(ByteBuffer buf)
+ public List<Struct> getStructs()
{
- _buf = buf;
+ return structs;
}
- public ByteBuffer getBuf()
- {
- return _buf;
- }
public <T> T get(Class<T> klass)
{
for (Struct st : structs)
@@ -72,36 +65,9 @@
return null;
}
- public final int getChannel()
- {
- return channel;
- }
-
- public final void setChannel(int channel)
- {
- this.channel = channel;
- }
-
- public byte getEncodedTrack()
- {
- return Frame.L4;
- }
-
- public <C> void delegate(C context, ProtocolDelegate<C> delegate)
- {
- delegate.header(context, this);
- }
-
- public boolean hasNoPayload()
- {
- return _noPayload;
- }
-
public String toString()
{
StringBuffer str = new StringBuffer();
- str.append("ch=");
- str.append(channel);
str.append(" Header(");
boolean first = true;
for (Struct s : structs)
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java Tue Aug 5 12:33:11 2008
@@ -22,6 +22,10 @@
import org.apache.qpid.transport.network.Frame;
+import java.nio.ByteBuffer;
+
+import static org.apache.qpid.transport.util.Functions.*;
+
/**
* Method
*
@@ -88,6 +92,26 @@
public abstract boolean hasPayload();
+ public Header getHeader()
+ {
+ return null;
+ }
+
+ public void setHeader(Header header)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public ByteBuffer getBody()
+ {
+ return null;
+ }
+
+ public void setBody(ByteBuffer body)
+ {
+ throw new UnsupportedOperationException();
+ }
+
public abstract byte getEncodedTrack();
public abstract <C> void dispatch(C context, MethodDelegate<C> delegate);
@@ -134,6 +158,21 @@
str.append(" ");
str.append(super.toString());
+ Header hdr = getHeader();
+ if (hdr != null)
+ {
+ for (Struct st : hdr.getStructs())
+ {
+ str.append("\n ");
+ str.append(st);
+ }
+ }
+ ByteBuffer body = getBody();
+ if (body != null)
+ {
+ str.append("\n body=");
+ str.append(str(body, 64));
+ }
return str.toString();
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java Tue Aug 5 12:33:11 2008
@@ -35,10 +35,6 @@
void command(C context, Method command);
- void header(C context, Header header);
-
- void data(C context, Data data);
-
void error(C context, ProtocolError error);
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Tue Aug 5 12:33:11 2008
@@ -36,6 +36,7 @@
import static org.apache.qpid.transport.Option.*;
import static org.apache.qpid.transport.util.Functions.*;
import static org.apache.qpid.util.Serial.*;
+import static org.apache.qpid.util.Strings.*;
/**
* Session
@@ -271,7 +272,7 @@
}
needSync = !m.isSync();
channel.method(m);
- if (autoSync && !m.hasPayload())
+ if (autoSync)
{
sync();
}
@@ -290,50 +291,6 @@
}
}
- public void header(Header header)
- {
- channel.header(header);
- }
-
- public Header header(List<Struct> structs)
- {
- Header res = new Header(structs, false);
- header(res);
- return res;
- }
-
- public Header header(Struct ... structs)
- {
- return header(Arrays.asList(structs));
- }
-
- public void data(ByteBuffer buf)
- {
- channel.data(buf);
- }
-
- public void data(String str)
- {
- channel.data(str);
- }
-
- public void data(byte[] bytes)
- {
- channel.data(bytes);
- }
-
- public void endData()
- {
- channel.end();
- synchronized (commands)
- {
- if (autoSync)
- {
- sync();
- }
- }
- }
-
public void sync()
{
sync(timeout);
@@ -501,6 +458,26 @@
}
+ public final void messageTransfer(String destination,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ Header header,
+ byte[] body,
+ Option ... _options) {
+ messageTransfer(destination, acceptMode, acquireMode, header,
+ ByteBuffer.wrap(body), _options);
+ }
+
+ public final void messageTransfer(String destination,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ Header header,
+ String body,
+ Option ... _options) {
+ messageTransfer(destination, acceptMode, acquireMode, header,
+ toUTF8(body), _options);
+ }
+
public void close()
{
sessionRequestTimeout(0);
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Tue Aug 5 12:33:11 2008
@@ -48,10 +48,6 @@
}
}
- public void header(Session ssn, Header header) { }
-
- public void data(Session ssn, Data data) { }
-
public void error(Session ssn, ProtocolError error) { }
@Override public void executionResult(Session ssn, ExecutionResult result)
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java Tue Aug 5 12:33:11 2008
@@ -35,24 +35,29 @@
{
private ByteBuffer out;
+ private int segment;
public BBEncoder(int capacity) {
out = ByteBuffer.allocate(capacity);
out.order(ByteOrder.BIG_ENDIAN);
+ segment = 0;
}
public void init()
{
out.clear();
+ segment = 0;
}
- public ByteBuffer done()
+ public ByteBuffer segment()
{
- out.flip();
- ByteBuffer encoded = ByteBuffer.allocate(out.remaining());
- encoded.put(out);
- encoded.flip();
- return encoded;
+ int pos = out.position();
+ out.position(segment);
+ ByteBuffer slice = out.slice();
+ slice.limit(pos - segment);
+ out.position(pos);
+ segment = pos;
+ return slice;
}
private void grow(int size)
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java Tue Aug 5 12:33:11 2008
@@ -154,7 +154,7 @@
public static final void checkMap(Map<String,Object> map)
{
- if (map == null)
+ if (map == null || map.isEmpty())
{
return;
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java Tue Aug 5 12:33:11 2008
@@ -30,7 +30,6 @@
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.transport.codec.Decoder;
-import org.apache.qpid.transport.Data;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolError;
@@ -51,6 +50,7 @@
private final Receiver<ProtocolEvent> receiver;
private final Map<Integer,List<Frame>> segments;
+ private final Method[] incomplete;
private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>()
{
public BBDecoder initialValue()
@@ -63,6 +63,7 @@
{
this.receiver = receiver;
segments = new HashMap<Integer,List<Frame>>();
+ incomplete = new Method[64*1024];
}
private int segmentKey(Frame frame)
@@ -97,11 +98,6 @@
receiver.received(event);
}
- private void emit(Frame frame, ProtocolEvent event)
- {
- emit(frame.getChannel(), event);
- }
-
public void received(NetworkEvent event)
{
event.delegate(this);
@@ -122,32 +118,18 @@
emit(0, header);
}
- public void frame(Frame frame)
- {
- switch (frame.getType())
- {
- case BODY:
- emit(frame, new Data(frame.getBody(), frame.isFirstFrame(),
- frame.isLastFrame()));
- break;
- default:
- assemble(frame);
- break;
- }
- }
-
public void error(ProtocolError error)
{
emit(0, error);
}
- private void assemble(Frame frame)
+ public void frame(Frame frame)
{
ByteBuffer segment;
if (frame.isFirstFrame() && frame.isLastFrame())
{
segment = frame.getBody();
- emit(frame, decode(frame, segment));
+ assemble(frame, segment);
}
else
{
@@ -179,38 +161,63 @@
segment.put(f.getBody());
}
segment.flip();
- emit(frame, decode(frame, segment));
+ assemble(frame, segment);
}
}
}
- private ProtocolEvent decode(Frame frame, ByteBuffer segment)
+ private void assemble(Frame frame, ByteBuffer segment)
{
BBDecoder dec = decoder.get();
dec.init(segment);
+ int channel = frame.getChannel();
+ Method command;
+
switch (frame.getType())
{
case CONTROL:
int controlType = dec.readUint16();
Method control = Method.create(controlType);
control.read(dec);
- return control;
+ emit(channel, control);
+ break;
case COMMAND:
int commandType = dec.readUint16();
// read in the session header, right now we don't use it
dec.readUint16();
- Method command = Method.create(commandType);
+ command = Method.create(commandType);
command.read(dec);
- return command;
+ if (command.hasPayload())
+ {
+ incomplete[channel] = command;
+ }
+ else
+ {
+ emit(channel, command);
+ }
+ break;
case HEADER:
+ command = incomplete[channel];
List<Struct> structs = new ArrayList();
while (dec.hasRemaining())
{
structs.add(dec.readStruct32());
}
- return new Header(structs, frame.isLastFrame() && frame.isLastSegment());
+ command.setHeader(new Header(structs));
+ if (frame.isLastSegment())
+ {
+ incomplete[channel] = null;
+ emit(channel, command);
+ }
+ break;
+ case BODY:
+ command = incomplete[channel];
+ command.setBody(segment);
+ incomplete[channel] = null;
+ emit(channel, command);
+ break;
default:
throw new IllegalStateException("unknown frame type: " + frame.getType());
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Tue Aug 5 12:33:11 2008
@@ -22,7 +22,6 @@
import org.apache.qpid.transport.codec.BBEncoder;
-import org.apache.qpid.transport.Data;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolDelegate;
@@ -34,7 +33,8 @@
import org.apache.qpid.transport.Struct;
import java.nio.ByteBuffer;
-import java.util.Iterator;
+import java.nio.ByteOrder;
+import java.util.List;
import static org.apache.qpid.transport.network.Frame.*;
@@ -46,12 +46,14 @@
*
*/
-public class Disassembler implements Sender<ProtocolEvent>,
- ProtocolDelegate<Void>
+public final class Disassembler implements Sender<ProtocolEvent>,
+ ProtocolDelegate<Void>
{
- private final Sender<NetworkEvent> sender;
+ private final Sender<ByteBuffer> sender;
private final int maxPayload;
+ private final ByteBuffer header;
+ private final Object sendlock = new Object();
private final ThreadLocal<BBEncoder> encoder = new ThreadLocal()
{
public BBEncoder initialValue()
@@ -60,7 +62,7 @@
}
};
- public Disassembler(Sender<NetworkEvent> sender, int maxFrame)
+ public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
{
if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
{
@@ -69,6 +71,8 @@
}
this.sender = sender;
this.maxPayload = maxFrame - HEADER_SIZE;
+ this.header = ByteBuffer.allocate(HEADER_SIZE);
+ this.header.order(ByteOrder.BIG_ENDIAN);
}
@@ -79,60 +83,80 @@
public void flush()
{
- sender.flush();
+ synchronized (sendlock)
+ {
+ sender.flush();
+ }
}
public void close()
{
- sender.close();
+ synchronized (sendlock)
+ {
+ sender.close();
+ }
+ }
+
+ private final void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
+ {
+ synchronized (sendlock)
+ {
+ header.put(0, flags);
+ header.put(1, type);
+ header.putShort(2, (short) (size + HEADER_SIZE));
+ header.put(5, track);
+ header.putShort(6, (short) channel);
+
+ header.rewind();
+
+ sender.send(header);
+
+ int limit = buf.limit();
+ buf.limit(buf.position() + size);
+ sender.send(buf);
+ buf.limit(limit);
+ }
}
private void fragment(byte flags, SegmentType type, ProtocolEvent event,
ByteBuffer buf, boolean first, boolean last)
{
+ byte typeb = (byte) type.getValue();
byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
- if(!buf.hasRemaining())
+ int remaining = buf.remaining();
+ while (true)
{
- //empty data
- byte nflags = flags;
+ int size = min(maxPayload, remaining);
+ remaining -= size;
+
+ byte newflags = flags;
if (first)
{
- nflags |= FIRST_FRAME;
+ newflags |= FIRST_FRAME;
first = false;
}
- nflags |= LAST_FRAME;
- Frame frame = new Frame(nflags, type, track, event.getChannel(), buf.slice());
- sender.send(frame);
- }
- else
- {
- while (buf.hasRemaining())
- {
- ByteBuffer slice = buf.slice();
- slice.limit(min(maxPayload, slice.remaining()));
- buf.position(buf.position() + slice.remaining());
-
- byte newflags = flags;
- if (first)
- {
- newflags |= FIRST_FRAME;
- first = false;
- }
- if (last && !buf.hasRemaining())
- {
- newflags |= LAST_FRAME;
- }
+ if (last && remaining == 0)
+ {
+ newflags |= LAST_FRAME;
+ }
+
+ frame(newflags, typeb, track, event.getChannel(), size, buf);
- Frame frame = new Frame(newflags, type, track, event.getChannel(), slice);
- sender.send(frame);
+ if (remaining == 0)
+ {
+ break;
}
}
}
public void init(Void v, ProtocolHeader header)
{
- sender.send(header);
+ synchronized (sendlock)
+ {
+ sender.send(header.toByteBuffer());
+ sender.flush();
+ }
}
public void control(Void v, Method method)
@@ -170,48 +194,43 @@
}
}
method.write(enc);
- ByteBuffer buf = enc.done();
+ ByteBuffer methodSeg = enc.segment();
byte flags = FIRST_SEG;
- if (!method.hasPayload())
+ boolean payload = method.hasPayload();
+ if (!payload)
{
flags |= LAST_SEG;
}
- fragment(flags, type, method, buf, true, true);
- }
-
- public void header(Void v, Header header)
- {
- ByteBuffer buf;
- if (header.getBuf() == null)
+ ByteBuffer headerSeg = null;
+ if (payload)
{
- BBEncoder enc = encoder.get();
- enc.init();
- for (Struct st : header.getStructs())
+ final Header hdr = method.getHeader();
+ final List<Struct> structs = hdr.getStructs();
+ final int nstructs = structs.size();
+ for (int i = 0; i < nstructs; i++)
{
- enc.writeStruct32(st);
+ enc.writeStruct32(structs.get(i));
}
- buf = enc.done();
- header.setBuf(buf);
+ headerSeg = enc.segment();
}
- else
+
+ synchronized (sendlock)
{
- buf = header.getBuf();
- buf.flip();
+ fragment(flags, type, method, methodSeg, true, true);
+ if (payload)
+ {
+ fragment((byte) 0x0, SegmentType.HEADER, method, headerSeg, true, true);
+ fragment(LAST_SEG, SegmentType.BODY, method, method.getBody(), true, true);
+ }
}
- fragment((byte) 0x0, SegmentType.HEADER, header, buf, true, true);
- }
-
- public void data(Void v, Data data)
- {
- fragment(LAST_SEG, SegmentType.BODY, data, data.getData(), data.isFirst(), data.isLast());
}
public void error(Void v, ProtocolError error)
{
- sender.send(error);
+ throw new IllegalArgumentException("" + error);
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Tue Aug 5 12:33:11 2008
@@ -23,7 +23,6 @@
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
@@ -48,8 +47,9 @@
private final OutputStream out;
private final byte[] buffer;
- private final AtomicInteger head = new AtomicInteger(START);
- private final AtomicInteger tail = new AtomicInteger(START);
+ private volatile int head = START;
+ private volatile int tail = START;
+ private volatile boolean idle = true;
private final Object notFull = new Object();
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -96,16 +96,17 @@
while (remaining > 0)
{
- final int hd = head.get();
- final int tl = tail.get();
+ final int hd = head;
+ final int tl = tail;
if (hd - tl >= size)
{
+ flush();
synchronized (notFull)
{
long start = System.currentTimeMillis();
long elapsed = 0;
- while (head.get() - tail.get() >= size && elapsed < timeout)
+ while (head - tail >= size && elapsed < timeout)
{
try
{
@@ -118,9 +119,9 @@
elapsed += System.currentTimeMillis() - start;
}
- if (head.get() - tail.get() >= size)
+ if (head - tail >= size)
{
- throw new TransportException(String.format("write timed out: %s, %s", head.get(), tail.get()));
+ throw new TransportException(String.format("write timed out: %s, %s", head, tail));
}
}
continue;
@@ -140,21 +141,20 @@
}
buf.get(buffer, hd_idx, length);
- head.getAndAdd(length);
- if (hd == tail.get())
- {
- synchronized (notEmpty)
- {
- notEmpty.notify();
- }
- }
+ head += length;
remaining -= length;
}
}
public void flush()
{
- // pass
+ if (idle)
+ {
+ synchronized (notEmpty)
+ {
+ notEmpty.notify();
+ }
+ }
}
public void close()
@@ -206,8 +206,8 @@
while (true)
{
- final int hd = head.get();
- final int tl = tail.get();
+ final int hd = head;
+ final int tl = tail;
if (hd == tl)
{
@@ -216,9 +216,11 @@
break;
}
+ idle = true;
+
synchronized (notEmpty)
{
- while (head.get() == tail.get() && !closed.get())
+ while (head == tail && !closed.get())
{
try
{
@@ -231,6 +233,8 @@
}
}
+ idle = false;
+
continue;
}
@@ -258,8 +262,8 @@
close(false);
break;
}
- tail.getAndAdd(length);
- if (head.get() - tl >= size)
+ tail += length;
+ if (head - tl >= size)
{
synchronized (notFull)
{
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java Tue Aug 5 12:33:11 2008
@@ -33,7 +33,6 @@
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.OutputHandler;
import org.apache.qpid.transport.util.Logger;
/**
@@ -48,6 +47,14 @@
public final class IoTransport
{
+ static
+ {
+ org.apache.mina.common.ByteBuffer.setAllocator
+ (new org.apache.mina.common.SimpleByteBufferAllocator());
+ org.apache.mina.common.ByteBuffer.setUseDirectBuffers
+ (Boolean.getBoolean("amqj.enableDirectBuffers"));
+ }
+
private static final Logger log = Logger.get(IoTransport.class);
private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024;
@@ -104,8 +111,7 @@
sender = new IoSender(this, 2*writeBufferSize, timeout);
Connection conn = new Connection
- (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
- delegate);
+ (new Disassembler(sender, 64*1024 - 1), delegate);
receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
2*readBufferSize, timeout);
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java Tue Aug 5 12:33:11 2008
@@ -44,7 +44,6 @@
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.OutputHandler;
import static org.apache.qpid.transport.util.Functions.*;
@@ -292,7 +291,7 @@
{
// XXX: hardcoded max-frame
return new Connection
- (new Disassembler(new OutputHandler(sender), MAX_FRAME_SIZE), delegate);
+ (new Disassembler(sender, MAX_FRAME_SIZE), delegate);
}
public Receiver<java.nio.ByteBuffer> receiver(Connection conn)
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java Tue Aug 5 12:33:11 2008
@@ -17,7 +17,6 @@
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.OutputHandler;
public class NioHandler implements Runnable
{
@@ -68,8 +67,7 @@
NioSender sender = new NioSender(_ch);
Connection con = new Connection
- (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
- delegate);
+ (new Disassembler(sender, 64*1024 - 1), delegate);
con.setConnectionId(_count.incrementAndGet());
_handlers.put(con.getConnectionId(),sender);