You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/29 12:10:12 UTC
svn commit: r501009 - in /incubator/qpid/trunk/qpid/java/cluster/src:
main/java/org/apache/qpid/server/cluster/
main/java/org/apache/qpid/server/cluster/replay/
main/java/org/apache/qpid/server/queue/
test/java/org/apache/qpid/server/cluster/
Author: rgreig
Date: Mon Jan 29 03:10:09 2007
New Revision: 501009
URL: http://svn.apache.org/viewvc?view=rev&rev=501009
Log:
QPID-320 : Patch supplied by Rob Godfrey - Improve performance by remembering protocol version
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java Mon Jan 29 03:10:09 2007
@@ -172,12 +172,12 @@
private boolean isMembershipAnnouncement(Object msg)
{
- return msg instanceof AMQFrame && (((AMQFrame) msg).bodyFrame instanceof ClusterMembershipBody);
+ return msg instanceof AMQFrame && (((AMQFrame) msg).getBodyFrame() instanceof ClusterMembershipBody);
}
private boolean isBufferable(Object msg)
{
- return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).bodyFrame);
+ return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).getBodyFrame());
}
private boolean isBuffereable(AMQBody body)
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java Mon Jan 29 03:10:09 2007
@@ -110,6 +110,8 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterPingBody ping = new ClusterPingBody((byte)8,
(byte)0,
+ ClusterPingBody.getClazz((byte)8, (byte)0),
+ ClusterPingBody.getMethod((byte)8, (byte)0),
_group.getLocal().getDetails(),
_loadTable.getLocalLoad(),
true);
@@ -159,6 +161,8 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterJoinBody join = new ClusterJoinBody((byte)8,
(byte)0,
+ ClusterJoinBody.getClazz((byte)8, (byte)0),
+ ClusterJoinBody.getMethod((byte)8, (byte)0),
_group.getLocal().getDetails());
send(leader, new SimpleBodySendable(join));
@@ -182,6 +186,8 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterLeaveBody leave = new ClusterLeaveBody((byte)8,
(byte)0,
+ ClusterLeaveBody.getClazz((byte)8, (byte)0),
+ ClusterLeaveBody.getMethod((byte)8, (byte)0),
_group.getLocal().getDetails());
send(getLeader(), new SimpleBodySendable(leave));
@@ -207,6 +213,8 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8,
(byte)0,
+ ClusterSuspectBody.getClazz((byte)8, (byte)0),
+ ClusterSuspectBody.getMethod((byte)8, (byte)0),
broker.getDetails());
send(getLeader(), new SimpleBodySendable(suspect));
@@ -231,7 +239,10 @@
//pass request on to leader:
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0, member.getDetails());
+ ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0,
+ ClusterJoinBody.getClazz((byte)8, (byte)0),
+ ClusterJoinBody.getMethod((byte)8, (byte)0),
+ member.getDetails());
Broker leader = getLeader();
send(leader, new SimpleBodySendable(request));
@@ -278,7 +289,10 @@
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0, membership.getBytes());
+ ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0,
+ ClusterMembershipBody.getClazz((byte)8, (byte)0),
+ ClusterMembershipBody.getMethod((byte)8, (byte)0),
+ membership.getBytes());
return announce;
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java Mon Jan 29 03:10:09 2007
@@ -200,10 +200,10 @@
private void handleFrame(AMQFrame frame) throws AMQException
{
- AMQBody body = frame.bodyFrame;
+ AMQBody body = frame.getBodyFrame();
if (body instanceof AMQMethodBody)
{
- handleMethod(frame.channel, (AMQMethodBody) body);
+ handleMethod(frame.getChannel(), (AMQMethodBody) body);
}
else
{
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java Mon Jan 29 03:10:09 2007
@@ -56,6 +56,8 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
BasicConsumeBody m = new BasicConsumeBody((byte)8,
(byte)0,
+ BasicConsumeBody.getClazz((byte)8, (byte)0),
+ BasicConsumeBody.getMethod((byte)8, (byte)0),
null,
queue,
false,
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java Mon Jan 29 03:10:09 2007
@@ -50,13 +50,13 @@
private final byte minor = (byte)0;
private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[]
{
- new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor,null,false,false,false,false,false,null,0)),
- new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor,false,false,false,null,0)),
- new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor,null,null,false,null,null,0)),
- new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor,null,false,false,null,false,false,false,0,null)),
- new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor,null,false,false,0)),
- new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor,null,null,false,false,false,false,null,0)),
- new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor,null,false))
+ new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor, QueueDeclareBody.getClazz(major, minor), QueueDeclareBody.getMethod(major, minor),null,false,false,false,false,false,null,0)),
+ new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor, QueueDeleteBody.getClazz(major, minor), QueueDeleteBody.getMethod(major, minor),false,false,false,null,0)),
+ new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor, QueueBindBody.getClazz(major, minor), QueueBindBody.getMethod(major, minor),null,null,false,null,null,0)),
+ new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor, ExchangeDeclareBody.getClazz(major, minor), ExchangeDeclareBody.getMethod(major, minor),null,false,false,null,false,false,false,0,null)),
+ new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor, ExchangeDeleteBody.getClazz(major, minor), ExchangeDeleteBody.getMethod(major, minor),null,false,false,0)),
+ new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor, BasicConsumeBody.getClazz(major, minor), BasicConsumeBody.getMethod(major, minor),null,null,false,false,false,false,null,0)),
+ new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor, BasicCancelBody.getClazz(major, minor), BasicCancelBody.getMethod(major, minor),null,false))
});
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java Mon Jan 29 03:10:09 2007
@@ -122,7 +122,7 @@
_consumers.replay(methods);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- methods.add(new ClusterSynchBody((byte)8, (byte)0));
+ methods.add(new ClusterSynchBody((byte)8, (byte)0, ClusterSynchBody.getClazz((byte)8, (byte)0), ClusterSynchBody.getMethod((byte)8, (byte)0)));
return methods;
}
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java Mon Jan 29 03:10:09 2007
@@ -74,6 +74,8 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
QueueDeleteBody request = new QueueDeleteBody((byte)8,
(byte)0,
+ QueueDeleteBody.getClazz((byte)8,(byte)0),
+ QueueDeleteBody.getMethod((byte)8,(byte)0),
false,
false,
false,
@@ -94,6 +96,8 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
BasicCancelBody request = new BasicCancelBody((byte)8,
(byte)0,
+ BasicCancelBody.getClazz((byte)8, (byte)0),
+ BasicCancelBody.getMethod((byte)8, (byte)0),
getName(),
false);
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java Mon Jan 29 03:10:09 2007
@@ -54,7 +54,10 @@
//send delete request to peers:
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0, false,false,false,null,0);
+ QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0,
+ QueueDeleteBody.getClazz((byte)8, (byte)0),
+ QueueDeleteBody.getMethod((byte)8, (byte)0),
+ false,false,false,null,0);
request.queue = getName();
_groupMgr.broadcast(new SimpleBodySendable(request));
}
Modified: incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java Mon Jan 29 03:10:09 2007
@@ -54,7 +54,7 @@
for (RecordingBroker b : brokers)
{
- b.handleResponse(((AMQFrame) b.getMessages().get(0)).channel, new TestMethod("response"));
+ b.handleResponse(((AMQFrame) b.getMessages().get(0)).getChannel(), new TestMethod("response"));
}
assertTrue("Handler did not receive response", handler.isCompleted());
@@ -80,7 +80,7 @@
for (RecordingBroker broker : succeeded)
{
- broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).channel, new TestMethod("response"));
+ broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).getChannel(), new TestMethod("response"));
}
b.remove();
@@ -106,7 +106,7 @@
for (int i = 0; i < msgs.length; i++)
{
assertTrue(sent.get(i) instanceof AMQFrame);
- assertEquals(msgs[i], ((AMQFrame) sent.get(i)).bodyFrame);
+ assertEquals(msgs[i], ((AMQFrame) sent.get(i)).getBodyFrame());
}
}
@@ -119,9 +119,9 @@
List<AMQDataBlock> sent = broker.getMessages();
assertEquals(1, sent.size());
assertTrue(sent.get(0) instanceof AMQFrame);
- assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+ assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame());
- broker.handleResponse(((AMQFrame) sent.get(0)).channel, new TestMethod("B"));
+ broker.handleResponse(((AMQFrame) sent.get(0)).getChannel(), new TestMethod("B"));
assertEquals(new TestMethod("B"), handler.getResponse());
}
@@ -135,7 +135,7 @@
List<AMQDataBlock> sent = broker.getMessages();
assertEquals(1, sent.size());
assertTrue(sent.get(0) instanceof AMQFrame);
- assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+ assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame());
broker.remove();
assertEquals(null, handler.getResponse());
assertTrue(handler.isCompleted());