You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/06/06 05:39:03 UTC
svn commit: r663813 - in /incubator/qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpidity/nclient/
common/src/main/java/org/apache/qpidity/transport/
common/src/main/java/org/apache/qpidity/transport/network/
Author: rhs
Date: Thu Jun 5 20:39:03 2008
New Revision: 663813
URL: http://svn.apache.org/viewvc?rev=663813&view=rev
Log:
QPID-1062: merge writes of separate frames within an assembly, use sync flag instead of sync command on message transfer
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=663813&r1=663812&r2=663813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Jun 5 20:39:03 2008
@@ -206,16 +206,24 @@
// send the message
try
{
- ((AMQSession_0_10) getSession()).getQpidSession().messageTransfer(destination.getExchangeName().toString(),
- message.get010Message(),
- org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
- org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
- if(deliveryMode == DeliveryMode.PERSISTENT && getSession().getAMQConnection().getSyncPersistence())
+ org.apache.qpidity.nclient.Session ssn = ((AMQSession_0_10) getSession()).getQpidSession();
+
+ // if true, we need to sync the delivery of this message
+ boolean sync = (deliveryMode == DeliveryMode.PERSISTENT &&
+ getSession().getAMQConnection().getSyncPersistence());
+
+ if(sync)
{
- // we need to sync the delivery of this message
- ((AMQSession_0_10) getSession()).getQpidSession().sync();
+ ssn.setAutoSync(true);
+ }
+ ssn.messageTransfer(destination.getExchangeName().toString(),
+ message.get010Message(),
+ ssn.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+ ssn.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+ if (sync)
+ {
+ ssn.setAutoSync(false);
}
-
}
catch (IOException e)
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=663813&r1=663812&r2=663813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java Thu Jun 5 20:39:03 2008
@@ -71,6 +71,8 @@
public byte[] getName();
+ public void setAutoSync(boolean value);
+
//------------------------------------------------------
// Messaging methods
// Producer
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=663813&r1=663812&r2=663813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java Thu Jun 5 20:39:03 2008
@@ -65,6 +65,7 @@
private byte[] name;
private long timeout = 60000;
+ private boolean autoSync = false;
// channel may be null
Channel channel;
@@ -80,6 +81,7 @@
private int commandsOut = 0;
private Map<Integer,Method> commands = new HashMap<Integer,Method>();
private int maxComplete = commandsOut - 1;
+ private boolean needSync = false;
private AtomicBoolean closed = new AtomicBoolean(false);
@@ -93,6 +95,14 @@
return name;
}
+ public void setAutoSync(boolean value)
+ {
+ synchronized (commands)
+ {
+ this.autoSync = value;
+ }
+ }
+
public Map<Integer,Method> getOutstandingCommands()
{
return commands;
@@ -242,7 +252,16 @@
{
commands.put(next, m);
}
+ if (autoSync)
+ {
+ m.setSync(true);
+ }
+ needSync = !m.isSync();
channel.method(m);
+ if (autoSync && !m.hasPayload())
+ {
+ sync();
+ }
}
}
else
@@ -286,6 +305,13 @@
public void endData()
{
channel.end();
+ synchronized (commands)
+ {
+ if (autoSync)
+ {
+ sync();
+ }
+ }
}
public void sync()
@@ -300,7 +326,7 @@
{
int point = commandsOut - 1;
- if (lt(maxComplete, point))
+ if (needSync && lt(maxComplete, point))
{
ExecutionSync sync = new ExecutionSync();
sync.setSync(true);
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java?rev=663813&r1=663812&r2=663813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java Thu Jun 5 20:39:03 2008
@@ -22,6 +22,9 @@
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.qpidity.transport.Constant;
import org.apache.qpidity.transport.ProtocolError;
import org.apache.qpidity.transport.ProtocolHeader;
@@ -40,6 +43,8 @@
private Sender<ByteBuffer> sender;
private Object lock = new Object();
+ private int bytes = 0;
+ private List<Frame> frames = new ArrayList<Frame>();
public OutputHandler(Sender<ByteBuffer> sender)
{
@@ -69,24 +74,37 @@
public void frame(Frame frame)
{
- ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + frame.getSize());
- buf.put(frame.getFlags());
- buf.put((byte) frame.getType().getValue());
- buf.putShort((short) (frame.getSize() + HEADER_SIZE));
- // RESERVED
- buf.put(RESERVED);
- buf.put(frame.getTrack());
- buf.putShort((short) frame.getChannel());
- // RESERVED
- buf.putInt(0);
- for(ByteBuffer frg : frame)
- {
- buf.put(frg);
- }
- buf.flip();
synchronized (lock)
{
- sender.send(buf);
+ frames.add(frame);
+ bytes += HEADER_SIZE + frame.getSize();
+
+ if (frame.isLastFrame() && frame.isLastSegment() || bytes > 64*1024)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(bytes);
+ for (Frame f : frames)
+ {
+ buf.put(f.getFlags());
+ buf.put((byte) f.getType().getValue());
+ buf.putShort((short) (f.getSize() + HEADER_SIZE));
+ // RESERVED
+ buf.put(RESERVED);
+ buf.put(f.getTrack());
+ buf.putShort((short) f.getChannel());
+ // RESERVED
+ buf.putInt(0);
+ for(ByteBuffer frg : f)
+ {
+ buf.put(frg);
+ }
+ }
+ buf.flip();
+
+ frames.clear();
+ bytes = 0;
+
+ sender.send(buf);
+ }
}
}