You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/15 20:00:15 UTC

svn commit: r496439 - in /incubator/qpid/branches/qpid.0-9: gentools/templ.java/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/protocol/ java/common/src/main/java/org/apache/qpid/ java/common/src/main/java/org/apache/qpid/framing/

Author: kpvdr
Date: Mon Jan 15 11:00:14 2007
New Revision: 496439

URL: http://svn.apache.org/viewvc?view=rev&rev=496439
Log:
Changed static method AMQMethodBody.createAMQFrame() to createMethodBody() for all generated classes so that it can be used with the new Request and Response handlers. (Don't forget to update the gentools dir.) Created new methods AMQMinaProtocolSession.writeRequest() and writeResponse() as new entry points for Request and Response framing.

Modified:
    incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java

Modified: incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl?view=diff&rev=496439&r1=496438&r2=496439
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl (original)
+++ incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl Mon Jan 15 11:00:14 2007
@@ -84,16 +84,13 @@
         return buf.toString();
     }
 
-    public static AMQFrame createAMQFrame(int _channelId, byte major, byte minor
+    public static ${CLASS}${METHOD}Body createMethodBody(byte major, byte minor
 %{FLIST}    ${mb_field_parameter_list}
                                          )
     {
         ${CLASS}${METHOD}Body bodyFrame = new ${CLASS}${METHOD}Body(major, minor);
 %{FLIST}    ${mb_field_body_initialize}
         		 
-        AMQFrame frame = new AMQFrame();
-        frame.channel = _channelId;
-        frame.bodyFrame = bodyFrame;
-        return frame;
+        return bodyFrame;
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=496439&r1=496438&r2=496439
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Mon Jan 15 11:00:14 2007
@@ -23,14 +23,12 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQDataBlock;
-//import org.apache.qpid.framing.BasicPublishBody;
-//import org.apache.qpid.framing.ContentBody;
-//import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.Content;
 import org.apache.qpid.framing.RequestManager;
 import org.apache.qpid.framing.ResponseManager;
 import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.server.ack.TxAck;
 import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
@@ -121,7 +119,7 @@
     private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
     private Set<Long> _browsedAcks = new HashSet<Long>();
 
-    public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges, AMQProtocolWriter protocolSession)
+    public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges, AMQProtocolWriter protocolWriter, AMQMethodListener methodListener)
             throws AMQException
     {
         _channelId = channelId;
@@ -129,8 +127,8 @@
         _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
         _messageStore = messageStore;
         _exchanges = exchanges;
-   		_requestManager = new RequestManager(channelId, protocolSession);
-    	_responseManager = new ResponseManager(channelId, protocolSession);
+   		_requestManager = new RequestManager(channelId, protocolWriter);
+    	_responseManager = new ResponseManager(channelId, methodListener, protocolWriter);
         _txnBuffer = new TxnBuffer(_messageStore);
     }
 
@@ -279,8 +277,15 @@
         }
     }
     
-    public RequestManager getRequestManager() { return _requestManager; }
-    public ResponseManager getResponseManager() { return _responseManager; }
+    public RequestManager getRequestManager()
+    {
+        return _requestManager;
+    }
+    
+    public ResponseManager getResponseManager()
+    {
+        return _responseManager;
+    }
 
     public long getNextDeliveryTag()
     {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=496439&r1=496438&r2=496439
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 15 11:00:14 2007
@@ -36,7 +36,9 @@
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQRequestBody;
 import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.AMQResponseCallback;
 import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.RequestResponseMappingException;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -119,7 +121,6 @@
         _codecFactory = codecFactory;
         _managedObject = createMBean();
         _managedObject.register();
-//        this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
     }
 
     public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
@@ -209,11 +210,11 @@
 
             if (frame.bodyFrame instanceof AMQRequestBody)
             {
-            	requestFrameReceived(frame);
+            	requestFrameReceived(frame.channel, (AMQRequestBody)frame.bodyFrame);
             }
             else if (frame.bodyFrame instanceof AMQResponseBody)
             {
-            	responseFrameReceived(frame);
+            	responseFrameReceived(frame.channel, (AMQResponseBody)frame.bodyFrame);
             }
             else
             {
@@ -222,97 +223,43 @@
         }
     }
     
-    private void requestFrameReceived(AMQFrame frame) throws AMQException
+    private void requestFrameReceived(int channel, AMQRequestBody requestBody) throws AMQException
     {
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Request frame received: " + frame);
         }
-        AMQChannel channel = getChannel(frame.channel);
+        AMQChannel channel = getChannel(channel);
+        ResponseManager responseManager = channel.getResponseManager();
+        responseManager.requestReceived(requestBody);
     }
     
-    private void responseFrameReceived(AMQFrame frame) throws AMQException
+    private void responseFrameReceived(int channel, AMQResponseBody responseBody) throws AMQException
     {
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Response frame received: " + frame);
         }
-        AMQChannel channel = getChannel(frame.channel);
+        AMQChannel channel = getChannel(channel);
+        RequestManager requestManager = channel.getRequestManager();
+        requestManager.responseReceived(responseBody);
     }
 
-//     private void methodFrameReceived(AMQFrame frame)
-//     {
-//         if (_logger.isDebugEnabled())
-//         {
-//             _logger.debug("Method frame received: " + frame);
-//         }
-//         final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel,
-//                                                                                     (AMQMethodBody) frame.bodyFrame);
-//         try
-//         {
-//             boolean wasAnyoneInterested = false;
-//             for (AMQMethodListener listener : _frameListeners)
-//             {
-//                 wasAnyoneInterested = listener.methodReceived(evt) ||
-//                                       wasAnyoneInterested;
-//             }
-//             if (!wasAnyoneInterested)
-//             {
-//                 throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
-//             }
-//         }
-//         catch (AMQChannelException e)
-//         {
-//             _logger.error("Closing channel due to: " + e.getMessage());
-//             writeFrame(e.getCloseFrame(frame.channel));
-//         }
-//         catch (Exception e)
-//         {
-//             for (AMQMethodListener listener : _frameListeners)
-//             {
-//                 listener.error(e);
-//             }
-//             _minaProtocolSession.close();
-//         }
-//     }
-
-//     private void contentFrameReceived(AMQFrame frame) throws AMQException
-//     {
-//         if (frame.bodyFrame instanceof ContentHeaderBody)
-//         {
-//             contentHeaderReceived(frame);
-//         }
-//         else if (frame.bodyFrame instanceof ContentBody)
-//         {
-//             contentBodyReceived(frame);
-//         }
-//         else if (frame.bodyFrame instanceof HeartbeatBody)
-//         {
-//             _logger.debug("Received heartbeat from client");
-//         }
-//         else
-//         {
-//             _logger.warn("Unrecognised frame " + frame.getClass().getName());
-//         }
-//     }
-
-//     private void contentHeaderReceived(AMQFrame frame) throws AMQException
-//     {
-//         if (_logger.isDebugEnabled())
-//         {
-//             _logger.debug("Content header frame received: " + frame);
-//         }
-//         getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame);
-//     }
-
-//     private void contentBodyReceived(AMQFrame frame) throws AMQException
-//     {
-//         if (_logger.isDebugEnabled())
-//         {
-//             _logger.debug("Content body frame received: " + frame);
-//         }
-//         getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame);
-//     }
+    public long writeRequest(int channel, AMQMethodBody methodBody, AMQResponseCallback responseCallback)
+        throws RequestResponseMappingException
+    {
+        AMQChannel channel = getChannel(channel);
+        RequestManager requestManager = channel.getRequestManager();
+        return requestManager.sendRequest(methodBody, responseCallback);
+    }
+
+    public void writeResponse(int channel, long requestId, AMQMethodBody methodBody)
+        throws RequestResponseMappingException
+    {
+        AMQChannel channel = getChannel(channel);
+        ResponseManager responseManager = channel.getResponseManager();
+        responseManager(requestId, methodBody);
+    }
 
     /**
      * Convenience method that writes a frame to the protocol session. Equivalent

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQChannelException.java?view=diff&rev=496439&r1=496438&r2=496439
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQChannelException.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQChannelException.java Mon Jan 15 11:00:14 2007
@@ -21,7 +21,7 @@
 package org.apache.qpid;
 
 import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
 
 public class AMQChannelException extends AMQException
 {
@@ -49,8 +49,8 @@
         this.minor = minor;
     }
 
-    public AMQFrame getCloseFrame(int channel)
+    public AMQMethodBody getCloseMethodBody()
     {
-        return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), getMessage());
+        return ChannelCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), getMessage());
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java?view=diff&rev=496439&r1=496438&r2=496439
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java Mon Jan 15 11:00:14 2007
@@ -27,7 +27,7 @@
 public class RequestManager
 {
     private int channel;
-    AMQProtocolWriter protocolSession;
+    AMQProtocolWriter protocolWriter;
 
     /**
      * Request and response frames must have a requestID and responseID which
@@ -43,10 +43,10 @@
 
     private Hashtable<Long, AMQResponseCallback> requestSentMap;
 
-    public RequestManager(int channel, AMQProtocolWriter protocolSession)
+    public RequestManager(int channel, AMQProtocolWriter protocolWriter)
     {
         this.channel = channel;
-        this.protocolSession = protocolSession;
+        this.protocolWriter = protocolWriter;
         requestIdCount = 1L;
         lastProcessedResponseId = 0L;
         requestSentMap = new Hashtable<Long, AMQResponseCallback>();
@@ -60,7 +60,7 @@
         long requestId = getNextRequestId(); // Get new request ID
         AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId,
             lastProcessedResponseId, requestMethodBody);
-        protocolSession.writeFrame(requestFrame);
+        protocolWriter.writeFrame(requestFrame);
         requestSentMap.put(requestId, responseCallback);
         return requestId;
     }

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=496439&r1=496438&r2=496439
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java Mon Jan 15 11:00:14 2007
@@ -32,7 +32,7 @@
 {
     private int channel;
     AMQMethodListener methodListener;
-    AMQProtocolWriter protocolSession;
+    AMQProtocolWriter protocolWriter;
 
     /**
      * Determines the batch behaviour of the manager.
@@ -91,11 +91,11 @@
     private Hashtable<Long, ResponseStatus> responseMap;
 
     public ResponseManager(int channel, AMQMethodListener methodListener,
-        AMQProtocolWriter protocolSession)
+        AMQProtocolWriter protocolWriter)
     {
         this.channel = channel;
         this.methodListener = methodListener;
-        this.protocolSession = protocolSession;
+        this.protocolWriter = protocolWriter;
         responseIdCount = 1L;
         lastReceivedRequestId = 0L;
         responseMap = new Hashtable<Long, ResponseStatus>();
@@ -221,6 +221,6 @@
         long responseId = getNextResponseId(); // Get new request ID
         AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId,
             firstRequestId, numAdditionalRequests, responseMethodBody);
-        protocolSession.writeFrame(responseFrame);
+        protocolWriter.writeFrame(responseFrame);
     }
 }