You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/07/09 15:26:56 UTC
svn commit: r675165 [1/2] - in /incubator/qpid/trunk/qpid/java:
client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpidity/nclient/ client/src/main/java/...
Author: rhs
Date: Wed Jul 9 06:26:54 2008
New Revision: 675165
URL: http://svn.apache.org/viewvc?rev=675165&view=rev
Log:
Primarily profiling driven changes:
- added batched writes of commands/controls issued on a session
- copy fragmented frames and segments rather than trying to decode
them piecemeal, removed FragmentDecoder
- added caching for str8 encode/decode
- compute sizes as we encode by going back and filling in the amount
of bytes written rather than computing it up front
- added SYNC option to commands
- renamed NO_OPTION argument to NONE
- added a timeout to Client.java
- removed use of UUID.fromString in BasicMessageProducer_0_10.java
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binary.java (with props)
Removed:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java
Modified:
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java
incubator/qpid/trunk/qpid/java/common/Composite.tpl
incubator/qpid/trunk/qpid/java/common/Invoker.tpl
incubator/qpid/trunk/qpid/java/common/Option.tpl
incubator/qpid/trunk/qpid/java/common/genutil.py
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java
Modified: incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java Wed Jul 9 06:26:54 2008
@@ -55,7 +55,7 @@
Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
new MessagePartListenerAdapter(this),
- null, Option.NO_OPTION);
+ null, Option.NONE);
// issue credits
// XXX: need to be able to set to null
session.messageFlow(queueName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Jul 9 06:26:54 2008
@@ -250,9 +250,9 @@
public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException
{
- getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NO_OPTION,
- autoDelete ? Option.AUTO_DELETE : Option.NO_OPTION,
- exclusive ? Option.EXCLUSIVE : Option.NO_OPTION);
+ getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NONE,
+ autoDelete ? Option.AUTO_DELETE : Option.NONE,
+ exclusive ? Option.EXCLUSIVE : Option.NONE);
// We need to sync so that we get notify of an error.
getQpidSession().sync();
getCurrentException();
@@ -387,7 +387,7 @@
getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED,
preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
- consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
+ consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
catch (JMSException e)
{
@@ -477,9 +477,9 @@
arguments.put("no-local", true);
}
getQpidSession().queueDeclare(res.toString(), null, arguments,
- amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NO_OPTION,
- amqd.isDurable() ? Option.DURABLE : Option.NO_OPTION,
- !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
+ amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+ amqd.isDurable() ? Option.DURABLE : Option.NONE,
+ !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
// passive --> false
// We need to sync so that we get notify of an error.
getQpidSession().sync();
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Wed Jul 9 06:26:54 2008
@@ -453,19 +453,21 @@
}
}
+ UUID messageId = null;
if (_disableMessageId)
{
message.setJMSMessageID(null);
}
else
{
+ messageId = UUID.randomUUID();
StringBuilder b = new StringBuilder(39);
b.append("ID:");
- b.append(UUID.randomUUID());
+ b.append(messageId);
message.setJMSMessageID(b.toString());
}
- sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
+ sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
if (message != origMessage)
{
@@ -484,8 +486,8 @@
}
abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
- int deliveryMode, int priority, long timeToLive, boolean mandatory,
- boolean immediate, boolean wait)throws JMSException;
+ UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
+ boolean immediate, boolean wait) throws JMSException;
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Wed Jul 9 06:26:54 2008
@@ -68,8 +68,8 @@
* Sends a message to a given destination
*/
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
- int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
- boolean wait) throws JMSException
+ UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
+ boolean immediate, boolean wait) throws JMSException
{
message.prepareForSending();
if (message.get010Message() == null)
@@ -84,7 +84,16 @@
DeliveryProperties deliveryProp = message.get010Message().getDeliveryProperties();
MessageProperties messageProps = message.get010Message().getMessageProperties();
- // set the delivery properties
+
+ if (messageId != null)
+ {
+ messageProps.setMessageId(messageId);
+ }
+ else if (messageProps.hasMessageId())
+ {
+ messageProps.clearMessageId();
+ }
+
if (!_disableTimestamps)
{
final long currentTime = System.currentTimeMillis();
@@ -142,13 +151,6 @@
messageProps.setContentType(contentHeaderProperties.getContentType().toString());
messageProps.setContentLength(message.getContentLength());
- // XXX: fixme
- String mid = message.getJMSMessageID();
- if( mid != null )
- {
- messageProps.setMessageId(UUID.fromString(mid.substring(3)));
- }
-
AMQShortString correlationID = contentHeaderProperties.getCorrelationId();
if (correlationID != null)
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Wed Jul 9 06:26:54 2008
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import java.util.UUID;
+
import javax.jms.JMSException;
import javax.jms.Message;
@@ -65,9 +67,9 @@
_protocolHandler.writeFrame(declare);
}
- void sendMessage(AMQDestination destination, Message origMessage,AbstractJMSMessage message,
- int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate,
- boolean wait) throws JMSException
+ void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
+ UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
+ boolean immediate, boolean wait) throws JMSException
{
BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
destination.getExchangeName(),
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java Wed Jul 9 06:26:54 2008
@@ -76,7 +76,7 @@
_logger.debug("commit tx branch with xid: ", xid);
}
Future<XaResult> future =
- _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NO_OPTION);
+ _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NONE);
// now wait on the future for the result
XaResult result = null;
@@ -129,8 +129,8 @@
}
Future<XaResult> future = _xaSession.getQpidSession()
.dtxEnd(convertXid(xid),
- flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
- flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
+ flag == XAResource.TMFAIL ? Option.FAIL : Option.NONE,
+ flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NONE);
// now wait on the future for the result
XaResult result = null;
try
@@ -400,8 +400,8 @@
}
Future<XaResult> future = _xaSession.getQpidSession()
.dtxStart(convertXid(xid),
- flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
- flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
+ flag == XAResource.TMJOIN ? Option.JOIN : Option.NONE,
+ flag == XAResource.TMRESUME ? Option.RESUME : Option.NONE);
// now wait on the future for the result
XaResult result = null;
try
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java Wed Jul 9 06:26:54 2008
@@ -25,7 +25,6 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.TimeUnit;
import org.apache.qpid.client.url.URLParser_0_10;
import org.apache.qpid.jms.BrokerDetails;
@@ -60,6 +59,7 @@
private static Logger _logger = LoggerFactory.getLogger(Client.class);
private Condition closeOk;
private boolean closed = false;
+ private long timeout = 60000;
/**
*
@@ -191,7 +191,7 @@
try
{
- negotiationComplete.await();
+ negotiationComplete.await(timeout, TimeUnit.MILLISECONDS);
if( connectionDelegate.getUnsupportedProtocol() != null )
{
_conn.close();
@@ -202,7 +202,7 @@
}
catch (InterruptedException e)
{
- //
+ throw new RuntimeException(e);
}
finally
{
@@ -257,7 +257,6 @@
{
try
{
- long timeout = 60000;
long start = System.currentTimeMillis();
long elapsed = 0;
while (!closed && elapsed < timeout)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java Wed Jul 9 06:26:54 2008
@@ -83,7 +83,7 @@
*
* @param xid Specifies the xid of the transaction branch to be forgotten.
*/
- public void dtxForget(Xid xid);
+ public void dtxForget(Xid xid, Option ... options);
/**
* This method obtains the current transaction timeout value in seconds. If set-timeout was not
@@ -93,7 +93,7 @@
* @param xid Specifies the xid of the transaction branch used for getting the timeout.
* @return The current transaction timeout value in seconds.
*/
- public Future<GetTimeoutResult> dtxGetTimeout(Xid xid);
+ public Future<GetTimeoutResult> dtxGetTimeout(Xid xid, Option ... options);
/**
* This method prepares any message produced or consumed on behalf of xid, ready for commitment.
@@ -109,14 +109,14 @@
* <p/>
* xa-rbtimeout: The work represented by this transaction branch took too long.
*/
- public Future<XaResult> dtxPrepare(Xid xid);
+ public Future<XaResult> dtxPrepare(Xid xid, Option ... options);
/**
* This method is called to obtain a list of transaction branches that are in a prepared or
* heuristically completed state.
* @return a array of xids to be recovered.
*/
- public Future<RecoverResult> dtxRecover();
+ public Future<RecoverResult> dtxRecover(Option ... options);
/**
* This method rolls back the work associated with xid. Any produced messages are discarded and
@@ -125,7 +125,7 @@
* @param xid Specifies the xid of the transaction branch to be rolled back.
* @return Confirms to the client that the transaction branch is rolled back or specifies the error condition.
*/
- public Future<XaResult> dtxRollback(Xid xid);
+ public Future<XaResult> dtxRollback(Xid xid, Option ... options);
/**
* Sets the specified transaction branch timeout value in seconds.
@@ -133,5 +133,5 @@
* @param xid Specifies the xid of the transaction branch for setting the timeout.
* @param timeout The transaction timeout value in seconds.
*/
- public void dtxSetTimeout(Xid xid, long timeout);
+ public void dtxSetTimeout(Xid xid, long timeout, Option ... options);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java Wed Jul 9 06:26:54 2008
@@ -65,9 +65,9 @@
public void close();
- public void sessionDetach(byte[] name);
+ public void sessionDetach(byte[] name, Option ... options);
- public void sessionRequestTimeout(long expiry);
+ public void sessionRequestTimeout(long expiry, Option ... options);
public byte[] getName();
@@ -153,7 +153,8 @@
*
* @param acquireMode Indicates whether or not the transferred message has been acquired.
*/
- public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode);
+ public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
+ Option ... options);
/**
* Make a set of headers to be sent together with a message
@@ -207,7 +208,7 @@
* <ul>
* <li>{@link Option#EXCLUSIVE}: <p> Requests exclusive subscription access, so that only this
* subscription can access the queue.
- * <li>{@link Option#NO_OPTION}: <p> This is an empty option, and has no effect.
+ * <li>{@link Option#NONE}: <p> This is an empty option, and has no effect.
* </ul>
*
* @param queue The queue that the receiver is receiving messages from.
@@ -230,7 +231,7 @@
* @param listener The listener for this destination. To transfer large messages
* use a {@link org.apache.qpidity.nclient.MessagePartListener}.
* @param options Set of options. Valid options are {{@link Option#EXCLUSIVE}
- * and {@link Option#NO_OPTION}.
+ * and {@link Option#NONE}.
* @param filter A set of filters for the subscription. The syntax and semantics of these filters varies
* according to the provider's implementation.
*/
@@ -246,7 +247,7 @@
*
* @param destination The destination to be cancelled.
*/
- public void messageCancel(String destination);
+ public void messageCancel(String destination, Option ... options);
/**
* Associate a message listener with a destination.
@@ -274,7 +275,7 @@
* @param mode <ul> <li>credit ({@link Session#MESSAGE_FLOW_MODE_CREDIT}): choose credit based flow control
* <li> window ({@link Session#MESSAGE_FLOW_MODE_WINDOW}): choose window based flow control</ul>
*/
- public void messageSetFlowMode(String destination, MessageFlowMode mode);
+ public void messageSetFlowMode(String destination, MessageFlowMode mode, Option ... options);
/**
@@ -295,7 +296,7 @@
* </ul>
* @param value Number of credits, a value of 0 indicates an infinite amount of credit.
*/
- public void messageFlow(String destination, MessageCreditUnit unit, long value);
+ public void messageFlow(String destination, MessageCreditUnit unit, long value, Option ... options);
/**
* Forces the broker to exhaust its credit supply.
@@ -304,7 +305,7 @@
*
* @param destination The destination on which the credit supply is to be exhausted.
*/
- public void messageFlush(String destination);
+ public void messageFlush(String destination, Option ... options);
/**
* On receipt of this method, the brokers set credit to zero for a given
@@ -314,7 +315,7 @@
*
* @param destination The destination on which to reset credit.
*/
- public void messageStop(String destination);
+ public void messageStop(String destination, Option ... options);
/**
* Acknowledge the receipt of a range of messages.
@@ -338,7 +339,7 @@
* failed).
* @param text String describing the reason for a message transfer rejection.
*/
- public void messageReject(RangeSet ranges, MessageRejectCode code, String text);
+ public void messageReject(RangeSet ranges, MessageRejectCode code, String text, Option ... options);
/**
* As it is possible that the broker does not manage to reject some messages, after completion of
@@ -367,7 +368,7 @@
* @param ranges Ranges of messages to be acquired.
* @return Indicates the acquired messages
*/
- public Future<Acquired> messageAcquire(RangeSet ranges);
+ public Future<Acquired> messageAcquire(RangeSet ranges, Option ... options);
/**
* Give up responsibility for processing ranges of messages.
@@ -384,21 +385,21 @@
/**
* Selects the session for local transaction support.
*/
- public void txSelect();
+ public void txSelect(Option ... options);
/**
* Commit the receipt and delivery of all messages exchanged by this session's resources.
*
* @throws IllegalStateException If this session is not transacted, an exception will be thrown.
*/
- public void txCommit() throws IllegalStateException;
+ public void txCommit(Option ... options) throws IllegalStateException;
/**
* Roll back the receipt and delivery of all messages exchanged by this session's resources.
*
* @throws IllegalStateException If this session is not transacted, an exception will be thrown.
*/
- public void txRollback() throws IllegalStateException;
+ public void txRollback(Option ... options) throws IllegalStateException;
//---------------------------------------------
// Queue methods
@@ -423,7 +424,7 @@
* declaring connection closes.
* <li> {@link Option#PASSIVE}: <p> If set, the server will not create the queue.
* This field allows the client to assert the presence of a queue without modifying the server state.
- * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
+ * <li>{@link Option#NONE}: <p> Has no effect as it represents an �empty� option.
* </ul>
* <p>In the absence of a particular option, the defaul value is false for each option
*
@@ -435,7 +436,7 @@
* the queue. </ol>
* @param arguments Used for backward compatibility
* @param options Set of Options ( valide options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
- * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NO_OPTION})
+ * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NONE})
* @see Option
*/
public void queueDeclare(String queueName, String alternateExchange, Map<String, Object> arguments,
@@ -456,7 +457,8 @@
* routing keys depends on the exchange implementation.
* @param arguments Used for backward compatibility
*/
- public void exchangeBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments);
+ public void exchangeBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments,
+ Option ... options);
/**
* Unbind a queue from an exchange.
@@ -465,7 +467,7 @@
* @param exchangeName The name of the exchange to unbind from.
* @param routingKey Specifies the routing key of the binding to unbind.
*/
- public void exchangeUnbind(String queueName, String exchangeName, String routingKey);
+ public void exchangeUnbind(String queueName, String exchangeName, String routingKey, Option ... options);
/**
* This method removes all messages from a queue. It does not cancel consumers. Purged messages
@@ -474,7 +476,7 @@
* @param queueName Specifies the name of the queue to purge. If the queue name is empty, refers to the
* current queue for the session, which is the last declared queue.
*/
- public void queuePurge(String queueName);
+ public void queuePurge(String queueName, Option ... options);
/**
* This method deletes a queue. When a queue is deleted any pending messages are sent to a
@@ -485,7 +487,7 @@
* <li> {@link Option#IF_EMPTY}: <p> If set, the server will only delete the queue if it has no messages.
* <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the queue if it has no consumers.
* If the queue has consumers the server does does not delete it but raises a channel exception instead.
- * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
+ * <li>{@link Option#NONE}: <p> Has no effect as it represents an �empty� option.
* </ul>
* </p>
* <p/>
@@ -494,7 +496,7 @@
* @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the
* current queue for the session, which is the last declared queue.
* @param options Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED}
- * and {@link Option#NO_OPTION})
+ * and {@link Option#NONE})
* @see Option
*/
public void queueDelete(String queueName, Option... options);
@@ -506,7 +508,7 @@
* @param queueName The name of the queue for which information is requested.
* @return Information on the specified queue.
*/
- public Future<QueueQueryResult> queueQuery(String queueName);
+ public Future<QueueQueryResult> queueQuery(String queueName, Option ... options);
/**
@@ -519,7 +521,7 @@
* @return Information on the specified binding.
*/
public Future<ExchangeBoundResult> exchangeBound(String exchange, String queue, String routingKey,
- Map<String, Object> arguments);
+ Map<String, Object> arguments, Option ... options);
// --------------------------------------
// exhcange methods
@@ -536,7 +538,7 @@
* exchanges) are purged when a server restarts.
* <li>{@link Option#PASSIVE}: <p>If set, the server will not create the exchange.
* The client can use this to check whether an exchange exists without modifying the server state.
- * <li> {@link Option#NO_OPTION}: <p>This option is an empty option, and has no effect.
+ * <li> {@link Option#NONE}: <p>This option is an empty option, and has no effect.
* </ul>
* <p>In the absence of a particular option, the defaul value is false for each option</p>
*
@@ -548,7 +550,7 @@
* @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which
* the message will be sent.
* @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
- * {@link Option#PASSIVE}, {@link Option#NO_OPTION})
+ * {@link Option#PASSIVE}, {@link Option#NONE})
* @param arguments Used for backward compatibility
* @see Option
*/
@@ -563,12 +565,12 @@
* <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the exchange if it has no queue bindings. If the
* exchange has queue bindings the server does not delete it but raises a channel exception
* instead.
- * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an empty option.
+ * <li> {@link Option#NONE}: <p> Has no effect as it represents an empty option.
* </ul>
* <p>Note that if an option is not set, it will default to false.
*
* @param exchangeName The name of exchange to be deleted.
- * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NO_OPTION}.
+ * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NONE}.
* @see Option
*/
public void exchangeDelete(String exchangeName, Option... options);
@@ -581,7 +583,7 @@
* return information about the default exchange.
* @return Information on the specified exchange.
*/
- public Future<ExchangeQueryResult> exchangeQuery(String exchangeName);
+ public Future<ExchangeQueryResult> exchangeQuery(String exchangeName, Option ... options);
/**
* If the session receives a sessionClosed with an error code it
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Wed Jul 9 06:26:54 2008
@@ -17,6 +17,8 @@
import org.apache.qpidity.transport.Range;
import org.apache.qpidity.transport.RangeSet;
+import static org.apache.qpidity.transport.Option.*;
+
/**
* Implements a Qpid Sesion.
*/
@@ -66,8 +68,8 @@
{
super.processed(range);
}
- super.flushProcessed();
- if( accept )
+ super.flushProcessed(accept ? BATCH : NONE);
+ if (accept)
{
messageAccept(ranges);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java Wed Jul 9 06:26:54 2008
@@ -32,15 +32,11 @@
// --------------------------------------------
@Override public void data(Session ssn, Data data)
{
- for (ByteBuffer b : data.getFragments())
- {
- _currentMessageListener.data(b);
- }
+ _currentMessageListener.data(data.getData());
if (data.isLast())
{
_currentMessageListener.messageReceived();
}
-
}
@Override public void header(Session ssn, Header header)
Modified: incubator/qpid/trunk/qpid/java/common/Composite.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Composite.tpl?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Composite.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/Composite.tpl Wed Jul 9 06:26:54 2008
@@ -80,7 +80,7 @@
out(" private $(PACK_TYPES[pack]) packing_flags = 0;\n");
fields = get_fields(type)
-params = get_parameters(fields)
+params = get_parameters(type, fields)
options = get_options(fields)
for f in fields:
@@ -99,7 +99,7 @@
if f.option: continue
out(" $(f.set)($(f.name));\n")
-if options:
+if options or base == "Method":
out("""
for (int i=0; i < _options.length; i++) {
switch (_options[i]) {
@@ -108,7 +108,11 @@
for f in options:
out(" case $(f.option): packing_flags |= $(f.flag_mask(pack)); break;\n")
- out(""" case NO_OPTION: break;
+ if base == "Method":
+ out(""" case SYNC: this.setSync(true); break;
+ case BATCH: this.setBatch(true); break;
+""")
+ out(""" case NONE: break;
default: throw new IllegalArgumentException("invalid option: " + _options[i]);
}
}
Modified: incubator/qpid/trunk/qpid/java/common/Invoker.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Invoker.tpl?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Invoker.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/Invoker.tpl Wed Jul 9 06:26:54 2008
@@ -15,8 +15,8 @@
for c in composites:
name = cname(c)
fields = get_fields(c)
- params = get_parameters(fields)
- args = get_arguments(fields)
+ params = get_parameters(c, fields)
+ args = get_arguments(c, fields)
result = c["result"]
if result:
if not result["@type"]:
@@ -32,7 +32,7 @@
jclass = ""
out("""
- public $jresult $(dromedary(name))($(", ".join(params))) {
+ public final $jresult $(dromedary(name))($(", ".join(params))) {
$(jreturn)invoke(new $name($(", ".join(args)))$jclass);
}
""")
Modified: incubator/qpid/trunk/qpid/java/common/Option.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Option.tpl?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Option.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/Option.tpl Wed Jul 9 06:26:54 2008
@@ -15,5 +15,6 @@
if not options.has_key(option):
options[option] = None
out(" $option,\n")}
- NO_OPTION
+ BATCH,
+ NONE
}
Modified: incubator/qpid/trunk/qpid/java/common/genutil.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/genutil.py?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/genutil.py (original)
+++ incubator/qpid/trunk/qpid/java/common/genutil.py Wed Jul 9 06:26:54 2008
@@ -206,7 +206,7 @@
index += 1
return fields
-def get_parameters(fields):
+def get_parameters(type, fields):
params = []
options = False
for f in fields:
@@ -214,11 +214,11 @@
options = True
else:
params.append("%s %s" % (f.type, f.name))
- if options:
+ if options or type.name in ("control", "command"):
params.append("Option ... _options")
return params
-def get_arguments(fields):
+def get_arguments(type, fields):
args = []
options = False
for f in fields:
@@ -226,7 +226,7 @@
options = True
else:
args.append(f.name)
- if options:
+ if options or type.name in ("control", "command"):
args.append("_options")
return args
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java Wed Jul 9 06:26:54 2008
@@ -41,6 +41,11 @@
System.out.println(str(buf));
}
+ public void flush()
+ {
+ // pass
+ }
+
public void close()
{
System.out.println("CLOSED");
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java Wed Jul 9 06:26:54 2008
@@ -188,10 +188,7 @@
ssn.header(m.header);
for (Data d : m.body)
{
- for (ByteBuffer b : d.getFragments())
- {
- ssn.data(b);
- }
+ ssn.data(d.getData());
}
ssn.endData();
}
@@ -245,11 +242,8 @@
for (Data d : body)
{
- for (ByteBuffer b : d.getFragments())
- {
- sb.append(" | ");
- sb.append(str(b));
- }
+ sb.append(" | ");
+ sb.append(d);
}
return sb.toString();
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binary.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binary.java?rev=675165&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binary.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binary.java Wed Jul 9 06:26:54 2008
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * Binary
+ *
+ */
+
+public final class Binary
+{
+
+ private byte[] bytes;
+ private int offset;
+ private int size;
+ private int hash = 0;
+
+ public Binary(byte[] bytes, int offset, int size)
+ {
+ if (offset + size > bytes.length)
+ {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ this.bytes = bytes;
+ this.offset = offset;
+ this.size = size;
+ }
+
+ public Binary(byte[] bytes)
+ {
+ this(bytes, 0, bytes.length);
+ }
+
+ public final byte[] array()
+ {
+ return bytes;
+ }
+
+ public final int offset()
+ {
+ return offset;
+ }
+
+ public final int size()
+ {
+ return size;
+ }
+
+ public final Binary slice(int low, int high)
+ {
+ int sz;
+
+ if (high < 0)
+ {
+ sz = size + high;
+ }
+ else
+ {
+ sz = high - low;
+ }
+
+ if (sz < 0)
+ {
+ sz = 0;
+ }
+
+ return new Binary(bytes, offset + low, sz);
+ }
+
+ public final int hashCode()
+ {
+ if (hash == 0)
+ {
+ int hc = 0;
+ for (int i = 0; i < size; i++)
+ {
+ hc = 31*hc + (0xFF & bytes[offset + i]);
+ }
+ hash = hc;
+ }
+
+ return hash;
+ }
+
+ public final boolean equals(Object o)
+ {
+ if (!(o instanceof Binary))
+ {
+ return false;
+ }
+
+ Binary buf = (Binary) o;
+ if (this.size != buf.size)
+ {
+ return false;
+ }
+
+ for (int i = 0; i < size; i++)
+ {
+ if (bytes[offset + i] != buf.bytes[buf.offset + i])
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binary.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java Wed Jul 9 06:26:54 2008
@@ -56,6 +56,7 @@
private Lock commandLock = new ReentrantLock();
private boolean first = true;
private ByteBuffer data = null;
+ private boolean batch = false;
public Channel(Connection connection, int channel, SessionDelegate delegate)
{
@@ -162,6 +163,13 @@
emit(m);
+ if (!m.isBatch() && !m.hasPayload())
+ {
+ connection.flush();
+ }
+
+ batch = m.isBatch();
+
if (m.getEncodedTrack() == Frame.L4 && !m.hasPayload())
{
commandLock.unlock();
@@ -199,6 +207,10 @@
emit(new Data(data, first, true));
first = true;
data = null;
+ if (!batch)
+ {
+ connection.flush();
+ }
commandLock.unlock();
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java Wed Jul 9 06:26:54 2008
@@ -40,7 +40,6 @@
* short instead of Short
*/
-// RA making this public until we sort out the package issues
public class Connection
implements Receiver<ConnectionEvent>, Sender<ConnectionEvent>
{
@@ -90,6 +89,12 @@
sender.send(event);
}
+ public void flush()
+ {
+ log.debug("FLUSH: [%s]", this);
+ sender.flush();
+ }
+
public int getChannelMax()
{
return channelMax;
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java Wed Jul 9 06:26:54 2008
@@ -37,25 +37,20 @@
public class Data implements ProtocolEvent
{
- private final Iterable<ByteBuffer> fragments;
+ private final ByteBuffer data;
private final boolean first;
private final boolean last;
- public Data(Iterable<ByteBuffer> fragments, boolean first, boolean last)
+ public Data(ByteBuffer data, boolean first, boolean last)
{
- this.fragments = fragments;
+ this.data = data;
this.first = first;
this.last = last;
}
- public Data(ByteBuffer buf, boolean first, boolean last)
+ public ByteBuffer getData()
{
- this(Collections.singletonList(buf), first, last);
- }
-
- public Iterable<ByteBuffer> getFragments()
- {
- return fragments;
+ return data.slice();
}
public boolean isFirst()
@@ -82,25 +77,7 @@
{
StringBuffer str = new StringBuffer();
str.append("Data(");
- boolean first = true;
- int left = 64;
- for (ByteBuffer buf : getFragments())
- {
- if (first)
- {
- first = false;
- }
- else
- {
- str.append(" | ");
- }
- str.append(str(buf, left));
- left -= buf.remaining();
- if (left < 0)
- {
- break;
- }
- }
+ str.append(str(data, 64));
str.append(")");
return str.toString();
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java Wed Jul 9 06:26:54 2008
@@ -49,10 +49,7 @@
public void data(Session ssn, Data data)
{
- for (ByteBuffer buf : data.getFragments())
- {
- ssn.data(buf);
- }
+ ssn.data(data.getData());
if (data.isLast())
{
ssn.endData();
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java Wed Jul 9 06:26:54 2008
@@ -42,6 +42,7 @@
private int id;
private boolean idSet = false;
private boolean sync = false;
+ private boolean batch = false;
public final int getId()
{
@@ -59,11 +60,21 @@
return sync;
}
- void setSync(boolean value)
+ final void setSync(boolean value)
{
this.sync = value;
}
+ public final boolean isBatch()
+ {
+ return batch;
+ }
+
+ final void setBatch(boolean value)
+ {
+ this.batch = value;
+ }
+
public abstract boolean hasPayload();
public abstract byte getEncodedTrack();
@@ -84,26 +95,30 @@
public String toString()
{
- if (getEncodedTrack() != Frame.L4)
- {
- return super.toString();
- }
-
StringBuilder str = new StringBuilder();
- if (idSet)
+ if (getEncodedTrack() == Frame.L4 && idSet)
{
str.append("id=");
str.append(id);
}
- if (sync)
+ if (sync || batch)
{
if (str.length() > 0)
{
str.append(" ");
}
- str.append(" [sync]");
+ str.append("[");
+ if (sync)
+ {
+ str.append("S");
+ }
+ if (batch)
+ {
+ str.append("B");
+ }
+ str.append("]");
}
if (str.length() > 0)
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java Wed Jul 9 06:26:54 2008
@@ -31,6 +31,8 @@
void send(T msg);
+ void flush();
+
void close();
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java Wed Jul 9 06:26:54 2008
@@ -130,7 +130,7 @@
log.debug("ID: [%s] %s", this.channel, id);
if ((id % 65536) == 0)
{
- flushProcessed(true);
+ flushProcessed(TIMELY_REPLY);
}
}
@@ -166,19 +166,14 @@
}
}
- public void flushProcessed()
- {
- flushProcessed(false);
- }
-
- private void flushProcessed(boolean timely_reply)
+ public void flushProcessed(Option ... options)
{
RangeSet copy;
synchronized (processedLock)
{
copy = processed.copy();
}
- sessionCompleted(copy, timely_reply ? TIMELY_REPLY : NO_OPTION);
+ sessionCompleted(copy, options);
}
void knownComplete(RangeSet kc)
@@ -353,9 +348,7 @@
if (needSync && lt(maxComplete, point))
{
- ExecutionSync sync = new ExecutionSync();
- sync.setSync(true);
- invoke(sync);
+ executionSync(SYNC);
}
long start = System.currentTimeMillis();
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java Wed Jul 9 06:26:54 2008
@@ -29,6 +29,7 @@
import java.util.Map;
import java.util.UUID;
+import org.apache.qpidity.transport.Binary;
import org.apache.qpidity.transport.RangeSet;
import org.apache.qpidity.transport.Struct;
import org.apache.qpidity.transport.Type;
@@ -45,6 +46,14 @@
abstract class AbstractDecoder implements Decoder
{
+ private final Map<Binary,String> str8cache = new LinkedHashMap<Binary,String>()
+ {
+ @Override protected boolean removeEldestEntry(Map.Entry<Binary,String> me)
+ {
+ return size() > 4*1024;
+ }
+ };
+
protected abstract byte doGet();
protected abstract void doGet(byte[] bytes);
@@ -59,6 +68,13 @@
doGet(bytes);
}
+ protected Binary get(int size)
+ {
+ byte[] bytes = new byte[size];
+ get(bytes);
+ return new Binary(bytes);
+ }
+
protected short uget()
{
return (short) (0xFF & get());
@@ -105,11 +121,11 @@
return readUint64();
}
- private static final String decode(byte[] bytes, String charset)
+ private static final String decode(byte[] bytes, int offset, int length, String charset)
{
try
{
- return new String(bytes, charset);
+ return new String(bytes, offset, length, charset);
}
catch (UnsupportedEncodingException e)
{
@@ -117,13 +133,22 @@
}
}
+ private static final String decode(byte[] bytes, String charset)
+ {
+ return decode(bytes, 0, bytes.length, charset);
+ }
public String readStr8()
{
short size = readUint8();
- byte[] bytes = new byte[size];
- get(bytes);
- return decode(bytes, "UTF-8");
+ Binary bin = get(size);
+ String str = str8cache.get(bin);
+ if (str == null)
+ {
+ str = decode(bin.array(), bin.offset(), bin.size(), "UTF-8");
+ str8cache.put(bin, str);
+ }
+ return str;
}
public String readStr16()
@@ -233,7 +258,19 @@
public Map<String,Object> readMap()
{
long size = readUint32();
+
+ if (size == 0)
+ {
+ return null;
+ }
+
long count = readUint32();
+
+ if (count == 0)
+ {
+ return Collections.EMPTY_MAP;
+ }
+
Map<String,Object> result = new LinkedHashMap();
for (int i = 0; i < count; i++)
{
@@ -243,13 +280,26 @@
Object value = read(t);
result.put(key, value);
}
+
return result;
}
public List<Object> readList()
{
long size = readUint32();
+
+ if (size == 0)
+ {
+ return null;
+ }
+
long count = readUint32();
+
+ if (count == 0)
+ {
+ return Collections.EMPTY_LIST;
+ }
+
List<Object> result = new ArrayList();
for (int i = 0; i < count; i++)
{
@@ -264,15 +314,21 @@
public List<Object> readArray()
{
long size = readUint32();
+
if (size == 0)
{
- return Collections.EMPTY_LIST;
+ return null;
}
byte code = get();
Type t = getType(code);
long count = readUint32();
+ if (count == 0)
+ {
+ return Collections.EMPTY_LIST;
+ }
+
List<Object> result = new ArrayList<Object>();
for (int i = 0; i < count; i++)
{
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java Wed Jul 9 06:26:54 2008
@@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -64,10 +65,13 @@
ENCODINGS.put(byte[].class, Type.VBIN32);
}
- protected Sizer sizer()
+ private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>()
{
- return new SizeEncoder();
- }
+ @Override protected boolean removeEldestEntry(Map.Entry<String,byte[]> me)
+ {
+ return size() > 4*1024;
+ }
+ };
protected abstract void doPut(byte b);
@@ -88,6 +92,15 @@
put(ByteBuffer.wrap(bytes));
}
+ protected abstract int beginSize8();
+ protected abstract void endSize8(int pos);
+
+ protected abstract int beginSize16();
+ protected abstract void endSize16(int pos);
+
+ protected abstract int beginSize32();
+ protected abstract void endSize32(int pos);
+
public void writeUint8(short b)
{
assert b < 0x100;
@@ -132,23 +145,6 @@
writeUint64(l);
}
- private static final String checkLength(String s, int n)
- {
- if (s == null)
- {
- return "";
- }
-
- if (s.length() > n)
- {
- throw new IllegalArgumentException("string too long: " + s);
- }
- else
- {
- return s;
- }
- }
-
private static final byte[] encode(String s, String charset)
{
try
@@ -163,16 +159,31 @@
public void writeStr8(String s)
{
- s = checkLength(s, 255);
- writeUint8((short) s.length());
- put(ByteBuffer.wrap(encode(s, "UTF-8")));
+ if (s == null)
+ {
+ s = "";
+ }
+
+ byte[] bytes = str8cache.get(s);
+ if (bytes == null)
+ {
+ bytes = encode(s, "UTF-8");
+ str8cache.put(s, bytes);
+ }
+ writeUint8((short) bytes.length);
+ put(bytes);
}
public void writeStr16(String s)
{
- s = checkLength(s, 65535);
- writeUint16(s.length());
- put(ByteBuffer.wrap(encode(s, "UTF-8")));
+ if (s == null)
+ {
+ s = "";
+ }
+
+ byte[] bytes = encode(s, "UTF-8");
+ writeUint16(bytes.length);
+ put(bytes);
}
public void writeVbin8(byte[] bytes)
@@ -245,18 +256,10 @@
}
int width = s.getSizeWidth();
+ int pos = -1;
if (width > 0)
{
- if (empty)
- {
- writeSize(width, 0);
- }
- else
- {
- Sizer sizer = sizer();
- s.write(sizer);
- writeSize(width, sizer.size());
- }
+ pos = beginSize(width);
}
if (type > 0)
@@ -265,6 +268,11 @@
}
s.write(this);
+
+ if (width > 0)
+ {
+ endSize(width, pos);
+ }
}
public void writeStruct32(Struct s)
@@ -275,12 +283,10 @@
}
else
{
- Sizer sizer = sizer();
- sizer.writeUint16(s.getEncodedType());
- s.write(sizer);
- writeUint32(sizer.size());
+ int pos = beginSize32();
writeUint16(s.getEncodedType());
s.write(this);
+ endSize32(pos);
}
}
@@ -338,18 +344,13 @@
public void writeMap(Map<String,Object> map)
{
- if (map == null)
+ int pos = beginSize32();
+ if (map != null)
{
- writeUint32(0);
- return;
+ writeUint32(map.size());
+ writeMapEntries(map);
}
-
- Sizer sizer = sizer();
- sizer.writeMap(map);
- // XXX: - 4
- writeUint32(sizer.size() - 4);
- writeUint32(map.size());
- writeMapEntries(map);
+ endSize32(pos);
}
protected void writeMapEntries(Map<String,Object> map)
@@ -367,12 +368,13 @@
public void writeList(List<Object> list)
{
- Sizer sizer = sizer();
- sizer.writeList(list);
- // XXX: - 4
- writeUint32(sizer.size() - 4);
- writeUint32(list.size());
- writeListEntries(list);
+ int pos = beginSize32();
+ if (list != null)
+ {
+ writeUint32(list.size());
+ writeListEntries(list);
+ }
+ endSize32(pos);
}
protected void writeListEntries(List<Object> list)
@@ -387,16 +389,12 @@
public void writeArray(List<Object> array)
{
- if (array == null)
+ int pos = beginSize32();
+ if (array != null)
{
- array = Collections.EMPTY_LIST;
+ writeArrayEntries(array);
}
-
- Sizer sizer = sizer();
- sizer.writeArray(array);
- // XXX: -4
- writeUint32(sizer.size() - 4);
- writeArrayEntries(array);
+ endSize32(pos);
}
protected void writeArrayEntries(List<Object> array)
@@ -458,6 +456,39 @@
}
}
+ private int beginSize(int width)
+ {
+ switch (width)
+ {
+ case 1:
+ return beginSize8();
+ case 2:
+ return beginSize16();
+ case 4:
+ return beginSize32();
+ default:
+ throw new IllegalStateException("illegal width: " + width);
+ }
+ }
+
+ private void endSize(int width, int pos)
+ {
+ switch (width)
+ {
+ case 1:
+ endSize8(pos);
+ break;
+ case 2:
+ endSize16(pos);
+ break;
+ case 4:
+ endSize32(pos);
+ break;
+ default:
+ throw new IllegalStateException("illegal width: " + width);
+ }
+ }
+
private void writeBytes(Type t, byte[] bytes)
{
writeSize(t, bytes.length);
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java Wed Jul 9 06:26:54 2008
@@ -23,6 +23,8 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.qpidity.transport.Binary;
+
/**
* BBDecoder
@@ -33,9 +35,9 @@
public final class BBDecoder extends AbstractDecoder
{
- private final ByteBuffer in;
+ private ByteBuffer in;
- public BBDecoder(ByteBuffer in)
+ public void init(ByteBuffer in)
{
this.in = in;
this.in.order(ByteOrder.BIG_ENDIAN);
@@ -51,6 +53,21 @@
in.get(bytes);
}
+ protected Binary get(int size)
+ {
+ if (in.hasArray())
+ {
+ byte[] bytes = in.array();
+ Binary bin = new Binary(bytes, in.arrayOffset() + in.position(), size);
+ in.position(in.position() + size);
+ return bin;
+ }
+ else
+ {
+ return super.get(size);
+ }
+ }
+
public boolean hasRemaining()
{
return in.hasRemaining();
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java Wed Jul 9 06:26:54 2008
@@ -20,6 +20,7 @@
*/
package org.apache.qpidity.transport.codec;
+import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -33,47 +34,194 @@
public final class BBEncoder extends AbstractEncoder
{
- private final ByteBuffer out;
+ private ByteBuffer out;
- public BBEncoder(ByteBuffer out) {
- this.out = out;
- this.out.order(ByteOrder.BIG_ENDIAN);
+ public BBEncoder(int capacity) {
+ out = ByteBuffer.allocate(capacity);
+ out.order(ByteOrder.BIG_ENDIAN);
+ }
+
+ public void init()
+ {
+ out.clear();
+ }
+
+ public ByteBuffer done()
+ {
+ out.flip();
+ ByteBuffer encoded = ByteBuffer.allocate(out.remaining());
+ encoded.put(out);
+ encoded.flip();
+ return encoded;
+ }
+
+ private void grow(int size)
+ {
+ ByteBuffer old = out;
+ int capacity = old.capacity();
+ out = ByteBuffer.allocate(Math.max(capacity + size, 2*capacity));
+ out.order(ByteOrder.BIG_ENDIAN);
+ out.put(old);
}
protected void doPut(byte b)
{
- out.put(b);
+ try
+ {
+ out.put(b);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(1);
+ out.put(b);
+ }
}
protected void doPut(ByteBuffer src)
{
- out.put(src);
+ try
+ {
+ out.put(src);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(src.remaining());
+ out.put(src);
+ }
+ }
+
+ protected void put(byte[] bytes)
+ {
+ try
+ {
+ out.put(bytes);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(bytes.length);
+ out.put(bytes);
+ }
}
public void writeUint8(short b)
{
assert b < 0x100;
- out.put((byte) b);
+ try
+ {
+ out.put((byte) b);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(1);
+ out.put((byte) b);
+ }
}
public void writeUint16(int s)
{
assert s < 0x10000;
- out.putShort((short) s);
+ try
+ {
+ out.putShort((short) s);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(2);
+ out.putShort((short) s);
+ }
}
public void writeUint32(long i)
{
assert i < 0x100000000L;
- out.putInt((int) i);
+ try
+ {
+ out.putInt((int) i);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(4);
+ out.putInt((int) i);
+ }
}
public void writeUint64(long l)
{
- out.putLong(l);
+ try
+ {
+ out.putLong(l);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(8);
+ out.putLong(l);
+ }
+ }
+
+ public int beginSize8()
+ {
+ int pos = out.position();
+ try
+ {
+ out.put((byte) 0);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(1);
+ out.put((byte) 0);
+ }
+ return pos;
+ }
+
+ public void endSize8(int pos)
+ {
+ int cur = out.position();
+ out.put(pos, (byte) (cur - pos - 1));
+ }
+
+ public int beginSize16()
+ {
+ int pos = out.position();
+ try
+ {
+ out.putShort((short) 0);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(2);
+ out.putShort((short) 0);
+ }
+ return pos;
+ }
+
+ public void endSize16(int pos)
+ {
+ int cur = out.position();
+ out.putShort(pos, (short) (cur - pos - 2));
+ }
+
+ public int beginSize32()
+ {
+ int pos = out.position();
+ try
+ {
+ out.putInt(0);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(4);
+ out.putInt(0);
+ }
+ return pos;
+ }
+
+ public void endSize32(int pos)
+ {
+ int cur = out.position();
+ out.putInt(pos, (cur - pos - 4));
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java Wed Jul 9 06:26:54 2008
@@ -29,7 +29,6 @@
import org.apache.qpidity.transport.codec.BBDecoder;
import org.apache.qpidity.transport.codec.Decoder;
-import org.apache.qpidity.transport.codec.FragmentDecoder;
import org.apache.qpidity.transport.ConnectionEvent;
import org.apache.qpidity.transport.Data;
@@ -52,26 +51,32 @@
{
private final Receiver<ConnectionEvent> receiver;
- private final Map<Integer,List<ByteBuffer>> segments;
+ private final Map<Integer,List<Frame>> segments;
+ private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>()
+ {
+ public BBDecoder initialValue()
+ {
+ return new BBDecoder();
+ }
+ };
public Assembler(Receiver<ConnectionEvent> receiver)
{
this.receiver = receiver;
- segments = new HashMap<Integer,List<ByteBuffer>>();
+ segments = new HashMap<Integer,List<Frame>>();
}
private int segmentKey(Frame frame)
{
- // XXX: can this overflow?
return (frame.getTrack() + 1) * frame.getChannel();
}
- private List<ByteBuffer> getSegment(Frame frame)
+ private List<Frame> getSegment(Frame frame)
{
return segments.get(segmentKey(frame));
}
- private void setSegment(Frame frame, List<ByteBuffer> segment)
+ private void setSegment(Frame frame, List<Frame> segment)
{
int key = segmentKey(frame);
if (segments.containsKey(key))
@@ -122,7 +127,7 @@
switch (frame.getType())
{
case BODY:
- emit(frame, new Data(frame, frame.isFirstFrame(),
+ emit(frame, new Data(frame.getBody(), frame.isFirstFrame(),
frame.isLastFrame()));
break;
default:
@@ -138,42 +143,54 @@
private void assemble(Frame frame)
{
- List<ByteBuffer> segment;
- if (frame.isFirstFrame())
+ ByteBuffer segment;
+ if (frame.isFirstFrame() && frame.isLastFrame())
{
- segment = new ArrayList<ByteBuffer>();
- setSegment(frame, segment);
+ segment = frame.getBody();
+ emit(frame, decode(frame, segment));
}
else
{
- segment = getSegment(frame);
- }
+ List<Frame> frames;
+ if (frame.isFirstFrame())
+ {
+ frames = new ArrayList<Frame>();
+ setSegment(frame, frames);
+ }
+ else
+ {
+ frames = getSegment(frame);
+ }
- for (ByteBuffer buf : frame)
- {
- segment.add(buf);
- }
+ frames.add(frame);
- if (frame.isLastFrame())
- {
- clearSegment(frame);
- emit(frame, decode(frame, frame.getType(), segment));
+ if (frame.isLastFrame())
+ {
+ clearSegment(frame);
+
+ int size = 0;
+ for (Frame f : frames)
+ {
+ size += f.getSize();
+ }
+ segment = ByteBuffer.allocate(size);
+ for (Frame f : frames)
+ {
+ segment.put(f.getBody());
+ }
+ segment.flip();
+ emit(frame, decode(frame, segment));
+ }
}
+
}
- private ProtocolEvent decode(Frame frame, SegmentType type, List<ByteBuffer> segment)
+ private ProtocolEvent decode(Frame frame, ByteBuffer segment)
{
- Decoder dec;
- if (segment.size() == 1)
- {
- dec = new BBDecoder(segment.get(0));
- }
- else
- {
- dec = new FragmentDecoder(segment.iterator());
- }
+ BBDecoder dec = decoder.get();
+ dec.init(segment);
- switch (type)
+ switch (frame.getType())
{
case CONTROL:
int controlType = dec.readUint16();
@@ -193,9 +210,9 @@
{
structs.add(dec.readStruct32());
}
- return new Header(structs,frame.isLastFrame() && frame.isLastSegment());
+ return new Header(structs, frame.isLastFrame() && frame.isLastSegment());
default:
- throw new IllegalStateException("unknown frame type: " + type);
+ throw new IllegalStateException("unknown frame type: " + frame.getType());
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java Wed Jul 9 06:26:54 2008
@@ -21,7 +21,6 @@
package org.apache.qpidity.transport.network;
import org.apache.qpidity.transport.codec.BBEncoder;
-import org.apache.qpidity.transport.codec.SizeEncoder;
import org.apache.qpidity.transport.ConnectionEvent;
import org.apache.qpidity.transport.Data;
@@ -54,6 +53,13 @@
private final Sender<NetworkEvent> sender;
private final int maxPayload;
+ private final ThreadLocal<BBEncoder> encoder = new ThreadLocal()
+ {
+ public BBEncoder initialValue()
+ {
+ return new BBEncoder(4*1024);
+ }
+ };
public Disassembler(Sender<NetworkEvent> sender, int maxFrame)
{
@@ -72,6 +78,11 @@
event.getProtocolEvent().delegate(event, this);
}
+ public void flush()
+ {
+ sender.flush();
+ }
+
public void close()
{
sender.close();
@@ -92,8 +103,7 @@
first = false;
}
nflags |= LAST_FRAME;
- Frame frame = new Frame(nflags, type, track, event.getChannel());
- // frame.addFragment(buf);
+ Frame frame = new Frame(nflags, type, track, event.getChannel(), buf.slice());
sender.send(frame);
}
else
@@ -115,8 +125,7 @@
newflags |= LAST_FRAME;
}
- Frame frame = new Frame(newflags, type, track, event.getChannel());
- frame.addFragment(slice);
+ Frame frame = new Frame(newflags, type, track, event.getChannel(), slice);
sender.send(frame);
}
}
@@ -137,18 +146,18 @@
method(event, method, SegmentType.COMMAND);
}
- private void method(ConnectionEvent event, Method method, SegmentType type)
+ private ByteBuffer copy(ByteBuffer src)
{
- SizeEncoder sizer = new SizeEncoder();
- sizer.writeUint16(method.getEncodedType());
- if (type == SegmentType.COMMAND)
- {
- sizer.writeUint16(0);
- }
- method.write(sizer);
+ ByteBuffer buf = ByteBuffer.allocate(src.remaining());
+ buf.put(src);
+ buf.flip();
+ return buf;
+ }
- ByteBuffer buf = ByteBuffer.allocate(sizer.size());
- BBEncoder enc = new BBEncoder(buf);
+ private void method(ConnectionEvent event, Method method, SegmentType type)
+ {
+ BBEncoder enc = encoder.get();
+ enc.init();
enc.writeUint16(method.getEncodedType());
if (type == SegmentType.COMMAND)
{
@@ -162,7 +171,7 @@
}
}
method.write(enc);
- buf.flip();
+ ByteBuffer buf = enc.done();
byte flags = FIRST_SEG;
@@ -176,42 +185,29 @@
public void header(ConnectionEvent event, Header header)
{
- ByteBuffer buf;
- if( header.getBuf() == null)
+ ByteBuffer buf;
+ if (header.getBuf() == null)
{
- SizeEncoder sizer = new SizeEncoder();
- for (Struct st : header.getStructs())
- {
- sizer.writeStruct32(st);
- }
-
- buf = ByteBuffer.allocate(sizer.size());
- BBEncoder enc = new BBEncoder(buf);
+ BBEncoder enc = encoder.get();
+ enc.init();
for (Struct st : header.getStructs())
{
enc.writeStruct32(st);
}
+ buf = enc.done();
header.setBuf(buf);
}
else
{
buf = header.getBuf();
+ buf.flip();
}
- buf.flip();
fragment((byte) 0x0, SegmentType.HEADER, event, buf, true, true);
}
public void data(ConnectionEvent event, Data data)
{
- boolean first = data.isFirst();
- for (Iterator<ByteBuffer> it = data.getFragments().iterator();
- it.hasNext(); )
- {
- ByteBuffer buf = it.next();
- boolean last = data.isLast() && !it.hasNext();
- fragment(LAST_SEG, SegmentType.BODY, event, buf, first, last);
- first = false;
- }
+ fragment(LAST_SEG, SegmentType.BODY, event, data.getData(), data.isFirst(), data.isLast());
}
public void error(ConnectionEvent event, ProtocolError error)
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java Wed Jul 9 06:26:54 2008
@@ -38,7 +38,7 @@
* @author Rafael H. Schloming
*/
-public final class Frame implements NetworkEvent, Iterable<ByteBuffer>
+public final class Frame implements NetworkEvent
{
public static final int HEADER_SIZE = 12;
@@ -61,23 +61,21 @@
final private SegmentType type;
final private byte track;
final private int channel;
- final private List<ByteBuffer> fragments;
- private int size;
+ final private ByteBuffer body;
- public Frame(byte flags, SegmentType type, byte track, int channel)
+ public Frame(byte flags, SegmentType type, byte track, int channel,
+ ByteBuffer body)
{
this.flags = flags;
this.type = type;
this.track = track;
this.channel = channel;
- this.size = 0;
- this.fragments = new ArrayList<ByteBuffer>();
+ this.body = body;
}
- public void addFragment(ByteBuffer fragment)
+ public ByteBuffer getBody()
{
- fragments.add(fragment);
- size += fragment.remaining();
+ return body.slice();
}
public byte getFlags()
@@ -92,7 +90,7 @@
public int getSize()
{
- return size;
+ return body.remaining();
}
public SegmentType getType()
@@ -130,16 +128,6 @@
return flag(LAST_FRAME);
}
- public Iterator<ByteBuffer> getFragments()
- {
- return new SliceIterator(fragments.iterator());
- }
-
- public Iterator<ByteBuffer> iterator()
- {
- return getFragments();
- }
-
public void delegate(NetworkDelegate delegate)
{
delegate.frame(this);
@@ -148,26 +136,14 @@
public String toString()
{
StringBuilder str = new StringBuilder();
+
str.append(String.format
("[%05d %05d %1d %s %d%d%d%d] ", getChannel(), getSize(),
getTrack(), getType(),
isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0,
isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0));
- boolean first = true;
- for (ByteBuffer buf : this)
- {
- if (first)
- {
- first = false;
- }
- else
- {
- str.append(" | ");
- }
-
- str.append(str(buf));
- }
+ str.append(str(body));
return str.toString();
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java Wed Jul 9 06:26:54 2008
@@ -79,7 +79,7 @@
private byte track;
private int channel;
private int size;
- private Frame frame;
+ private ByteBuffer body;
public InputHandler(Receiver<NetworkEvent> receiver, State state)
{
@@ -99,9 +99,9 @@
private void frame()
{
+ Frame frame = new Frame(flags, type, track, channel, body);
assert size == frame.getSize();
receiver.received(frame);
- frame = null;
}
private void error(String fmt, Object ... args)
@@ -191,30 +191,28 @@
return ERROR;
}
- frame = new Frame(flags, type, track, channel);
if (size > buf.remaining()) {
- frame.addFragment(buf.slice());
- buf.position(buf.limit());
+ body = ByteBuffer.allocate(size);
+ body.put(buf);
return FRAME_FRAGMENT;
} else {
- ByteBuffer payload = buf.slice();
- payload.limit(size);
+ body = buf.slice();
+ body.limit(size);
buf.position(buf.position() + size);
- frame.addFragment(payload);
frame();
return FRAME_HDR;
}
case FRAME_FRAGMENT:
- int delta = size - frame.getSize();
+ int delta = body.remaining();
if (delta > buf.remaining()) {
- frame.addFragment(buf.slice());
- buf.position(buf.limit());
+ body.put(buf);
return FRAME_FRAGMENT;
} else {
ByteBuffer fragment = buf.slice();
fragment.limit(delta);
buf.position(buf.position() + delta);
- frame.addFragment(fragment);
+ body.put(fragment);
+ body.flip();
frame();
return FRAME_HDR;
}