You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/20 18:23:11 UTC

svn commit: r827724 [2/8] - in /qpid/branches/java-broker-0-10/qpid/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/src/main/java/org/apach...

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java Tue Oct 20 16:23:01 2009
@@ -20,8 +20,6 @@
 // Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
 //
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.Filterable;
 
 /**

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java Tue Oct 20 16:23:01 2009
@@ -14,14 +14,12 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.server.filter;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.Filterable;
 
 public interface MessageFilter

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java Tue Oct 20 16:23:01 2009
@@ -22,7 +22,6 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.Filterable;
 
 import java.lang.reflect.Constructor;

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java Tue Oct 20 16:23:01 2009
@@ -18,7 +18,6 @@
 package org.apache.qpid.server.filter;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.Filterable;
 
 //
@@ -43,16 +42,16 @@
     public String toString() {
         return "XQUERY "+ConstantExpression.encodeString(xpath);
     }
-    
+
     /**
      * @param message
      * @return true if the expression evaluates to Boolean.TRUE.
      * @throws AMQException
      */
-    public boolean matches(Filterable message) 
+    public boolean matches(Filterable message)
     {
         Object object = evaluate(message);
-        return object!=null && object==Boolean.TRUE;            
+        return object!=null && object==Boolean.TRUE;
     }
 
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java Tue Oct 20 16:23:01 2009
@@ -27,8 +27,6 @@
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.Filterable;
 import org.apache.xpath.CachedXPathAPI;
 import org.w3c.dom.Document;
@@ -36,14 +34,14 @@
 import org.xml.sax.InputSource;
 
 public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator {
-    
+
     private final String xpath;
 
     public XalanXPathEvaluator(String xpath) {
         this.xpath = xpath;
     }
-    
-    public boolean evaluate(Filterable m) 
+
+    public boolean evaluate(Filterable m)
     {
         // TODO - we would have to check the content type and then evaluate the content
         //        here... is this really a feature we wish to implement? - RobG
@@ -65,18 +63,18 @@
 
     private boolean evaluate(byte[] data) {
         try {
-            
+
             InputSource inputSource = new InputSource(new ByteArrayInputStream(data));
-            
+
             DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
             factory.setNamespaceAware(true);
             DocumentBuilder dbuilder = factory.newDocumentBuilder();
             Document doc = dbuilder.parse(inputSource);
-            
+
             CachedXPathAPI cachedXPathAPI = new CachedXPathAPI();
             NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath);
             return iterator.nextNode()!=null;
-            
+
         } catch (Throwable e) {
             return false;
         }
@@ -85,12 +83,12 @@
     private boolean evaluate(String text) {
         try {
             InputSource inputSource = new InputSource(new StringReader(text));
-            
+
             DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
             factory.setNamespaceAware(true);
             DocumentBuilder dbuilder = factory.newDocumentBuilder();
             Document doc = dbuilder.parse(inputSource);
-            
+
             // We should associated the cachedXPathAPI object with the message being evaluated
             // since that should speedup subsequent xpath expressions.
             CachedXPathAPI cachedXPathAPI = new CachedXPathAPI();

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java Tue Oct 20 16:23:01 2009
@@ -1,6 +1,5 @@
 package org.apache.qpid.server.flow;
 
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.message.ServerMessage;
 
 /*

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Oct 20 16:23:01 2009
@@ -25,7 +25,6 @@
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.ConsumerTagNotUniqueException;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.access.Permission;
@@ -116,17 +115,31 @@
 
                 try
                 {
-                    AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
-                                                                          body.getArguments(), body.getNoLocal(), body.getExclusive());
-                    if (!body.getNowait())
+                    if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
                     {
-                        MethodRegistry methodRegistry = session.getMethodRegistry();
-                        AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
-                        session.writeFrame(responseBody.generateFrame(channelId));
 
+                        AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
+                                                                              body.getArguments(), body.getNoLocal(), body.getExclusive());
+                        if (!body.getNowait())
+                        {
+                            MethodRegistry methodRegistry = session.getMethodRegistry();
+                            AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
+                            session.writeFrame(responseBody.generateFrame(channelId));
+
+                        }
+                    }
+                    else
+                    {
+                        AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
+
+                        MethodRegistry methodRegistry = session.getMethodRegistry();
+                        AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),    // replyCode
+                                                                 msg,               // replytext
+                                                                 body.getClazz(),
+                                                                 body.getMethod());
+                        session.writeFrame(responseBody.generateFrame(0));
                     }
 
-                    
                 }
                 catch (org.apache.qpid.AMQInvalidArgumentException ise)
                 {
@@ -141,17 +154,6 @@
 
 
                 }
-                catch (ConsumerTagNotUniqueException e)
-                {
-                    AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
-
-                    MethodRegistry methodRegistry = session.getMethodRegistry();
-                    AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),    // replyCode
-                                                             msg,               // replytext
-                                                             body.getClazz(),
-                                                             body.getMethod());
-                    session.writeFrame(responseBody.generateFrame(0));
-                }
                 catch (AMQQueue.ExistingExclusiveSubscription e)
                 {
                     throw body.getChannelException(AMQConstant.ACCESS_REFUSED,

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Tue Oct 20 16:23:01 2009
@@ -30,6 +30,7 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.flow.MessageOnlyCreditManager;
 import org.apache.qpid.server.subscription.SubscriptionImpl;
@@ -40,9 +41,6 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.security.access.Permission;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Tue Oct 20 16:23:01 2009
@@ -24,7 +24,6 @@
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.security.access.Permission;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -96,7 +95,7 @@
 
             session.writeFrame(responseBody.generateFrame(channelId));
 
-            
+
         }
     }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Tue Oct 20 16:23:01 2009
@@ -50,5 +50,6 @@
         }
         stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
         session.initHeartbeats(body.getHeartbeat());
+        session.setMaxFrameSize(body.getFrameMax());
     }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -92,11 +92,11 @@
                     try
                     {
 
-                    exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(),
-                                                              body.getType() == null ? null : body.getType().intern(),
-                                                              body.getDurable(),
-                                                              body.getPassive(), body.getTicket());
-                    exchangeRegistry.registerExchange(exchange);
+                        exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(),
+                                                                  body.getType() == null ? null : body.getType().intern(),
+                                                                  body.getDurable(),
+                                                                  body.getPassive(), body.getTicket());
+                        exchangeRegistry.registerExchange(exchange);
                     }
                     catch(AMQUnknownExchangeType e)
                     {

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Tue Oct 20 16:23:01 2009
@@ -40,7 +40,6 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties Tue Oct 20 16:23:01 2009
@@ -61,13 +61,32 @@
 # 0 - path
 MST-1002 = Store location : {0}
 MST-1003 = Closed
+MST-1004 = Recovery Start
+MST-1005 = Recovered {0,number} messages
+MST-1006 = Recovery Complete
+
+#ConfigStore
+# 0 - name
+CFG-1001 = Created : {0}
+# 0 - path
+CFG-1002 = Store location : {0}
+CFG-1003 = Closed
+CFG-1004 = Recovery Start
+CFG-1005 = Recovery Complete
+
+#TransactionLog
+# 0 - name
+TXN-1001 = Created : {0}
+# 0 - path
+TXN-1002 = Store location : {0}
+TXN-1003 = Closed
 # 0 - queue name
-MST-1004 = Recovery Start[ : {0}]
+TXN-1004 = Recovery Start[ : {0}]
 # 0 - count
 # 1 - queue count
-MST-1005 = Recovered {0,number} messages for queue {1}
+TXN-1005 = Recovered {0,number} messages for queue {1}
 # 0 - queue name
-MST-1006 = Recovery Complete[ : {0}]
+TXN-1006 = Recovery Complete[ : {0}]
 
 #Connection
 # 0 - Client id
@@ -83,12 +102,18 @@
 # 0 - bytes allowed in prefetch
 # 1 - number of messagse.
 CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number}
+# 0 - queue causing flow control
+CHN-1005 = Flow Control Enforced (Queue {0})
+CHN-1006 = Flow Control Removed
 
 #Queue
 # 0 - owner
 # 1 - priority
 QUE-1001 = Create :[ Owner: {0}][ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}]
 QUE-1002 = Deleted
+QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number}
+QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
+
 
 #Exchange
 # 0 - type
@@ -104,4 +129,4 @@
 SUB-1001 = Create[ : Durable][ : Arguments : {0}]
 SUB-1002 = Close
 # 0 - The current subscription state
-SUB-1003 = State : {0}
\ No newline at end of file
+SUB-1003 = State : {0}

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties Tue Oct 20 16:23:01 2009
@@ -16,173 +16,7 @@
 #  specific language governing permissions and limitations
 #  under the License.
 #
-# LogMessages used within the Java Broker as originally defined on the wiki:
-#
-# http://cwiki.apache.org/confluence/display/qpid/Status+Update+Design#StatusUpdateDesign-InitialStatusMessages
-#  
-# Technical Notes:
-#  This is a standard Java Properties file so white space is respected at the
-#  end of the lines. This file is processed in a number of ways.
-# 1) ResourceBundle
-#   This file is loaded through a ResourceBundle named LogMessages. the en_US
-#   addition to the file is the localisation. Additional localisations can be
-#   provided and will automatically be selected based on the <locale> value in
-#   the config.xml. The default is en_US.
-#
-# 2) MessasgeFormat
-#  Each entry is prepared with the Java Core MessageFormat methods. Therefore
-#  most functionality you can do via MessageFormat can be done here:
-#
-#  http://java.sun.com/javase/6/docs/api/java/text/MessageFormat.html
-#
-#  The cavet here is that only default String and number FormatTypes can be used.
-#  This is due to the processing described in 3 below. If support for date, time
-#  or choice is requried then the GenerateLogMessages class should be updated to
-#  provide support.
-#
-#  Format Note:
-#   As mentioned earlier white space in this file is very important. One thing
-#  in particular to note is the way MessageFormat peforms its replacements.
-#  The replacement text will totally replace the {xxx} section so there will be
-#  no addtion of white space or removal e.g.
-#     MSG = Text----{0}----
-#  When given parameter 'Hello' result in text:
-#     Text----Hello----
-#
-#  For simple arguments this is expected however when using Style formats then
-#  it can be a little unexepcted. In particular a common pattern is used for
-#  number replacements : {0,number,#}. This is used in the Broker to display an
-#  Integer simply as the Integer with no formating. e.g new Integer(1234567)
-#  becomes the String "1234567" which is can be contrasted with the pattern
-#  without a style format field : {0,number} which becomes string "1,234,567".
-#
-#  What you may not expect is that {0,number, #} would produce the String " 1234567"
-#  note the space after the ','   here      /\   has resulted in a space  /\ in
-#  the output.
-#
-#  More details on the SubformatPattern can be found on the API link above.
-#
-# 3) GenerateLogMessage/Velocity Macro
-#  This is the first and final stage of processing that this file goes through.
-#   1) Class Generation:
-#      The GenerateLogMessage processes this file and uses the velocity Macro
-#      to create classes with static methods to perform the logging and give us
-#      compile time validation.
-#
-#   2) Property Processing:
-#      During the class generation the message properties ({x}) are identified
-#      and used to create the method signature.
-#
-#   3) Option Processing:
-#      The Classes perform final formatting of the messages at runtime based on
-#      optional parameters that are defined within the message. Optional
-#      paramters are enclosed in square brackets e.g. [optional].
-#
-#  To provide fixed log messages as required by the Technical Specification:
-#  http://cwiki.apache.org/confluence/display/qpid/Operational+Logging+-+Status+Update+-+Technical+Specification#OperationalLogging-StatusUpdate-TechnicalSpecification-Howtoprovidefixedlogmessages
-#
-#  This file is processed by Velocity to create a number of classes that contain
-#  static methods that provide LogMessages in the code to provide compile time
-#  validation.
-#
-#  For details of what processing is done see GenerateLogMessages.
-#
-#  What a localiser or developer need know is the following:
-#
-#  The Property structure is important is it defines how the class and methods
-#  will be built.
-#
-#  Class Generation:
-#  =================
-#
-#  Each class of messages will be split in to their own <Class>Messages.java
-#  Currently the following classes are created and are populated with the
-#  messages that bear their 3-digit type identifier:
-#
-#        Class              | Type
-#      ---------------------|--------
-#        Broker             |  BKR
-#        ManagementConsole  |  MNG
-#        VirtualHost        |  VHT
-#        MessageStore       |  MST
-#        Connection         |  CON
-#        Channel            |  CHN
-#        Queue              |  QUE
-#        Exchange           |  EXH
-#        Binding            |  BND
-#        Subscription       |  SUB
-#
-#  Property Processing:
-#  ====================
-#
-#   Each property is then processed by the GenerateLogMessages class to identify
-#   The number and type of parameters, {x} entries. Parameters are defaulted to
-#   String types but the use of FormatType number (e.g.{0,number}) will result
-#   in a Number type being used. These parameters are then used to build the
-#   method parameter list. e.g:
-#   Property:
-#    BRK-1003 = Shuting down : {0} port {1,number,#}
-#   becomes Method:
-#    public static LogMessage BRK_1003(String param1, Number param2)
-#
-#   This improves our compile time validation of log message content and
-#   ensures that change in the message format does not accidentally cause
-#   erroneous messages.
-#
-#  Option Processing:
-#  ====================
-#
-#  Options are identified in the log message as being surrounded by square
-#  brackets ([ ]). These optional values can themselves contain paramters
-#  however nesting of options is not permitted. Identification is performed on
-#  first matchings so give the message:
-#   Msg = Log Message [option1] [option2]
-#  Two options will be identifed and enabled to select text 'option1 and
-#  'option2'.
-#
-#  The nesting of a options is not supported and will provide
-#  unexpected results. e.g. Using Message:
-#   Msg = Log Message [option1 [sub-option2]]
-#
-#  The options will be 'option1 [sub-option2' and 'sub-option2'. The first
-#  option includes the second option as the nesting is not detected.
-#
-#  The detected options are presented in the method signature as boolean options
-#  numerically identified by their position in the message. e.g.
-#   Property:
-#    CON-1001 = Open : Client ID {0} [: Protocol Version : {1}]
-#  becomes Method:
-#    public static LogMessage CON_1001(String param1, String param2, boolean opt1)
-#
-#  The value of 'opt1' will show/hide the option in the message. Note that
-#  'param2' is still required however a null value can be used if the optional
-#  section is not desired.
-#
-#  Again here the importance of white space needs to be highlighted.
-#  Looking at the QUE-1001 message as an example. The first thought on how this
-#  would look would be as follows:
-# QUE-1001 = Create : Owner: {0} [AutoDelete] [Durable] [Transient] [Priority: {1,number,#}]
-#  Each option is correctly defined so the text that is defined will appear when
-#  selected. e.g. 'AutoDelete'. However, what may not be immediately apparent is
-#  the white space. Using the above definition of QUE-1001 if we were to print
-#  the message with only the Priority option displayed it would appear as this:
-#  "Create : Owner: guest    Priority: 1"
-#  Note the spaces here   /\ This is because only the text between the brackets
-#  has been removed.
-#
-#  Each option needs to include white space to correctly format the message. So
-#  the correct definition of QUE-1001 is as follows:
-# QUE-1001 = Create : Owner: {0}[ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}]
-#  Note that white space is included with each option and there is no extra
-#  white space between the options. As a result the output with just Priority
-#  enabled is as follows:
-#  "Create : Owner: guest Priority: 1"
-#
-#  The final processing that is done in the generation is the conversion of the
-#  property name. As a '-' is an illegal character in the method name it is
-#  converted to '_' This processing gives the final method signature as follows:
-#   <Class>Message.<Type>_<Number>(<parmaters>,<options>)
-#
+# Default File used for all non-defined locales.
 #Broker
 # 0 - Version
 # 1 = Build
@@ -227,13 +61,32 @@
 # 0 - path
 MST-1002 = Store location : {0}
 MST-1003 = Closed
+MST-1004 = Recovery Start
+MST-1005 = Recovered {0,number} messages
+MST-1006 = Recovery Complete
+
+#ConfigStore
+# 0 - name
+CFG-1001 = Created : {0}
+# 0 - path
+CFG-1002 = Store location : {0}
+CFG-1003 = Closed
+CFG-1004 = Recovery Start
+CFG-1005 = Recovery Complete
+
+#TransactionLog
+# 0 - name
+TXN-1001 = Created : {0}
+# 0 - path
+TXN-1002 = Store location : {0}
+TXN-1003 = Closed
 # 0 - queue name
-MST-1004 = Recovery Start[ : {0}]
+TXN-1004 = Recovery Start[ : {0}]
 # 0 - count
 # 1 - queue count
-MST-1005 = Recovered {0,number} messages for queue {1}
+TXN-1005 = Recovered {0,number} messages for queue {1}
 # 0 - queue name
-MST-1006 = Recovery Complete[ : {0}]
+TXN-1006 = Recovery Complete[ : {0}]
 
 #Connection
 # 0 - Client id
@@ -247,8 +100,9 @@
 CHN-1002 = Flow {0}
 CHN-1003 = Close
 # 0 - bytes allowed in prefetch
-# 1 - number of messagse. 
+# 1 - number of messagse.
 CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number}
+# 0 - queue causing flow control
 CHN-1005 = Flow Control Enforced (Queue {0})
 CHN-1006 = Flow Control Removed
 
@@ -260,6 +114,7 @@
 QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number}
 QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
 
+
 #Exchange
 # 0 - type
 # 1 - name
@@ -273,4 +128,5 @@
 #Subscription
 SUB-1001 = Create[ : Durable][ : Arguments : {0}]
 SUB-1002 = Close
+# 0 - The current subscription state
 SUB-1003 = State : {0}

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -65,16 +65,9 @@
         return null;
     }
 
-    public void register() throws AMQException
+    public void register() throws JMException
     {
-        try
-        {
-            getManagedObjectRegistry().registerObject(this);
-        }
-        catch (JMException e)
-        {
-            throw new AMQException("Error registering managed object " + this + ": " + e, e);
-        }
+        getManagedObjectRegistry().registerObject(this);
     }
 
     protected ManagedObjectRegistry getManagedObjectRegistry()
@@ -98,7 +91,7 @@
     {
         return getObjectInstanceName() + "[" + getType() + "]";
     }
-    
+
 
     /**
      * Created the ObjectName as per the JMX Specs
@@ -140,7 +133,7 @@
 
         objectName.append(",");
         objectName.append("version=").append(_version);
-        
+
         return new ObjectName(objectName.toString());
     }
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java Tue Oct 20 16:23:01 2009
@@ -54,6 +54,7 @@
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.UnknownHostException;
 import java.rmi.AlreadyBoundException;
 import java.rmi.RemoteException;
 import java.rmi.registry.LocateRegistry;
@@ -236,8 +237,17 @@
          * The registry is exported on the defined management port 'port'. We will export the RMIConnectorServer
          * on 'port +1'. Use of these two well-defined ports will ease any navigation through firewall's. 
          */
-        final RMIServerImpl rmiConnectorServerStub = new RMIJRMPServerImpl(port+PORT_EXPORT_OFFSET, csf, ssf, env); 
-        final String hostname = InetAddress.getLocalHost().getHostName();
+        final RMIServerImpl rmiConnectorServerStub = new RMIJRMPServerImpl(port+PORT_EXPORT_OFFSET, csf, ssf, env);
+        String localHost;
+        try
+        {
+            localHost = InetAddress.getLocalHost().getHostName();
+        }
+        catch(UnknownHostException ex)
+        {
+            localHost="127.0.0.1";
+        }
+        final String hostname = localHost;
         final JMXServiceURL externalUrl = new JMXServiceURL(
                 "service:jmx:rmi://"+hostname+":"+(port+PORT_EXPORT_OFFSET)+"/jndi/rmi://"+hostname+":"+port+"/jmxrmi");
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@
 
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import javax.management.JMException;
 
 import org.apache.qpid.AMQException;
 
@@ -45,7 +46,7 @@
 
     ManagedObject getParentObject();
 
-    void register() throws AMQException;
+    void register() throws AMQException, JMException;
 
     void unregister() throws AMQException;
 

Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java (from r824494, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java&r1=824494&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java Tue Oct 20 16:23:01 2009
@@ -18,21 +18,18 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.queue;
+package org.apache.qpid.server.message;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.message.*;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.queue.AMQQueue;
 
 
-import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.nio.ByteBuffer;
 
 /**
  * A deliverable message.
@@ -44,8 +41,6 @@
 
     private final AtomicInteger _referenceCount = new AtomicInteger(0);
 
-    private final AMQMessageHandle _messageHandle;
-
     /** Flag to indicate that this message requires 'immediate' delivery. */
 
     private static final byte IMMEDIATE = 0x01;
@@ -65,73 +60,21 @@
 
     private Object _sessionIdentifier;
     private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
-    private final AMQMessageHeader _messageHeader;
-
-
-    /**
-     * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
-     * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
-     * queues.
-     *
-     * @param messageId
-     * @param store
-     * @param factory
-     *
-     * @throws AMQException
-     */
-    public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory)
-            throws AMQException
-    {
-        _messageHandle = factory.createMessageHandle(messageId, store, true);
-        _size = _messageHandle.getBodySize();
-        _messageHeader = new ContentHeaderBodyAdapter(_messageHandle.getContentHeaderBody());
-    }
 
-        /**
-     * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
-     * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
-     * queues.
-     *
-     * @param messageHandle
-     *
-     * @throws AMQException
-     */
-    public AMQMessage(
-                AMQMessageHandle messageHandle,
-                MessagePublishInfo info)
-            throws AMQException
-    {
-        this(messageHandle, messageHandle.getContentHeaderBody(), messageHandle.getBodySize(), info);
-    }
+    private final StoredMessage<MessageMetaData> _handle;
 
-    public AMQMessage(
-                    AMQMessageHandle messageHandle,
-                    ContentHeaderBody chb,
-                    long size,
-                    MessagePublishInfo info)
-                throws AMQException
 
+    public AMQMessage(StoredMessage<MessageMetaData> handle)
     {
-        _messageHandle = messageHandle;
-
-        _messageHeader = new ContentHeaderBodyAdapter(chb);
+        _handle = handle;
+        final MessageMetaData metaData = handle.getMetaData();
+        _size = metaData.getContentSize();
+        final MessagePublishInfo messagePublishInfo = metaData.getMessagePublishInfo();
 
-        if(info.isImmediate())
+        if(messagePublishInfo.isImmediate())
         {
             _flags |= IMMEDIATE;
         }
-        _size = size;
-
-    }
-
-
-    protected AMQMessage(AMQMessage msg) throws AMQException
-    {
-        _messageHandle = msg._messageHandle;
-        _messageHeader = msg._messageHeader;
-        _flags = msg._flags;
-        _size = msg._size;
-
     }
 
 
@@ -152,26 +95,21 @@
         return _referenceCount.get() > 0;
     }
 
-    public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
-    {
-        return new BodyFrameIterator(protocolSession, channel, _messageHandle);
-    }
-
-    public Iterator<ContentChunk> getContentBodyIterator()
+    public MessageMetaData getMessageMetaData()
     {
-        return new BodyContentIterator(_messageHandle);
+        return _handle.getMetaData();
     }
 
     public ContentHeaderBody getContentHeaderBody() throws AMQException
     {
-        return _messageHandle.getContentHeaderBody();
+        return getMessageMetaData().getContentHeaderBody();
     }
 
 
 
     public Long getMessageId()
     {
-        return _messageHandle.getMessageId();
+        return _handle.getMessageNumber();
     }
 
     /**
@@ -211,10 +149,10 @@
      * message store.
      *
      *
-     * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
+     * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that
      *                                 failed
      */
-    public void decrementReference() throws MessageCleanupException
+    public void decrementReference()
     {
         int count = _referenceCount.decrementAndGet();
 
@@ -229,27 +167,19 @@
             // by copying from other queues at the same time as it is being removed.
             _referenceCount.set(Integer.MIN_VALUE/2);
 
-            try
+            // must check if the handle is null since there may be cases where we decide to throw away a message
+            // and the handle has not yet been constructed
+            if (_handle != null)
             {
-                // must check if the handle is null since there may be cases where we decide to throw away a message
-                // and the handle has not yet been constructed
-                if (_messageHandle != null && isPersistent())
-                {
-                    _messageHandle.removeMessage();
+                _handle.remove();
 
-                }
-            }
-            catch (AMQException e)
-            {
-
-                throw new MessageCleanupException(getMessageId(), e);
             }
         }
         else
         {
             if (count < 0)
             {
-                throw new MessageCleanupException("Reference count for message id " + debugIdentity()
+                throw new RuntimeException("Reference count for message id " + debugIdentity()
                                                   + " has gone below 0.");
             }
         }
@@ -274,12 +204,12 @@
 
     public AMQMessageHeader getMessageHeader()
     {
-        return _messageHeader;
+        return getMessageMetaData().getMessageHeader();
     }
 
     public boolean isPersistent()
     {
-        return _messageHandle.isPersistent();
+        return getMessageMetaData().isPersistent();
     }
 
     /**
@@ -297,12 +227,12 @@
 
     public MessagePublishInfo getMessagePublishInfo() throws AMQException
     {
-        return _messageHandle.getMessagePublishInfo();
+        return getMessageMetaData().getMessagePublishInfo();
     }
 
     public long getArrivalTime()
     {
-        return _messageHandle.getArrivalTime();
+        return getMessageMetaData().getArrivalTime();
     }
 
     /**
@@ -336,13 +266,6 @@
         _flags |= DELIVERED_TO_CONSUMER;
     }
 
-
-
-    public AMQMessageHandle getMessageHandle()
-    {
-        return _messageHandle;
-    }
-
     public long getSize()
     {
         return _size;
@@ -394,4 +317,13 @@
         return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
     }
 
+    public int getContent(ByteBuffer buf, int offset)
+    {
+        return _handle.getContent(offset, buf);
+    }
+
+    public StoredMessage<MessageMetaData> getStoredMessage()
+    {
+        return _handle;
+    }
 }

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java Tue Oct 20 16:23:01 2009
@@ -30,6 +30,10 @@
 
     String getMessageId();
 
+    String getMimeType();
+
+    String getEncoding();
+
     byte getPriority();
 
     long getTimestamp();

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java Tue Oct 20 16:23:01 2009
@@ -20,13 +20,9 @@
  */
 package org.apache.qpid.server.message;
 
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.queue.MessageCleanupException;
 
-import javax.swing.*;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
 public class AMQMessageReference extends MessageReference<AMQMessage>
 {
 
@@ -43,22 +39,6 @@
 
     protected void onRelease(AMQMessage message)
     {
-        try
-        {
-            if(message !=null)
-            {
-                message.decrementReference();
-            }
-            else
-            {
-                //TODO
-                System.err.println("Shouldn't happen!!!!");
-            }
-        }
-        catch (MessageCleanupException e)
-        {
-            // TODO
-            throw new RuntimeException(e);
-        }
+        message.decrementReference();
     }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java Tue Oct 20 16:23:01 2009
@@ -55,6 +55,16 @@
         return getProperties().getMessageIdAsString();
     }
 
+    public String getMimeType()
+    {
+        return getProperties().getContentTypeAsString();
+    }
+
+    public String getEncoding()
+    {
+        return getProperties().getEncodingAsString();
+    }
+
     public byte getPriority()
     {
         return getProperties().getPriority();

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.message;
+
+import java.nio.ByteBuffer;
+
+public interface MessageContentSource
+{
+    public int getContent(ByteBuffer buf, int offset);
+
+    long getSize();
+}

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
------------------------------------------------------------------------------
    svn:executable = *

Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java (from r821930, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java&r1=821930&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java Tue Oct 20 16:23:01 2009
@@ -18,16 +18,26 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.queue;
+package org.apache.qpid.server.message;
 
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.EncodingUtils;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.AMQException;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
 
 /**
  * Encapsulates a publish body and a content header. In the context of the message store these are treated as a
  * single unit.
  */
-public class MessageMetaData
+public class MessageMetaData implements StorableMessageMetaData
 {
     private MessagePublishInfo _messagePublishInfo;
 
@@ -36,6 +46,9 @@
     private int _contentChunkCount;
 
     private long _arrivalTime;
+    private static final byte MANDATORY_FLAG = 1;
+    private static final byte IMMEDIATE_FLAG = 2;
+    public static final MessageMetaDataType.Factory<MessageMetaData> FACTORY = new MetaDataFactory();
 
     public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
     {
@@ -89,4 +102,208 @@
     {
         _arrivalTime = arrivalTime;
     }
+
+    public MessageMetaDataType getType()
+    {
+        return MessageMetaDataType.META_DATA_0_8;
+    }
+
+    public int getStorableSize()
+    {
+        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.properties);
+        int size = _contentHeaderBody.getSize();
+        size += 4;
+        size += EncodingUtils.encodedShortStringLength(_messagePublishInfo.getExchange());
+        size += EncodingUtils.encodedShortStringLength(_messagePublishInfo.getRoutingKey());
+        size += 1; // flags for immediate/mandatory
+        size += EncodingUtils.encodedLongLength();
+
+        return size;
+    }
+
+    public int writeToBuffer(int offset, ByteBuffer dest)
+    {
+        ByteBuffer src = ByteBuffer.allocate((int)getStorableSize());
+
+        org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(src);
+        EncodingUtils.writeInteger(minaSrc, _contentHeaderBody.getSize());
+        _contentHeaderBody.writePayload(minaSrc);
+        EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getExchange());
+        EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getRoutingKey());
+        byte flags = 0;
+        if(_messagePublishInfo.isMandatory())
+        {
+            flags |= MANDATORY_FLAG;
+        }
+        if(_messagePublishInfo.isImmediate())
+        {
+            flags |= IMMEDIATE_FLAG;
+        }
+        EncodingUtils.writeByte(minaSrc, flags);
+        EncodingUtils.writeLong(minaSrc,_arrivalTime);
+        src.position(minaSrc.position());
+        src.flip();
+        src.position(offset);
+        src = src.slice();
+        if(dest.remaining() < src.limit())
+        {
+            src.limit(dest.remaining());
+        }
+        dest.put(src);
+
+
+        return src.limit();
+    }
+
+    public int getContentSize()
+    {
+        return (int) _contentHeaderBody.bodySize;
+    }
+
+    public boolean isPersistent()
+    {
+        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.properties);
+        return properties.getDeliveryMode() ==  BasicContentHeaderProperties.PERSISTENT;
+    }
+
+    private static class MetaDataFactory implements MessageMetaDataType.Factory
+    {
+
+
+        public MessageMetaData createMetaData(ByteBuffer buf)
+        {
+            try
+            {
+                org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(buf);
+                int size = EncodingUtils.readInteger(minaSrc);
+                ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(minaSrc, size);
+                final AMQShortString exchange = EncodingUtils.readAMQShortString(minaSrc);
+                final AMQShortString routingKey = EncodingUtils.readAMQShortString(minaSrc);
+
+                final byte flags = EncodingUtils.readByte(minaSrc);
+                long arrivalTime = EncodingUtils.readLong(minaSrc);
+
+                MessagePublishInfo publishBody =
+                        new MessagePublishInfo()
+                        {
+
+                            public AMQShortString getExchange()
+                            {
+                                return exchange;
+                            }
+
+                            public void setExchange(AMQShortString exchange)
+                            {
+                            }
+
+                            public boolean isImmediate()
+                            {
+                                return (flags & IMMEDIATE_FLAG) != 0;
+                            }
+
+                            public boolean isMandatory()
+                            {
+                                return (flags & MANDATORY_FLAG) != 0;
+                            }
+
+                            public AMQShortString getRoutingKey()
+                            {
+                                return routingKey;
+                            }
+                        };
+                return new MessageMetaData(publishBody, chb, 0, arrivalTime);
+            }
+            catch (AMQException e)
+            {
+                throw new RuntimeException(e);
+            }
+
+        }
+    };
+
+    public AMQMessageHeader getMessageHeader()
+    {
+        return new MessageHeaderAdapter();
+    }
+
+    private final class MessageHeaderAdapter implements AMQMessageHeader
+    {
+        private BasicContentHeaderProperties getProperties()
+        {
+            return (BasicContentHeaderProperties) getContentHeaderBody().properties;
+        }
+
+        public String getCorrelationId()
+        {
+            return getProperties().getCorrelationIdAsString();
+        }
+
+        public long getExpiration()
+        {
+            return getProperties().getExpiration();
+        }
+
+        public String getMessageId()
+        {
+            return getProperties().getMessageIdAsString();
+        }
+
+        public String getMimeType()
+        {
+            return getProperties().getContentTypeAsString();
+        }
+
+        public String getEncoding()
+        {
+            return getProperties().getEncodingAsString();
+        }
+
+        public byte getPriority()
+        {
+            return getProperties().getPriority();
+        }
+
+        public long getTimestamp()
+        {
+            return getProperties().getTimestamp();
+        }
+
+        public String getType()
+        {
+            return getProperties().getTypeAsString();
+        }
+
+        public String getReplyTo()
+        {
+            return getProperties().getReplyToAsString();
+        }
+
+        public Object getHeader(String name)
+        {
+            FieldTable ft = getProperties().getHeaders();
+            return ft.get(name);
+        }
+
+        public boolean containsHeaders(Set<String> names)
+        {
+            FieldTable ft = getProperties().getHeaders();
+            for(String name : names)
+            {
+                if(!ft.containsKey(name))
+                {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        public boolean containsHeader(String name)
+        {
+            FieldTable ft = getProperties().getHeaders();
+            return ft.containsKey(name);
+        }
+
+
+
+    }
 }

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,242 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.transport.codec.BBDecoder;
+
+import java.nio.ByteBuffer;
+import java.lang.ref.WeakReference;
+
+public class MessageMetaData_0_10 implements StorableMessageMetaData
+{
+    private Header _header;
+    private DeliveryProperties _deliveryProps;
+    private MessageProperties _messageProps;
+    private MessageTransferHeader _messageHeader;
+    private long _arrivalTime;
+    private int _bodySize;
+    private volatile WeakReference<ByteBuffer> _body;
+
+    private static final int ENCODER_SIZE = 1 << 16;
+
+    public static final MessageMetaDataType.Factory<MessageMetaData_0_10> FACTORY = new MetaDataFactory();
+
+    private volatile ByteBuffer _encoded;
+
+
+    public MessageMetaData_0_10(MessageTransfer xfr)
+    {
+        this(xfr.getHeader(), xfr.getBodySize(), xfr.getBody(), System.currentTimeMillis());
+    }
+
+    private MessageMetaData_0_10(Header header, int bodySize, long arrivalTime)
+    {
+        this(header, bodySize, null, arrivalTime);
+    }
+
+    private MessageMetaData_0_10(Header header, int bodySize, ByteBuffer xfrBody, long arrivalTime)
+    {
+        _header = header;
+        if(_header != null)
+        {
+            _deliveryProps = _header.get(DeliveryProperties.class);
+            _messageProps = _header.get(MessageProperties.class);
+        }
+        else
+        {
+            _deliveryProps = null;
+            _messageProps = null;
+        }
+        _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps);
+        _arrivalTime = arrivalTime;
+        _bodySize = bodySize;
+
+
+
+        if(xfrBody == null)
+        {
+            _body = null;
+        }
+        else
+        {
+            ByteBuffer body = ByteBuffer.allocate(_bodySize);
+            body.put(xfrBody);
+            body.flip();
+            _body = new WeakReference(body);
+        }
+
+
+    }
+
+
+
+    public MessageMetaDataType getType()
+    {
+        return MessageMetaDataType.META_DATA_0_10;
+    }
+
+    public int getStorableSize()
+    {
+        ByteBuffer buf = _encoded;
+
+        if(buf == null)
+        {
+            buf = encodeAsBuffer();
+            _encoded = buf;
+        }
+
+        //TODO -- need to add stuff
+        return buf.limit();
+    }
+
+    private ByteBuffer encodeAsBuffer()
+    {
+        BBEncoder encoder = new BBEncoder(ENCODER_SIZE);
+
+        encoder.writeInt64(_arrivalTime);
+        encoder.writeInt32(_bodySize);
+        Struct[] headers = _header == null ? new Struct[0] : _header.getStructs();
+        encoder.writeInt32(headers.length);
+
+
+        for(Struct header : headers)
+        {
+            encoder.writeStruct32(header);
+
+        }
+
+        ByteBuffer buf = encoder.buffer();
+        return buf;
+    }
+
+    public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
+    {
+        ByteBuffer buf = _encoded;
+
+        if(buf == null)
+        {
+            buf = encodeAsBuffer();
+            _encoded = buf;
+        }
+
+        buf = buf.duplicate();
+
+        buf.position(offsetInMetaData);
+
+        if(dest.remaining() < buf.limit())
+        {
+            buf.limit(dest.remaining());
+        }
+        dest.put(buf);
+        return buf.limit();
+    }
+
+    public int getContentSize()
+    {
+        return _bodySize;
+    }
+
+    public boolean isPersistent()
+    {
+        return _deliveryProps == null ? false : _deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT;
+    }
+
+    public String getRoutingKey()
+    {
+        return _deliveryProps == null ? null : _deliveryProps.getRoutingKey();
+    }
+
+    public AMQMessageHeader getMessageHeader()
+    {
+        return _messageHeader;
+    }
+
+    public long getSize()
+    {
+
+        return _bodySize;
+    }
+
+    public boolean isImmediate()
+    {
+        return _deliveryProps != null && _deliveryProps.getImmediate();
+    }
+
+    public long getExpiration()
+    {
+        return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
+    }
+
+    public long getArrivalTime()
+    {
+        return _arrivalTime;
+    }
+
+    public Header getHeader()
+    {
+        return _header;
+    }
+
+    public ByteBuffer getBody()
+    {
+        ByteBuffer body = _body == null ? null : _body.get();
+        return body;
+    }
+
+    public void setBody(ByteBuffer body)
+    {
+        _body = new WeakReference(body);
+    }
+
+    private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10>
+    {
+        public MessageMetaData_0_10 createMetaData(ByteBuffer buf)
+        {
+            BBDecoder decoder = new BBDecoder();
+            decoder.init(buf);
+
+            long arrivalTime = decoder.readInt64();
+            int bodySize = decoder.readInt32();
+            int headerCount = decoder.readInt32();
+
+            Struct[] headers = new Struct[headerCount];
+
+            for(int i = 0 ; i < headerCount; i++)
+            {
+                headers[i] = decoder.readStruct32();
+            }
+
+            Header header = new Header(headers);
+
+            return new MessageMetaData_0_10(header, bodySize, arrivalTime);
+
+        }
+    }
+}

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java Tue Oct 20 16:23:01 2009
@@ -57,6 +57,16 @@
         return _messageProps == null ? null : String.valueOf(_messageProps.getMessageId());
     }
 
+    public String getMimeType()
+    {
+        return _messageProps == null ? null : _messageProps.getContentType();
+    }
+
+    public String getEncoding()
+    {
+        return _messageProps == null ? null : _messageProps.getContentEncoding();
+    }
+
     public byte getPriority()
     {
         MessageDeliveryPriority priority = _deliveryProps == null

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java Tue Oct 20 16:23:01 2009
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.message;
 
 import org.apache.qpid.transport.*;
+import org.apache.qpid.server.store.StoredMessage;
 
 import java.util.concurrent.atomic.AtomicLong;
 import java.nio.ByteBuffer;
@@ -29,76 +30,65 @@
 
 public class MessageTransferMessage implements InboundMessage, ServerMessage
 {
-    private static final AtomicLong _numberSource = new AtomicLong(0L);
 
-    private final MessageTransfer _xfr;
-    private final DeliveryProperties _deliveryProps;
-    private final MessageProperties _messageProps;
-    private final AMQMessageHeader _messageHeader;
-    private final long _messageNumber;
-    private final long _arrivalTime;
+
+    private StoredMessage<MessageMetaData_0_10> _storeMessage;
+
+
     private WeakReference<Session> _sessionRef;
 
-    public MessageTransferMessage(MessageTransfer xfr, WeakReference<Session> sessionRef)
-    {
-        this(_numberSource.getAndIncrement(), xfr, sessionRef);
-    }
 
-    public MessageTransferMessage(long messageNumber, MessageTransfer xfr, WeakReference<Session> sessionRef)
+    public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef)
     {
 
-        _xfr = xfr;
-        _messageNumber = messageNumber;
-        Header header = _xfr.getHeader();
-        if(header != null)
-        {
-            _deliveryProps = header.get(DeliveryProperties.class);
-            _messageProps = header.get(MessageProperties.class);
-        }
-        else
-        {
-            _deliveryProps = null;
-            _messageProps = null;
-        }
-        _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps);
-        _arrivalTime = System.currentTimeMillis();
+        _storeMessage = storeMessage;
         _sessionRef = sessionRef;
+
+    }
+
+    private MessageMetaData_0_10 getMetaData()
+    {
+        return _storeMessage.getMetaData();
     }
 
     public String getRoutingKey()
     {
-        return _deliveryProps == null ? null : _deliveryProps.getRoutingKey();
+        return getMetaData().getRoutingKey();
+
     }
 
     public AMQMessageHeader getMessageHeader()
     {
-        return _messageHeader;
+        return getMetaData().getMessageHeader();
     }
 
     public boolean isPersistent()
     {
-        return (_deliveryProps != null) && (_deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT);
+        return getMetaData().isPersistent();
     }
 
+
     public boolean isRedelivered()
     {
+        // The *Message* is never redelivered, only queue entries are... this is here so that filters
+        // can run against the message on entry to an exchange
         return false;
     }
 
     public long getSize()
     {
 
-        return _xfr.getBodySize();
+        return getMetaData().getSize();
     }
 
     public boolean isImmediate()
     {
-        return _deliveryProps != null && _deliveryProps.getImmediate();
+        return getMetaData().isImmediate();
     }
 
     public long getExpiration()
     {
-        return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
+        return getMetaData().getExpiration();
     }
 
     public MessageReference newReference()
@@ -108,23 +98,43 @@
 
     public Long getMessageNumber()
     {
-        return _messageNumber;
+        return _storeMessage.getMessageNumber();
     }
 
     public long getArrivalTime()
     {
-        return _arrivalTime;
+        return getMetaData().getArrivalTime();
     }
 
-    public Header getHeader()
+    public int getContent(ByteBuffer buf, int offset)
     {
-        return _xfr.getHeader();
+        return _storeMessage.getContent(offset, buf);
+    }
 
+    public Header getHeader()
+    {
+        return getMetaData().getHeader();
     }
 
     public ByteBuffer getBody()
     {
-        return _xfr.getBody();
+        ByteBuffer body = getMetaData().getBody();
+        if(body == null)
+        {
+            final int size = (int) getSize();
+            int pos = 0;
+            body = ByteBuffer.allocate(size);
+
+            while(pos < size)
+            {
+                pos += getContent(body, pos);
+            }
+
+            body.flip();
+
+            getMetaData().setBody(body.duplicate());
+        }
+        return body;
     }
 
     public Session getSession()
@@ -132,4 +142,5 @@
         return _sessionRef == null ? null : _sessionRef.get();
     }
 
+    
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java Tue Oct 20 16:23:01 2009
@@ -20,7 +20,9 @@
  */
 package org.apache.qpid.server.message;
 
-public interface ServerMessage extends EnqueableMessage
+import java.nio.ByteBuffer;
+
+public interface ServerMessage extends EnqueableMessage, MessageContentSource
 {
     String getRoutingKey();
 
@@ -39,4 +41,7 @@
     Long getMessageNumber();
 
     long getArrivalTime();
+
+    public int getContent(ByteBuffer buf, int offset);
+
 }

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,124 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.output;
+
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.AMQPInvalidClassException;
+
+import java.util.Map;
+
+public class HeaderPropertiesConverter
+{
+
+    public static BasicContentHeaderProperties convert(MessageTransferMessage messageTransferMessage)
+    {
+        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+        Header header = messageTransferMessage.getHeader();
+        DeliveryProperties deliveryProps = header.get(DeliveryProperties.class);
+        MessageProperties messageProps = header.get(MessageProperties.class);
+
+        if(deliveryProps != null)
+        {
+            if(deliveryProps.hasDeliveryMode())
+            {
+                props.setDeliveryMode((byte)(deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT ? BasicContentHeaderProperties.PERSISTENT : BasicContentHeaderProperties.NON_PERSISTENT));
+            }
+            if(deliveryProps.hasExpiration())
+            {
+                props.setExpiration(deliveryProps.getExpiration());
+            }
+            if(deliveryProps.hasPriority())
+            {
+                props.setPriority((byte)deliveryProps.getPriority().getValue());
+            }
+            if(deliveryProps.hasTimestamp())
+            {
+                props.setTimestamp(deliveryProps.getTimestamp());
+            }
+        }
+        if(messageProps != null)
+        {
+            if(messageProps.hasAppId())
+            {
+                props.setAppId(new AMQShortString(messageProps.getAppId()));
+            }
+            if(messageProps.hasContentType())
+            {
+                props.setContentType(messageProps.getContentType());
+            }
+            if(messageProps.hasCorrelationId())
+            {
+                props.setCorrelationId(new AMQShortString(messageProps.getCorrelationId()));
+            }
+            if(messageProps.hasContentEncoding())
+            {
+                props.setEncoding(messageProps.getContentEncoding());
+            }
+            if(messageProps.hasMessageId())
+            {
+                props.setMessageId(messageProps.getMessageId().toString());
+            }
+
+            // TODO Reply-to
+
+            if(messageProps.hasUserId())
+            {
+                props.setUserId(new AMQShortString(messageProps.getUserId()));
+            }
+
+            if(messageProps.hasApplicationHeaders())
+            {
+                Map<String, Object> appHeaders = messageProps.getApplicationHeaders();
+                FieldTable ft = new FieldTable();
+                for(Map.Entry<String, Object> entry : appHeaders.entrySet())
+                {
+                    try
+                    {
+                        ft.put(new AMQShortString(entry.getKey()), entry.getValue());
+                    }
+                    catch(AMQPInvalidClassException e)
+                    {
+                        // TODO
+                        // log here, but ignore - just can;t convert
+                    }
+                }
+                props.setHeaders(ft);
+
+            }
+        }
+
+
+
+
+
+
+
+        return props;
+    }
+}

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java Tue Oct 20 16:23:01 2009
@@ -26,17 +26,15 @@
  */
 package org.apache.qpid.server.output;
 
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.MessageContentSource;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.AMQException;
 
-import java.util.Iterator;
-
 public interface ProtocolOutputConverter
 {
     void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag);
@@ -55,7 +53,7 @@
 
     byte getProtocolMajorVersion();
 
-    void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, Iterator<AMQDataBlock> bodyFrameIterator,  int channelId, int replyCode, AMQShortString replyText)
+    void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource msgContent,  int channelId, int replyCode, AMQShortString replyText)
                     throws AMQException;
 
     void writeFrame(AMQDataBlock block);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org