You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/24 19:42:38 UTC

svn commit: r499525 - in /incubator/qpid/branches/qpid.0-9/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/handler/ client/src/main/java/org/apache/qpid/client/message/ common/src/main/java/org/apache/qpid...

Author: kpvdr
Date: Wed Jan 24 10:42:37 2007
New Revision: 499525

URL: http://svn.apache.org/viewvc?view=rev&rev=499525
Log:
Fixed problem with missing type field in Message.transport. Used JMSHeaders instead.

Modified:
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java?view=diff&rev=499525&r1=499524&r2=499525
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java Wed Jan 24 10:42:37 2007
@@ -26,7 +26,8 @@
 {
     JMSX_QPID_JMSDESTINATIONURL,
     JMSXGroupID,
-    JMSXGroupSeq;
+    JMSXGroupSeq,
+    JMSXType;
 
     private static Enumeration _names;
     

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java?view=diff&rev=499525&r1=499524&r2=499525
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java Wed Jan 24 10:42:37 2007
@@ -75,7 +75,7 @@
         
         msg.contentHeader = messageHeaders;
         
-        if(transferBody.getBody().contentType == Content.TypeEnum.INLINE_T)
+        if(transferBody.getBody().getContentType() == Content.TypeEnum.INLINE_T)
         {
         	msg.addContent(transferBody.getBody().getContentAsByteArray());
         	protocolSession.deliverMessageToAMQSession(evt.getChannelId(), msg);

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=499525&r1=499524&r2=499525
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Wed Jan 24 10:42:37 2007
@@ -34,6 +34,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.client.CustomJMXProperty;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
@@ -205,12 +206,18 @@
 
     public String getJMSType() throws JMSException
     {
-        return getMessageHeaders().getType();
+        // Since the type field is not a part of message.transport and is used only for
+        // JMS messages, this change to JMS Headers solves the problem.
+        // return getMessageHeaders().getType();
+        return getStringProperty(CustomJMXProperty.JMSXType.toString());
     }
 
     public void setJMSType(String string) throws JMSException
     {
-        getMessageHeaders().setType(string);
+        // Since the type field is not a part of message.transport and is used only for
+        // JMS messages, this change to JMS Headers solves the problem.
+        // getMessageHeaders().setType(string);
+        setStringProperty(CustomJMXProperty.JMSXType.toString(), string);
     }
 
     public long getJMSExpiration() throws JMSException

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java?view=diff&rev=499525&r1=499524&r2=499525
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java Wed Jan 24 10:42:37 2007
@@ -42,8 +42,8 @@
         }
     }
     
-    public TypeEnum contentType;
-    public ByteBuffer content;
+    private TypeEnum contentType;
+    private ByteBuffer content;
     
     // Constructors
     
@@ -126,7 +126,7 @@
     {
     	EncodingUtils.writeUnsignedByte(buffer, contentType.toByte());
     	EncodingUtils.writeUnsignedInteger(buffer, content.remaining());
-        buffer.put(content);
+        buffer.put(content.duplicate());
     }
     
     public void populateFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=499525&r1=499524&r2=499525
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java Wed Jan 24 10:42:37 2007
@@ -44,25 +44,8 @@
      */
     private boolean serverFlag;
 
-    /**
-     * Determines the batch behaviour of the manager.
-     *
-     * Responses are sent to the RequestResponseManager through sendResponse().
-     * These may be internally stored/accumulated for batching purposes, depending
-     * on the batching strategy/mode of the RequestResponseManager.
-     *
-     * The following modes are possibe:
-     *
-     * NONE: Each request results in an immediate single response, no batching
-     *     takes place.
-     * DELAY_FIXED: Waits until a fixed period has passed to batch
-     *     accumulated responses. An optional fixed threshold may be set, which
-     *     if reached or exceeded within the delay period will trigger the batch. (TODO)
-     * MANUAL: No response is sent until it is explicitly released by calling
-     *     function xxxx(). (TODO)
-     */
-    public enum batchResponseModeEnum { NONE }
-    private batchResponseModeEnum batchResponseMode = batchResponseModeEnum.NONE;
+    private int maxAccumulatedResponses = 20; // Default
+//    private Class currentResponseMethodBodyClass;
 
     /**
      * Request and response frames must have a requestID and responseID which
@@ -109,6 +92,7 @@
         this.serverFlag = serverFlag;
         responseIdCount = 1L;
         lastReceivedRequestId = 0L;
+//        currentResponseMethodBodyClass = null;
         responseMap = new ConcurrentHashMap<Long, ResponseStatus>();
     }
 
@@ -122,7 +106,7 @@
             logger.debug((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
                 " " + requestBody + "; " + requestBody.getMethodPayload());
         }
-        //System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
+        //System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + 
         //        " " + requestBody + "; " + requestBody.getMethodPayload());
         // TODO: responseMark is used in HA, but until then, ignore...
         long responseMark = requestBody.getResponseMark();
@@ -150,26 +134,79 @@
         if (responseStatus.responseMethodBody != null)
             throw new RequestResponseMappingException(requestId, "RequestId " +
                 requestId + " already has a response in responseMap.");
+                
         responseStatus.responseMethodBody = responseMethodBody;
         doBatches();
+        
+//         if (currentResponseMethodBodyClass == null)
+//         {
+//             currentResponseMethodBodyClass = responseMethodBody.getClass();
+//             responseStatus.responseMethodBody = responseMethodBody;
+//         }
+//         else if (currentResponseMethodBodyClass.equals(responseMethodBody.getClass()))
+//         {
+//             doBatches();
+//             currentResponseMethodBodyClass = responseMethodBody.getClass();
+//             responseStatus.responseMethodBody = responseMethodBody;
+//         }
+//         else
+//         {
+//             responseStatus.responseMethodBody = responseMethodBody;
+//             if (batchedResponses() >= maxAccumulatedResponses)
+//                 doBatches();
+//         }
     }
 
     // *** Management functions ***
 
-    public batchResponseModeEnum getBatchResponseMode()
-    {
-        return batchResponseMode;
-    }
-
-    public void setBatchResponseMode(batchResponseModeEnum batchResponseMode)
+    /**
+     * Sends batched responses - i.e. all those members of responseMap that have
+     * received a response.
+     */
+    public synchronized void doBatches()
     {
-        if (this.batchResponseMode != batchResponseMode)
-        {
-            this.batchResponseMode = batchResponseMode;
-            doBatches();
+        long startRequestId = 0;
+        int numAdditionalRequestIds = 0;
+        Class responseMethodBodyClass = null;
+        Iterator<Long> lItr = responseMap.keySet().iterator();
+        while (lItr.hasNext())
+        {
+            long requestId = lItr.next();
+            ResponseStatus responseStatus = responseMap.get(requestId);
+            if (responseStatus.responseMethodBody != null)
+            {
+//                 if (startRequestId == 0 || responseMethodBodyClass == null)
+//                 {
+//                     startRequestId = requestId;
+//                     responseMethodBodyClass = responseStatus.responseMethodBody.getClass();
+//                     lItr.remove();
+//                 }
+//                 else if (responseMethodBodyClass.equals(responseStatus.responseMethodBody.getClass()))
+//                 {
+//                     numAdditionalRequestIds++;
+//                     lItr.remove();
+//                 }
+//                 else
+//                 {
+//                     sendResponseBatchFrame(startRequestId, numAdditionalRequestIds,
+//                         responseStatus.responseMethodBody);
+//                     numAdditionalRequestIds = 0;
+//                     startRequestId = requestId;
+//                     responseMethodBodyClass = responseStatus.responseMethodBody.getClass();
+//                     lItr.remove();
+//                 }
+               sendResponseBatchFrame(requestId, 0, responseStatus.responseMethodBody);
+               lItr.remove();
+            }
         }
     }
 
+    /**
+     * Total number of entries in the responseMap - including both those that
+     * are outstanding (i.e. no response has been received) and those that are
+     * batched (those for which responses have been received but have not yet
+     * been collected together and sent).
+     */
     public int responsesMapSize()
     {
         return responseMap.size();
@@ -216,31 +253,7 @@
         return responseIdCount++;
     }
 
-    private synchronized void doBatches()
-    {
-        switch (batchResponseMode)
-        {
-            case NONE:
-                Iterator<Long> lItr = responseMap.keySet().iterator();
-                while (lItr.hasNext())
-                {
-                    long requestId = lItr.next();
-                    ResponseStatus responseStatus = responseMap.get(requestId);
-                    if (responseStatus.responseMethodBody != null)
-                    {
-                        sendResponseBatch(requestId, 0, responseStatus.responseMethodBody);
-                        lItr.remove();
-                    }
-                }
-                break;
-
-            // TODO: Add additional batch mode handlers here...
-            // case DELAY_FIXED:
-            // case MANUAL:
-        }
-    }
-
-    private void sendResponseBatch(long firstRequestId, int numAdditionalRequests,
+    private void sendResponseBatchFrame(long firstRequestId, int numAdditionalRequests,
         AMQMethodBody responseMethodBody)
     {
         long responseId = getNextResponseId(); // Get new response ID