You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/15 17:26:01 UTC

svn commit: r496389 - in /incubator/qpid/branches/qpid.0-9/java: broker/src/main/java/org/apache/qpid/server/protocol/ common/src/main/java/org/apache/qpid/framing/

Author: kpvdr
Date: Mon Jan 15 08:26:00 2007
New Revision: 496389

URL: http://svn.apache.org/viewvc?view=rev&rev=496389
Log:
Request and Response managers now use the new common AMQMethodListener class

Removed:
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java
Modified:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=496389&r1=496388&r2=496389
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 15 08:26:00 2007
@@ -36,9 +36,7 @@
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQRequestBody;
 import org.apache.qpid.framing.AMQResponseBody;
-//import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.HeartbeatBody;
-//import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -221,24 +219,6 @@
             {
                 _logger.error("Received invalid frame: " + frame.toString());
             }
-//             else if (frame.bodyFrame instanceof AMQMethodBody)
-//             {
-//                 methodFrameReceived(frame);
-//             }
-//             else
-//             {
-//                 try
-//                 {
-//                     contentFrameReceived(frame);
-//                 }
-//                 catch (RequiredDeliveryException e)
-//                 {
-//                     //need to return the message:
-//                     _logger.info("Returning message to " + this + " channel " + frame.channel
-//                                  + ": " + e.getMessage());
-//                     writeFrame(e.getReturnMessage(frame.channel));
-//                 }
-//             }
         }
     }
     

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java?view=diff&rev=496389&r1=496388&r2=496389
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java Mon Jan 15 08:26:00 2007
@@ -28,21 +28,21 @@
 {
     private int channel;
     AMQProtocolWriter protocolSession;
-    
+
     /**
      * Request and response frames must have a requestID and responseID which
      * indepenedently increment from 0 on a per-channel basis. These are the
      * counters, and contain the value of the next (not yet used) frame.
      */
     private long requestIdCount;
-    
+
     /**
      * These keep track of the last requestId and responseId to be received.
      */
     private long lastProcessedResponseId;
-            
+
     private Hashtable<Long, AMQResponseCallback> requestSentMap;
-    
+
     public RequestManager(int channel, AMQProtocolWriter protocolSession)
     {
         this.channel = channel;
@@ -51,9 +51,9 @@
         lastProcessedResponseId = 0L;
         requestSentMap = new Hashtable<Long, AMQResponseCallback>();
     }
-    
+
     // *** Functions to originate a request ***
-    
+
     public long sendRequest(AMQMethodBody requestMethodBody,
         AMQResponseCallback responseCallback)
     {
@@ -64,7 +64,7 @@
         requestSentMap.put(requestId, responseCallback);
         return requestId;
     }
-    
+
     public void responseReceived(AMQResponseBody responseBody)
         throws RequestResponseMappingException
     {
@@ -81,16 +81,16 @@
         }
         lastProcessedResponseId = responseBody.getResponseId();
     }
-    
+
     // *** Management functions ***
-    
+
     public int requestsMapSize()
     {
         return requestSentMap.size();
     }
-        
+
     // *** Private helper functions ***
-    
+
     private long getNextRequestId()
     {
         return requestIdCount++;

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=496389&r1=496388&r2=496389
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java Mon Jan 15 08:26:00 2007
@@ -24,12 +24,14 @@
 import java.util.Hashtable;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.AMQProtocolWriter;
 
 public class ResponseManager
 {
     private int channel;
-    RequestHandler requestHandler;
+    AMQMethodListener methodListener;
     AMQProtocolWriter protocolSession;
 
     /**
@@ -51,66 +53,68 @@
      */
     public enum batchResponseModeEnum { NONE }
     private batchResponseModeEnum batchResponseMode;
-    
+
     /**
      * Request and response frames must have a requestID and responseID which
      * indepenedently increment from 0 on a per-channel basis. These are the
      * counters, and contain the value of the next (not yet used) frame.
      */
     private long responseIdCount;
-    
+
     /**
      * These keep track of the last requestId and responseId to be received.
      */
     private long lastReceivedRequestId;
-    
+
     /**
      * Last requestID sent in a response (for batching)
      */
     private long lastSentRequestId;
-    
+
     private class ResponseStatus implements Comparable<ResponseStatus>
     {
-         public long requestId;
-        public AMQMethodBody responseMethodBody;
-         
+        private long requestId;
+        private AMQMethodBody responseMethodBody;
+
         public ResponseStatus(long requestId)
         {
-               this.requestId = requestId;
+            this.requestId = requestId;
             responseMethodBody = null;
         }
-         
+
         public int compareTo(ResponseStatus o)
         {
             return (int)(requestId - o.requestId);
         }
     }
-    
+
     private Hashtable<Long, ResponseStatus> responseMap;
-    
-    public ResponseManager(int channel, RequestHandler requestHandler,
+
+    public ResponseManager(int channel, AMQMethodListener methodListener,
         AMQProtocolWriter protocolSession)
     {
         this.channel = channel;
-        this.requestHandler = requestHandler;
+        this.methodListener = methodListener;
         this.protocolSession = protocolSession;
         responseIdCount = 1L;
         lastReceivedRequestId = 0L;
         responseMap = new Hashtable<Long, ResponseStatus>();
     }
-    
+
     // *** Functions to handle an incoming request ***
-    
-    public void requestReceived(AMQRequestBody requestBody)
+
+    public void requestReceived(AMQRequestBody requestBody) throws Exception
     {
         long requestId = requestBody.getRequestId();
         // TODO: responseMark is used in HA, but until then, ignore...
         long responseMark = requestBody.getResponseMark();
         lastReceivedRequestId = requestId;
         responseMap.put(requestId, new ResponseStatus(requestId));
-        requestHandler.requestReceived(requestBody);
+        // TODO: Update MethodEvent to use the RequestBody instead of MethodBody
+        AMQMethodEvent methodEvent = new AMQMethodEvent(channel, requestBody.getMethodPayload());
+        methodListener.methodReceived(methodEvent);
     }
-    
+
     public void sendResponse(long requestId, AMQMethodBody responseMethodBody)
         throws RequestResponseMappingException
     {
@@ -124,14 +128,14 @@
         responseStatus.responseMethodBody = responseMethodBody;
         doBatches();
     }
-    
+
     // *** Management functions ***
 
     public batchResponseModeEnum getBatchResponseMode()
     {
         return batchResponseMode;
     }
-    
+
     public void setBatchResponseMode(batchResponseModeEnum batchResponseMode)
     {
         if (this.batchResponseMode != batchResponseMode)
@@ -140,12 +144,12 @@
             doBatches();
         }
     }
-    
+
     public int responsesMapSize()
     {
         return responseMap.size();
     }
-    
+
     /**
      * As the responseMap may contain both outstanding responses (those with
      * ResponseStatus.responseMethodBody still null) and responses waiting to
@@ -162,7 +166,7 @@
         }
         return cnt;
     }
-    
+
     /**
      * As the responseMap may contain both outstanding responses (those with
      * ResponseStatus.responseMethodBody still null) and responses waiting to
@@ -179,14 +183,14 @@
         }
         return cnt;
     }
-    
+
     // *** Private helper functions ***
-    
+
     private long getNextResponseId()
     {
         return responseIdCount++;
     }
-    
+
     private void doBatches()
     {
         switch (batchResponseMode)
@@ -204,13 +208,13 @@
                     }
                 }
                 break;
-                
+
             // TODO: Add additional batch mode handlers here...
             // case DELAY_FIXED:
             // case MANUAL:
         }
     }
-    
+
     private void sendResponseBatch(long firstRequestId, int numAdditionalRequests,
         AMQMethodBody responseMethodBody)
     {