You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/29 12:10:12 UTC

svn commit: r501009 - in /incubator/qpid/trunk/qpid/java/cluster/src: main/java/org/apache/qpid/server/cluster/ main/java/org/apache/qpid/server/cluster/replay/ main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/cluster/

Author: rgreig
Date: Mon Jan 29 03:10:09 2007
New Revision: 501009

URL: http://svn.apache.org/viewvc?view=rev&rev=501009
Log:
QPID-320 : Patch supplied by Rob Godfrey - Improve performance by remembering protocol version

Modified:
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
    incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java Mon Jan 29 03:10:09 2007
@@ -172,12 +172,12 @@
 
     private boolean isMembershipAnnouncement(Object msg)
     {
-        return msg instanceof AMQFrame && (((AMQFrame) msg).bodyFrame instanceof ClusterMembershipBody);
+        return msg instanceof AMQFrame && (((AMQFrame) msg).getBodyFrame() instanceof ClusterMembershipBody);
     }
 
     private boolean isBufferable(Object msg)
     {
-        return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).bodyFrame);
+        return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).getBodyFrame());
     }
 
     private boolean isBuffereable(AMQBody body)

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java Mon Jan 29 03:10:09 2007
@@ -110,6 +110,8 @@
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         ClusterPingBody ping = new ClusterPingBody((byte)8,
                                                    (byte)0,
+                                                   ClusterPingBody.getClazz((byte)8, (byte)0),
+                                                   ClusterPingBody.getMethod((byte)8, (byte)0),
                                                    _group.getLocal().getDetails(),
                                                    _loadTable.getLocalLoad(),
                                                    true);
@@ -159,6 +161,8 @@
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         ClusterJoinBody join = new ClusterJoinBody((byte)8,
                                                    (byte)0,
+                                                   ClusterJoinBody.getClazz((byte)8, (byte)0),
+                                                   ClusterJoinBody.getMethod((byte)8, (byte)0),
                                                    _group.getLocal().getDetails());
 
         send(leader, new SimpleBodySendable(join));
@@ -182,6 +186,8 @@
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         ClusterLeaveBody leave = new ClusterLeaveBody((byte)8,
                                                       (byte)0,
+                                                      ClusterLeaveBody.getClazz((byte)8, (byte)0),
+                                                      ClusterLeaveBody.getMethod((byte)8, (byte)0),
                                                       _group.getLocal().getDetails());
 
         send(getLeader(), new SimpleBodySendable(leave));
@@ -207,6 +213,8 @@
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8,
                                                                 (byte)0,
+                                                                ClusterSuspectBody.getClazz((byte)8, (byte)0),
+                                                                ClusterSuspectBody.getMethod((byte)8, (byte)0),
                                                                 broker.getDetails());
 
             send(getLeader(), new SimpleBodySendable(suspect));
@@ -231,7 +239,10 @@
             //pass request on to leader:
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0, member.getDetails());
+            ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0,
+                                                          ClusterJoinBody.getClazz((byte)8, (byte)0),
+                                                          ClusterJoinBody.getMethod((byte)8, (byte)0),
+                                                          member.getDetails());
             
             Broker leader = getLeader();
             send(leader, new SimpleBodySendable(request));
@@ -278,7 +289,10 @@
     {
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0, membership.getBytes());
+        ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0,
+                                                                   ClusterMembershipBody.getClazz((byte)8, (byte)0),
+                                                                   ClusterMembershipBody.getMethod((byte)8, (byte)0),
+                                                                   membership.getBytes());
 
         
         return announce;

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java Mon Jan 29 03:10:09 2007
@@ -200,10 +200,10 @@
 
     private void handleFrame(AMQFrame frame) throws AMQException
     {
-        AMQBody body = frame.bodyFrame;
+        AMQBody body = frame.getBodyFrame();
         if (body instanceof AMQMethodBody)
         {
-            handleMethod(frame.channel, (AMQMethodBody) body);
+            handleMethod(frame.getChannel(), (AMQMethodBody) body);
         }
         else
         {

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java Mon Jan 29 03:10:09 2007
@@ -56,6 +56,8 @@
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             BasicConsumeBody m = new BasicConsumeBody((byte)8,
                                                       (byte)0,
+                                                      BasicConsumeBody.getClazz((byte)8, (byte)0),
+                                                      BasicConsumeBody.getMethod((byte)8, (byte)0),
                                                       null,
                                                       queue,
                                                       false,

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java Mon Jan 29 03:10:09 2007
@@ -50,13 +50,13 @@
     private final byte minor = (byte)0;
     private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[]
             {
-                    new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor,null,false,false,false,false,false,null,0)),
-                    new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor,false,false,false,null,0)),
-                    new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor,null,null,false,null,null,0)),
-                    new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor,null,false,false,null,false,false,false,0,null)),
-                    new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor,null,false,false,0)),
-                    new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor,null,null,false,false,false,false,null,0)),
-                    new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor,null,false))
+                    new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor, QueueDeclareBody.getClazz(major, minor), QueueDeclareBody.getMethod(major, minor),null,false,false,false,false,false,null,0)),
+                    new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor, QueueDeleteBody.getClazz(major, minor), QueueDeleteBody.getMethod(major, minor),false,false,false,null,0)),
+                    new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor, QueueBindBody.getClazz(major, minor), QueueBindBody.getMethod(major, minor),null,null,false,null,null,0)),
+                    new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor, ExchangeDeclareBody.getClazz(major, minor), ExchangeDeclareBody.getMethod(major, minor),null,false,false,null,false,false,false,0,null)),
+                    new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor, ExchangeDeleteBody.getClazz(major, minor), ExchangeDeleteBody.getMethod(major, minor),null,false,false,0)),
+                    new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor, BasicConsumeBody.getClazz(major, minor), BasicConsumeBody.getMethod(major, minor),null,null,false,false,false,false,null,0)),
+                    new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor, BasicCancelBody.getClazz(major, minor), BasicCancelBody.getMethod(major, minor),null,false))
             });
 
 

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java Mon Jan 29 03:10:09 2007
@@ -122,7 +122,7 @@
         _consumers.replay(methods);
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        methods.add(new ClusterSynchBody((byte)8, (byte)0));
+        methods.add(new ClusterSynchBody((byte)8, (byte)0, ClusterSynchBody.getClazz((byte)8, (byte)0), ClusterSynchBody.getMethod((byte)8, (byte)0)));
         return methods;
     }
 

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java Mon Jan 29 03:10:09 2007
@@ -74,6 +74,8 @@
         	// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             QueueDeleteBody request = new QueueDeleteBody((byte)8,
                                                           (byte)0,
+                                                          QueueDeleteBody.getClazz((byte)8,(byte)0),
+                                                          QueueDeleteBody.getMethod((byte)8,(byte)0),
                                                           false,
                                                           false,
                                                           false,
@@ -94,6 +96,8 @@
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         BasicCancelBody request = new BasicCancelBody((byte)8,
                                                       (byte)0,
+                                                      BasicCancelBody.getClazz((byte)8, (byte)0),
+                                                      BasicCancelBody.getMethod((byte)8, (byte)0),
                                                       getName(),
                                                       false);
         

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java Mon Jan 29 03:10:09 2007
@@ -54,7 +54,10 @@
         //send delete request to peers:
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0, false,false,false,null,0);
+        QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0,
+                                                      QueueDeleteBody.getClazz((byte)8, (byte)0),
+                                                      QueueDeleteBody.getMethod((byte)8, (byte)0),
+                                                      false,false,false,null,0);
         request.queue = getName();
         _groupMgr.broadcast(new SimpleBodySendable(request));
     }

Modified: incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java Mon Jan 29 03:10:09 2007
@@ -54,7 +54,7 @@
 
         for (RecordingBroker b : brokers)
         {
-            b.handleResponse(((AMQFrame) b.getMessages().get(0)).channel, new TestMethod("response"));
+            b.handleResponse(((AMQFrame) b.getMessages().get(0)).getChannel(), new TestMethod("response"));
         }
 
         assertTrue("Handler did not receive response", handler.isCompleted());
@@ -80,7 +80,7 @@
 
         for (RecordingBroker broker : succeeded)
         {
-            broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).channel, new TestMethod("response"));
+            broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).getChannel(), new TestMethod("response"));
         }
         b.remove();
 
@@ -106,7 +106,7 @@
         for (int i = 0; i < msgs.length; i++)
         {
             assertTrue(sent.get(i) instanceof AMQFrame);
-            assertEquals(msgs[i], ((AMQFrame) sent.get(i)).bodyFrame);
+            assertEquals(msgs[i], ((AMQFrame) sent.get(i)).getBodyFrame());
         }
     }
 
@@ -119,9 +119,9 @@
         List<AMQDataBlock> sent = broker.getMessages();
         assertEquals(1, sent.size());
         assertTrue(sent.get(0) instanceof AMQFrame);
-        assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+        assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame());
 
-        broker.handleResponse(((AMQFrame) sent.get(0)).channel, new TestMethod("B"));
+        broker.handleResponse(((AMQFrame) sent.get(0)).getChannel(), new TestMethod("B"));
 
         assertEquals(new TestMethod("B"), handler.getResponse());
     }
@@ -135,7 +135,7 @@
         List<AMQDataBlock> sent = broker.getMessages();
         assertEquals(1, sent.size());
         assertTrue(sent.get(0) instanceof AMQFrame);
-        assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+        assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame());
         broker.remove();
         assertEquals(null, handler.getResponse());
         assertTrue(handler.isCompleted());