You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/20 18:23:11 UTC
svn commit: r827724 [2/8] - in /qpid/branches/java-broker-0-10/qpid/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/src/main/java/org/apach...
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java Tue Oct 20 16:23:01 2009
@@ -20,8 +20,6 @@
// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
//
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.Filterable;
/**
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java Tue Oct 20 16:23:01 2009
@@ -14,14 +14,12 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.filter;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.Filterable;
public interface MessageFilter
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java Tue Oct 20 16:23:01 2009
@@ -22,7 +22,6 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.Filterable;
import java.lang.reflect.Constructor;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java Tue Oct 20 16:23:01 2009
@@ -18,7 +18,6 @@
package org.apache.qpid.server.filter;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.Filterable;
//
@@ -43,16 +42,16 @@
public String toString() {
return "XQUERY "+ConstantExpression.encodeString(xpath);
}
-
+
/**
* @param message
* @return true if the expression evaluates to Boolean.TRUE.
* @throws AMQException
*/
- public boolean matches(Filterable message)
+ public boolean matches(Filterable message)
{
Object object = evaluate(message);
- return object!=null && object==Boolean.TRUE;
+ return object!=null && object==Boolean.TRUE;
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java Tue Oct 20 16:23:01 2009
@@ -27,8 +27,6 @@
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.Filterable;
import org.apache.xpath.CachedXPathAPI;
import org.w3c.dom.Document;
@@ -36,14 +34,14 @@
import org.xml.sax.InputSource;
public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator {
-
+
private final String xpath;
public XalanXPathEvaluator(String xpath) {
this.xpath = xpath;
}
-
- public boolean evaluate(Filterable m)
+
+ public boolean evaluate(Filterable m)
{
// TODO - we would have to check the content type and then evaluate the content
// here... is this really a feature we wish to implement? - RobG
@@ -65,18 +63,18 @@
private boolean evaluate(byte[] data) {
try {
-
+
InputSource inputSource = new InputSource(new ByteArrayInputStream(data));
-
+
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(true);
DocumentBuilder dbuilder = factory.newDocumentBuilder();
Document doc = dbuilder.parse(inputSource);
-
+
CachedXPathAPI cachedXPathAPI = new CachedXPathAPI();
NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath);
return iterator.nextNode()!=null;
-
+
} catch (Throwable e) {
return false;
}
@@ -85,12 +83,12 @@
private boolean evaluate(String text) {
try {
InputSource inputSource = new InputSource(new StringReader(text));
-
+
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(true);
DocumentBuilder dbuilder = factory.newDocumentBuilder();
Document doc = dbuilder.parse(inputSource);
-
+
// We should associated the cachedXPathAPI object with the message being evaluated
// since that should speedup subsequent xpath expressions.
CachedXPathAPI cachedXPathAPI = new CachedXPathAPI();
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java Tue Oct 20 16:23:01 2009
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.message.ServerMessage;
/*
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Oct 20 16:23:01 2009
@@ -25,7 +25,6 @@
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.Permission;
@@ -116,17 +115,31 @@
try
{
- AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
- body.getArguments(), body.getNoLocal(), body.getExclusive());
- if (!body.getNowait())
+ if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
- session.writeFrame(responseBody.generateFrame(channelId));
+ AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
+ body.getArguments(), body.getNoLocal(), body.getExclusive());
+ if (!body.getNowait())
+ {
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
+ session.writeFrame(responseBody.generateFrame(channelId));
+
+ }
+ }
+ else
+ {
+ AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg, // replytext
+ body.getClazz(),
+ body.getMethod());
+ session.writeFrame(responseBody.generateFrame(0));
}
-
}
catch (org.apache.qpid.AMQInvalidArgumentException ise)
{
@@ -141,17 +154,6 @@
}
- catch (ConsumerTagNotUniqueException e)
- {
- AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
-
- MethodRegistry methodRegistry = session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- msg, // replytext
- body.getClazz(),
- body.getMethod());
- session.writeFrame(responseBody.generateFrame(0));
- }
catch (AMQQueue.ExistingExclusiveSubscription e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Tue Oct 20 16:23:01 2009
@@ -30,6 +30,7 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.MessageOnlyCreditManager;
import org.apache.qpid.server.subscription.SubscriptionImpl;
@@ -40,9 +41,6 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Tue Oct 20 16:23:01 2009
@@ -24,7 +24,6 @@
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -96,7 +95,7 @@
session.writeFrame(responseBody.generateFrame(channelId));
-
+
}
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Tue Oct 20 16:23:01 2009
@@ -50,5 +50,6 @@
}
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
session.initHeartbeats(body.getHeartbeat());
+ session.setMaxFrameSize(body.getFrameMax());
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -92,11 +92,11 @@
try
{
- exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(),
- body.getType() == null ? null : body.getType().intern(),
- body.getDurable(),
- body.getPassive(), body.getTicket());
- exchangeRegistry.registerExchange(exchange);
+ exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(),
+ body.getType() == null ? null : body.getType().intern(),
+ body.getDurable(),
+ body.getPassive(), body.getTicket());
+ exchangeRegistry.registerExchange(exchange);
}
catch(AMQUnknownExchangeType e)
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Tue Oct 20 16:23:01 2009
@@ -40,7 +40,6 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties Tue Oct 20 16:23:01 2009
@@ -61,13 +61,32 @@
# 0 - path
MST-1002 = Store location : {0}
MST-1003 = Closed
+MST-1004 = Recovery Start
+MST-1005 = Recovered {0,number} messages
+MST-1006 = Recovery Complete
+
+#ConfigStore
+# 0 - name
+CFG-1001 = Created : {0}
+# 0 - path
+CFG-1002 = Store location : {0}
+CFG-1003 = Closed
+CFG-1004 = Recovery Start
+CFG-1005 = Recovery Complete
+
+#TransactionLog
+# 0 - name
+TXN-1001 = Created : {0}
+# 0 - path
+TXN-1002 = Store location : {0}
+TXN-1003 = Closed
# 0 - queue name
-MST-1004 = Recovery Start[ : {0}]
+TXN-1004 = Recovery Start[ : {0}]
# 0 - count
# 1 - queue count
-MST-1005 = Recovered {0,number} messages for queue {1}
+TXN-1005 = Recovered {0,number} messages for queue {1}
# 0 - queue name
-MST-1006 = Recovery Complete[ : {0}]
+TXN-1006 = Recovery Complete[ : {0}]
#Connection
# 0 - Client id
@@ -83,12 +102,18 @@
# 0 - bytes allowed in prefetch
# 1 - number of messagse.
CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number}
+# 0 - queue causing flow control
+CHN-1005 = Flow Control Enforced (Queue {0})
+CHN-1006 = Flow Control Removed
#Queue
# 0 - owner
# 1 - priority
QUE-1001 = Create :[ Owner: {0}][ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}]
QUE-1002 = Deleted
+QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number}
+QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
+
#Exchange
# 0 - type
@@ -104,4 +129,4 @@
SUB-1001 = Create[ : Durable][ : Arguments : {0}]
SUB-1002 = Close
# 0 - The current subscription state
-SUB-1003 = State : {0}
\ No newline at end of file
+SUB-1003 = State : {0}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties Tue Oct 20 16:23:01 2009
@@ -16,173 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
-# LogMessages used within the Java Broker as originally defined on the wiki:
-#
-# http://cwiki.apache.org/confluence/display/qpid/Status+Update+Design#StatusUpdateDesign-InitialStatusMessages
-#
-# Technical Notes:
-# This is a standard Java Properties file so white space is respected at the
-# end of the lines. This file is processed in a number of ways.
-# 1) ResourceBundle
-# This file is loaded through a ResourceBundle named LogMessages. the en_US
-# addition to the file is the localisation. Additional localisations can be
-# provided and will automatically be selected based on the <locale> value in
-# the config.xml. The default is en_US.
-#
-# 2) MessasgeFormat
-# Each entry is prepared with the Java Core MessageFormat methods. Therefore
-# most functionality you can do via MessageFormat can be done here:
-#
-# http://java.sun.com/javase/6/docs/api/java/text/MessageFormat.html
-#
-# The cavet here is that only default String and number FormatTypes can be used.
-# This is due to the processing described in 3 below. If support for date, time
-# or choice is requried then the GenerateLogMessages class should be updated to
-# provide support.
-#
-# Format Note:
-# As mentioned earlier white space in this file is very important. One thing
-# in particular to note is the way MessageFormat peforms its replacements.
-# The replacement text will totally replace the {xxx} section so there will be
-# no addtion of white space or removal e.g.
-# MSG = Text----{0}----
-# When given parameter 'Hello' result in text:
-# Text----Hello----
-#
-# For simple arguments this is expected however when using Style formats then
-# it can be a little unexepcted. In particular a common pattern is used for
-# number replacements : {0,number,#}. This is used in the Broker to display an
-# Integer simply as the Integer with no formating. e.g new Integer(1234567)
-# becomes the String "1234567" which is can be contrasted with the pattern
-# without a style format field : {0,number} which becomes string "1,234,567".
-#
-# What you may not expect is that {0,number, #} would produce the String " 1234567"
-# note the space after the ',' here /\ has resulted in a space /\ in
-# the output.
-#
-# More details on the SubformatPattern can be found on the API link above.
-#
-# 3) GenerateLogMessage/Velocity Macro
-# This is the first and final stage of processing that this file goes through.
-# 1) Class Generation:
-# The GenerateLogMessage processes this file and uses the velocity Macro
-# to create classes with static methods to perform the logging and give us
-# compile time validation.
-#
-# 2) Property Processing:
-# During the class generation the message properties ({x}) are identified
-# and used to create the method signature.
-#
-# 3) Option Processing:
-# The Classes perform final formatting of the messages at runtime based on
-# optional parameters that are defined within the message. Optional
-# paramters are enclosed in square brackets e.g. [optional].
-#
-# To provide fixed log messages as required by the Technical Specification:
-# http://cwiki.apache.org/confluence/display/qpid/Operational+Logging+-+Status+Update+-+Technical+Specification#OperationalLogging-StatusUpdate-TechnicalSpecification-Howtoprovidefixedlogmessages
-#
-# This file is processed by Velocity to create a number of classes that contain
-# static methods that provide LogMessages in the code to provide compile time
-# validation.
-#
-# For details of what processing is done see GenerateLogMessages.
-#
-# What a localiser or developer need know is the following:
-#
-# The Property structure is important is it defines how the class and methods
-# will be built.
-#
-# Class Generation:
-# =================
-#
-# Each class of messages will be split in to their own <Class>Messages.java
-# Currently the following classes are created and are populated with the
-# messages that bear their 3-digit type identifier:
-#
-# Class | Type
-# ---------------------|--------
-# Broker | BKR
-# ManagementConsole | MNG
-# VirtualHost | VHT
-# MessageStore | MST
-# Connection | CON
-# Channel | CHN
-# Queue | QUE
-# Exchange | EXH
-# Binding | BND
-# Subscription | SUB
-#
-# Property Processing:
-# ====================
-#
-# Each property is then processed by the GenerateLogMessages class to identify
-# The number and type of parameters, {x} entries. Parameters are defaulted to
-# String types but the use of FormatType number (e.g.{0,number}) will result
-# in a Number type being used. These parameters are then used to build the
-# method parameter list. e.g:
-# Property:
-# BRK-1003 = Shuting down : {0} port {1,number,#}
-# becomes Method:
-# public static LogMessage BRK_1003(String param1, Number param2)
-#
-# This improves our compile time validation of log message content and
-# ensures that change in the message format does not accidentally cause
-# erroneous messages.
-#
-# Option Processing:
-# ====================
-#
-# Options are identified in the log message as being surrounded by square
-# brackets ([ ]). These optional values can themselves contain paramters
-# however nesting of options is not permitted. Identification is performed on
-# first matchings so give the message:
-# Msg = Log Message [option1] [option2]
-# Two options will be identifed and enabled to select text 'option1 and
-# 'option2'.
-#
-# The nesting of a options is not supported and will provide
-# unexpected results. e.g. Using Message:
-# Msg = Log Message [option1 [sub-option2]]
-#
-# The options will be 'option1 [sub-option2' and 'sub-option2'. The first
-# option includes the second option as the nesting is not detected.
-#
-# The detected options are presented in the method signature as boolean options
-# numerically identified by their position in the message. e.g.
-# Property:
-# CON-1001 = Open : Client ID {0} [: Protocol Version : {1}]
-# becomes Method:
-# public static LogMessage CON_1001(String param1, String param2, boolean opt1)
-#
-# The value of 'opt1' will show/hide the option in the message. Note that
-# 'param2' is still required however a null value can be used if the optional
-# section is not desired.
-#
-# Again here the importance of white space needs to be highlighted.
-# Looking at the QUE-1001 message as an example. The first thought on how this
-# would look would be as follows:
-# QUE-1001 = Create : Owner: {0} [AutoDelete] [Durable] [Transient] [Priority: {1,number,#}]
-# Each option is correctly defined so the text that is defined will appear when
-# selected. e.g. 'AutoDelete'. However, what may not be immediately apparent is
-# the white space. Using the above definition of QUE-1001 if we were to print
-# the message with only the Priority option displayed it would appear as this:
-# "Create : Owner: guest Priority: 1"
-# Note the spaces here /\ This is because only the text between the brackets
-# has been removed.
-#
-# Each option needs to include white space to correctly format the message. So
-# the correct definition of QUE-1001 is as follows:
-# QUE-1001 = Create : Owner: {0}[ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}]
-# Note that white space is included with each option and there is no extra
-# white space between the options. As a result the output with just Priority
-# enabled is as follows:
-# "Create : Owner: guest Priority: 1"
-#
-# The final processing that is done in the generation is the conversion of the
-# property name. As a '-' is an illegal character in the method name it is
-# converted to '_' This processing gives the final method signature as follows:
-# <Class>Message.<Type>_<Number>(<parmaters>,<options>)
-#
+# Default File used for all non-defined locales.
#Broker
# 0 - Version
# 1 = Build
@@ -227,13 +61,32 @@
# 0 - path
MST-1002 = Store location : {0}
MST-1003 = Closed
+MST-1004 = Recovery Start
+MST-1005 = Recovered {0,number} messages
+MST-1006 = Recovery Complete
+
+#ConfigStore
+# 0 - name
+CFG-1001 = Created : {0}
+# 0 - path
+CFG-1002 = Store location : {0}
+CFG-1003 = Closed
+CFG-1004 = Recovery Start
+CFG-1005 = Recovery Complete
+
+#TransactionLog
+# 0 - name
+TXN-1001 = Created : {0}
+# 0 - path
+TXN-1002 = Store location : {0}
+TXN-1003 = Closed
# 0 - queue name
-MST-1004 = Recovery Start[ : {0}]
+TXN-1004 = Recovery Start[ : {0}]
# 0 - count
# 1 - queue count
-MST-1005 = Recovered {0,number} messages for queue {1}
+TXN-1005 = Recovered {0,number} messages for queue {1}
# 0 - queue name
-MST-1006 = Recovery Complete[ : {0}]
+TXN-1006 = Recovery Complete[ : {0}]
#Connection
# 0 - Client id
@@ -247,8 +100,9 @@
CHN-1002 = Flow {0}
CHN-1003 = Close
# 0 - bytes allowed in prefetch
-# 1 - number of messagse.
+# 1 - number of messagse.
CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number}
+# 0 - queue causing flow control
CHN-1005 = Flow Control Enforced (Queue {0})
CHN-1006 = Flow Control Removed
@@ -260,6 +114,7 @@
QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number}
QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
+
#Exchange
# 0 - type
# 1 - name
@@ -273,4 +128,5 @@
#Subscription
SUB-1001 = Create[ : Durable][ : Arguments : {0}]
SUB-1002 = Close
+# 0 - The current subscription state
SUB-1003 = State : {0}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -65,16 +65,9 @@
return null;
}
- public void register() throws AMQException
+ public void register() throws JMException
{
- try
- {
- getManagedObjectRegistry().registerObject(this);
- }
- catch (JMException e)
- {
- throw new AMQException("Error registering managed object " + this + ": " + e, e);
- }
+ getManagedObjectRegistry().registerObject(this);
}
protected ManagedObjectRegistry getManagedObjectRegistry()
@@ -98,7 +91,7 @@
{
return getObjectInstanceName() + "[" + getType() + "]";
}
-
+
/**
* Created the ObjectName as per the JMX Specs
@@ -140,7 +133,7 @@
objectName.append(",");
objectName.append("version=").append(_version);
-
+
return new ObjectName(objectName.toString());
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java Tue Oct 20 16:23:01 2009
@@ -54,6 +54,7 @@
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.UnknownHostException;
import java.rmi.AlreadyBoundException;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
@@ -236,8 +237,17 @@
* The registry is exported on the defined management port 'port'. We will export the RMIConnectorServer
* on 'port +1'. Use of these two well-defined ports will ease any navigation through firewall's.
*/
- final RMIServerImpl rmiConnectorServerStub = new RMIJRMPServerImpl(port+PORT_EXPORT_OFFSET, csf, ssf, env);
- final String hostname = InetAddress.getLocalHost().getHostName();
+ final RMIServerImpl rmiConnectorServerStub = new RMIJRMPServerImpl(port+PORT_EXPORT_OFFSET, csf, ssf, env);
+ String localHost;
+ try
+ {
+ localHost = InetAddress.getLocalHost().getHostName();
+ }
+ catch(UnknownHostException ex)
+ {
+ localHost="127.0.0.1";
+ }
+ final String hostname = localHost;
final JMXServiceURL externalUrl = new JMXServiceURL(
"service:jmx:rmi://"+hostname+":"+(port+PORT_EXPORT_OFFSET)+"/jndi/rmi://"+hostname+":"+port+"/jmxrmi");
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import javax.management.JMException;
import org.apache.qpid.AMQException;
@@ -45,7 +46,7 @@
ManagedObject getParentObject();
- void register() throws AMQException;
+ void register() throws AMQException, JMException;
void unregister() throws AMQException;
Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java (from r824494, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java&r1=824494&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java Tue Oct 20 16:23:01 2009
@@ -18,21 +18,18 @@
* under the License.
*
*/
-package org.apache.qpid.server.queue;
+package org.apache.qpid.server.message;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.message.*;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.queue.AMQQueue;
-import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
+import java.nio.ByteBuffer;
/**
* A deliverable message.
@@ -44,8 +41,6 @@
private final AtomicInteger _referenceCount = new AtomicInteger(0);
- private final AMQMessageHandle _messageHandle;
-
/** Flag to indicate that this message requires 'immediate' delivery. */
private static final byte IMMEDIATE = 0x01;
@@ -65,73 +60,21 @@
private Object _sessionIdentifier;
private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
- private final AMQMessageHeader _messageHeader;
-
-
- /**
- * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
- * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
- * queues.
- *
- * @param messageId
- * @param store
- * @param factory
- *
- * @throws AMQException
- */
- public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory)
- throws AMQException
- {
- _messageHandle = factory.createMessageHandle(messageId, store, true);
- _size = _messageHandle.getBodySize();
- _messageHeader = new ContentHeaderBodyAdapter(_messageHandle.getContentHeaderBody());
- }
- /**
- * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
- * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
- * queues.
- *
- * @param messageHandle
- *
- * @throws AMQException
- */
- public AMQMessage(
- AMQMessageHandle messageHandle,
- MessagePublishInfo info)
- throws AMQException
- {
- this(messageHandle, messageHandle.getContentHeaderBody(), messageHandle.getBodySize(), info);
- }
+ private final StoredMessage<MessageMetaData> _handle;
- public AMQMessage(
- AMQMessageHandle messageHandle,
- ContentHeaderBody chb,
- long size,
- MessagePublishInfo info)
- throws AMQException
+ public AMQMessage(StoredMessage<MessageMetaData> handle)
{
- _messageHandle = messageHandle;
-
- _messageHeader = new ContentHeaderBodyAdapter(chb);
+ _handle = handle;
+ final MessageMetaData metaData = handle.getMetaData();
+ _size = metaData.getContentSize();
+ final MessagePublishInfo messagePublishInfo = metaData.getMessagePublishInfo();
- if(info.isImmediate())
+ if(messagePublishInfo.isImmediate())
{
_flags |= IMMEDIATE;
}
- _size = size;
-
- }
-
-
- protected AMQMessage(AMQMessage msg) throws AMQException
- {
- _messageHandle = msg._messageHandle;
- _messageHeader = msg._messageHeader;
- _flags = msg._flags;
- _size = msg._size;
-
}
@@ -152,26 +95,21 @@
return _referenceCount.get() > 0;
}
- public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
- {
- return new BodyFrameIterator(protocolSession, channel, _messageHandle);
- }
-
- public Iterator<ContentChunk> getContentBodyIterator()
+ public MessageMetaData getMessageMetaData()
{
- return new BodyContentIterator(_messageHandle);
+ return _handle.getMetaData();
}
public ContentHeaderBody getContentHeaderBody() throws AMQException
{
- return _messageHandle.getContentHeaderBody();
+ return getMessageMetaData().getContentHeaderBody();
}
public Long getMessageId()
{
- return _messageHandle.getMessageId();
+ return _handle.getMessageNumber();
}
/**
@@ -211,10 +149,10 @@
* message store.
*
*
- * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
+ * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
*/
- public void decrementReference() throws MessageCleanupException
+ public void decrementReference()
{
int count = _referenceCount.decrementAndGet();
@@ -229,27 +167,19 @@
// by copying from other queues at the same time as it is being removed.
_referenceCount.set(Integer.MIN_VALUE/2);
- try
+ // must check if the handle is null since there may be cases where we decide to throw away a message
+ // and the handle has not yet been constructed
+ if (_handle != null)
{
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- if (_messageHandle != null && isPersistent())
- {
- _messageHandle.removeMessage();
+ _handle.remove();
- }
- }
- catch (AMQException e)
- {
-
- throw new MessageCleanupException(getMessageId(), e);
}
}
else
{
if (count < 0)
{
- throw new MessageCleanupException("Reference count for message id " + debugIdentity()
+ throw new RuntimeException("Reference count for message id " + debugIdentity()
+ " has gone below 0.");
}
}
@@ -274,12 +204,12 @@
public AMQMessageHeader getMessageHeader()
{
- return _messageHeader;
+ return getMessageMetaData().getMessageHeader();
}
public boolean isPersistent()
{
- return _messageHandle.isPersistent();
+ return getMessageMetaData().isPersistent();
}
/**
@@ -297,12 +227,12 @@
public MessagePublishInfo getMessagePublishInfo() throws AMQException
{
- return _messageHandle.getMessagePublishInfo();
+ return getMessageMetaData().getMessagePublishInfo();
}
public long getArrivalTime()
{
- return _messageHandle.getArrivalTime();
+ return getMessageMetaData().getArrivalTime();
}
/**
@@ -336,13 +266,6 @@
_flags |= DELIVERED_TO_CONSUMER;
}
-
-
- public AMQMessageHandle getMessageHandle()
- {
- return _messageHandle;
- }
-
public long getSize()
{
return _size;
@@ -394,4 +317,13 @@
return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
}
+ public int getContent(ByteBuffer buf, int offset)
+ {
+ return _handle.getContent(offset, buf);
+ }
+
+ public StoredMessage<MessageMetaData> getStoredMessage()
+ {
+ return _handle;
+ }
}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java Tue Oct 20 16:23:01 2009
@@ -30,6 +30,10 @@
String getMessageId();
+ String getMimeType();
+
+ String getEncoding();
+
byte getPriority();
long getTimestamp();
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java Tue Oct 20 16:23:01 2009
@@ -20,13 +20,9 @@
*/
package org.apache.qpid.server.message;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.MessageCleanupException;
-import javax.swing.*;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
public class AMQMessageReference extends MessageReference<AMQMessage>
{
@@ -43,22 +39,6 @@
protected void onRelease(AMQMessage message)
{
- try
- {
- if(message !=null)
- {
- message.decrementReference();
- }
- else
- {
- //TODO
- System.err.println("Shouldn't happen!!!!");
- }
- }
- catch (MessageCleanupException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
+ message.decrementReference();
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java Tue Oct 20 16:23:01 2009
@@ -55,6 +55,16 @@
return getProperties().getMessageIdAsString();
}
+ public String getMimeType()
+ {
+ return getProperties().getContentTypeAsString();
+ }
+
+ public String getEncoding()
+ {
+ return getProperties().getEncodingAsString();
+ }
+
public byte getPriority()
{
return getProperties().getPriority();
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.message;
+
+import java.nio.ByteBuffer;
+
+public interface MessageContentSource
+{
+ public int getContent(ByteBuffer buf, int offset);
+
+ long getSize();
+}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
------------------------------------------------------------------------------
svn:executable = *
Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java (from r821930, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java&r1=821930&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java Tue Oct 20 16:23:01 2009
@@ -18,16 +18,26 @@
* under the License.
*
*/
-package org.apache.qpid.server.queue;
+package org.apache.qpid.server.message;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.EncodingUtils;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.AMQException;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
/**
* Encapsulates a publish body and a content header. In the context of the message store these are treated as a
* single unit.
*/
-public class MessageMetaData
+public class MessageMetaData implements StorableMessageMetaData
{
private MessagePublishInfo _messagePublishInfo;
@@ -36,6 +46,9 @@
private int _contentChunkCount;
private long _arrivalTime;
+ private static final byte MANDATORY_FLAG = 1;
+ private static final byte IMMEDIATE_FLAG = 2;
+ public static final MessageMetaDataType.Factory<MessageMetaData> FACTORY = new MetaDataFactory();
public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
{
@@ -89,4 +102,208 @@
{
_arrivalTime = arrivalTime;
}
+
+ public MessageMetaDataType getType()
+ {
+ return MessageMetaDataType.META_DATA_0_8;
+ }
+
+ public int getStorableSize()
+ {
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.properties);
+ int size = _contentHeaderBody.getSize();
+ size += 4;
+ size += EncodingUtils.encodedShortStringLength(_messagePublishInfo.getExchange());
+ size += EncodingUtils.encodedShortStringLength(_messagePublishInfo.getRoutingKey());
+ size += 1; // flags for immediate/mandatory
+ size += EncodingUtils.encodedLongLength();
+
+ return size;
+ }
+
+ public int writeToBuffer(int offset, ByteBuffer dest)
+ {
+ ByteBuffer src = ByteBuffer.allocate((int)getStorableSize());
+
+ org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(src);
+ EncodingUtils.writeInteger(minaSrc, _contentHeaderBody.getSize());
+ _contentHeaderBody.writePayload(minaSrc);
+ EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getExchange());
+ EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getRoutingKey());
+ byte flags = 0;
+ if(_messagePublishInfo.isMandatory())
+ {
+ flags |= MANDATORY_FLAG;
+ }
+ if(_messagePublishInfo.isImmediate())
+ {
+ flags |= IMMEDIATE_FLAG;
+ }
+ EncodingUtils.writeByte(minaSrc, flags);
+ EncodingUtils.writeLong(minaSrc,_arrivalTime);
+ src.position(minaSrc.position());
+ src.flip();
+ src.position(offset);
+ src = src.slice();
+ if(dest.remaining() < src.limit())
+ {
+ src.limit(dest.remaining());
+ }
+ dest.put(src);
+
+
+ return src.limit();
+ }
+
+ public int getContentSize()
+ {
+ return (int) _contentHeaderBody.bodySize;
+ }
+
+ public boolean isPersistent()
+ {
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.properties);
+ return properties.getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT;
+ }
+
+ private static class MetaDataFactory implements MessageMetaDataType.Factory
+ {
+
+
+ public MessageMetaData createMetaData(ByteBuffer buf)
+ {
+ try
+ {
+ org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(buf);
+ int size = EncodingUtils.readInteger(minaSrc);
+ ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(minaSrc, size);
+ final AMQShortString exchange = EncodingUtils.readAMQShortString(minaSrc);
+ final AMQShortString routingKey = EncodingUtils.readAMQShortString(minaSrc);
+
+ final byte flags = EncodingUtils.readByte(minaSrc);
+ long arrivalTime = EncodingUtils.readLong(minaSrc);
+
+ MessagePublishInfo publishBody =
+ new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return exchange;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ }
+
+ public boolean isImmediate()
+ {
+ return (flags & IMMEDIATE_FLAG) != 0;
+ }
+
+ public boolean isMandatory()
+ {
+ return (flags & MANDATORY_FLAG) != 0;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return routingKey;
+ }
+ };
+ return new MessageMetaData(publishBody, chb, 0, arrivalTime);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ }
+ };
+
+ public AMQMessageHeader getMessageHeader()
+ {
+ return new MessageHeaderAdapter();
+ }
+
+ private final class MessageHeaderAdapter implements AMQMessageHeader
+ {
+ private BasicContentHeaderProperties getProperties()
+ {
+ return (BasicContentHeaderProperties) getContentHeaderBody().properties;
+ }
+
+ public String getCorrelationId()
+ {
+ return getProperties().getCorrelationIdAsString();
+ }
+
+ public long getExpiration()
+ {
+ return getProperties().getExpiration();
+ }
+
+ public String getMessageId()
+ {
+ return getProperties().getMessageIdAsString();
+ }
+
+ public String getMimeType()
+ {
+ return getProperties().getContentTypeAsString();
+ }
+
+ public String getEncoding()
+ {
+ return getProperties().getEncodingAsString();
+ }
+
+ public byte getPriority()
+ {
+ return getProperties().getPriority();
+ }
+
+ public long getTimestamp()
+ {
+ return getProperties().getTimestamp();
+ }
+
+ public String getType()
+ {
+ return getProperties().getTypeAsString();
+ }
+
+ public String getReplyTo()
+ {
+ return getProperties().getReplyToAsString();
+ }
+
+ public Object getHeader(String name)
+ {
+ FieldTable ft = getProperties().getHeaders();
+ return ft.get(name);
+ }
+
+ public boolean containsHeaders(Set<String> names)
+ {
+ FieldTable ft = getProperties().getHeaders();
+ for(String name : names)
+ {
+ if(!ft.containsKey(name))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public boolean containsHeader(String name)
+ {
+ FieldTable ft = getProperties().getHeaders();
+ return ft.containsKey(name);
+ }
+
+
+
+ }
}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,242 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.transport.codec.BBDecoder;
+
+import java.nio.ByteBuffer;
+import java.lang.ref.WeakReference;
+
+public class MessageMetaData_0_10 implements StorableMessageMetaData
+{
+ private Header _header;
+ private DeliveryProperties _deliveryProps;
+ private MessageProperties _messageProps;
+ private MessageTransferHeader _messageHeader;
+ private long _arrivalTime;
+ private int _bodySize;
+ private volatile WeakReference<ByteBuffer> _body;
+
+ private static final int ENCODER_SIZE = 1 << 16;
+
+ public static final MessageMetaDataType.Factory<MessageMetaData_0_10> FACTORY = new MetaDataFactory();
+
+ private volatile ByteBuffer _encoded;
+
+
+ public MessageMetaData_0_10(MessageTransfer xfr)
+ {
+ this(xfr.getHeader(), xfr.getBodySize(), xfr.getBody(), System.currentTimeMillis());
+ }
+
+ private MessageMetaData_0_10(Header header, int bodySize, long arrivalTime)
+ {
+ this(header, bodySize, null, arrivalTime);
+ }
+
+ private MessageMetaData_0_10(Header header, int bodySize, ByteBuffer xfrBody, long arrivalTime)
+ {
+ _header = header;
+ if(_header != null)
+ {
+ _deliveryProps = _header.get(DeliveryProperties.class);
+ _messageProps = _header.get(MessageProperties.class);
+ }
+ else
+ {
+ _deliveryProps = null;
+ _messageProps = null;
+ }
+ _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps);
+ _arrivalTime = arrivalTime;
+ _bodySize = bodySize;
+
+
+
+ if(xfrBody == null)
+ {
+ _body = null;
+ }
+ else
+ {
+ ByteBuffer body = ByteBuffer.allocate(_bodySize);
+ body.put(xfrBody);
+ body.flip();
+ _body = new WeakReference(body);
+ }
+
+
+ }
+
+
+
+ public MessageMetaDataType getType()
+ {
+ return MessageMetaDataType.META_DATA_0_10;
+ }
+
+ public int getStorableSize()
+ {
+ ByteBuffer buf = _encoded;
+
+ if(buf == null)
+ {
+ buf = encodeAsBuffer();
+ _encoded = buf;
+ }
+
+ //TODO -- need to add stuff
+ return buf.limit();
+ }
+
+ private ByteBuffer encodeAsBuffer()
+ {
+ BBEncoder encoder = new BBEncoder(ENCODER_SIZE);
+
+ encoder.writeInt64(_arrivalTime);
+ encoder.writeInt32(_bodySize);
+ Struct[] headers = _header == null ? new Struct[0] : _header.getStructs();
+ encoder.writeInt32(headers.length);
+
+
+ for(Struct header : headers)
+ {
+ encoder.writeStruct32(header);
+
+ }
+
+ ByteBuffer buf = encoder.buffer();
+ return buf;
+ }
+
+ public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
+ {
+ ByteBuffer buf = _encoded;
+
+ if(buf == null)
+ {
+ buf = encodeAsBuffer();
+ _encoded = buf;
+ }
+
+ buf = buf.duplicate();
+
+ buf.position(offsetInMetaData);
+
+ if(dest.remaining() < buf.limit())
+ {
+ buf.limit(dest.remaining());
+ }
+ dest.put(buf);
+ return buf.limit();
+ }
+
+ public int getContentSize()
+ {
+ return _bodySize;
+ }
+
+ public boolean isPersistent()
+ {
+ return _deliveryProps == null ? false : _deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT;
+ }
+
+ public String getRoutingKey()
+ {
+ return _deliveryProps == null ? null : _deliveryProps.getRoutingKey();
+ }
+
+ public AMQMessageHeader getMessageHeader()
+ {
+ return _messageHeader;
+ }
+
+ public long getSize()
+ {
+
+ return _bodySize;
+ }
+
+ public boolean isImmediate()
+ {
+ return _deliveryProps != null && _deliveryProps.getImmediate();
+ }
+
+ public long getExpiration()
+ {
+ return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
+ }
+
+ public long getArrivalTime()
+ {
+ return _arrivalTime;
+ }
+
+ public Header getHeader()
+ {
+ return _header;
+ }
+
+ public ByteBuffer getBody()
+ {
+ ByteBuffer body = _body == null ? null : _body.get();
+ return body;
+ }
+
+ public void setBody(ByteBuffer body)
+ {
+ _body = new WeakReference(body);
+ }
+
+ private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10>
+ {
+ public MessageMetaData_0_10 createMetaData(ByteBuffer buf)
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(buf);
+
+ long arrivalTime = decoder.readInt64();
+ int bodySize = decoder.readInt32();
+ int headerCount = decoder.readInt32();
+
+ Struct[] headers = new Struct[headerCount];
+
+ for(int i = 0 ; i < headerCount; i++)
+ {
+ headers[i] = decoder.readStruct32();
+ }
+
+ Header header = new Header(headers);
+
+ return new MessageMetaData_0_10(header, bodySize, arrivalTime);
+
+ }
+ }
+}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java Tue Oct 20 16:23:01 2009
@@ -57,6 +57,16 @@
return _messageProps == null ? null : String.valueOf(_messageProps.getMessageId());
}
+ public String getMimeType()
+ {
+ return _messageProps == null ? null : _messageProps.getContentType();
+ }
+
+ public String getEncoding()
+ {
+ return _messageProps == null ? null : _messageProps.getContentEncoding();
+ }
+
public byte getPriority()
{
MessageDeliveryPriority priority = _deliveryProps == null
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java Tue Oct 20 16:23:01 2009
@@ -21,6 +21,7 @@
package org.apache.qpid.server.message;
import org.apache.qpid.transport.*;
+import org.apache.qpid.server.store.StoredMessage;
import java.util.concurrent.atomic.AtomicLong;
import java.nio.ByteBuffer;
@@ -29,76 +30,65 @@
public class MessageTransferMessage implements InboundMessage, ServerMessage
{
- private static final AtomicLong _numberSource = new AtomicLong(0L);
- private final MessageTransfer _xfr;
- private final DeliveryProperties _deliveryProps;
- private final MessageProperties _messageProps;
- private final AMQMessageHeader _messageHeader;
- private final long _messageNumber;
- private final long _arrivalTime;
+
+ private StoredMessage<MessageMetaData_0_10> _storeMessage;
+
+
private WeakReference<Session> _sessionRef;
- public MessageTransferMessage(MessageTransfer xfr, WeakReference<Session> sessionRef)
- {
- this(_numberSource.getAndIncrement(), xfr, sessionRef);
- }
- public MessageTransferMessage(long messageNumber, MessageTransfer xfr, WeakReference<Session> sessionRef)
+ public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef)
{
- _xfr = xfr;
- _messageNumber = messageNumber;
- Header header = _xfr.getHeader();
- if(header != null)
- {
- _deliveryProps = header.get(DeliveryProperties.class);
- _messageProps = header.get(MessageProperties.class);
- }
- else
- {
- _deliveryProps = null;
- _messageProps = null;
- }
- _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps);
- _arrivalTime = System.currentTimeMillis();
+ _storeMessage = storeMessage;
_sessionRef = sessionRef;
+
+ }
+
+ private MessageMetaData_0_10 getMetaData()
+ {
+ return _storeMessage.getMetaData();
}
public String getRoutingKey()
{
- return _deliveryProps == null ? null : _deliveryProps.getRoutingKey();
+ return getMetaData().getRoutingKey();
+
}
public AMQMessageHeader getMessageHeader()
{
- return _messageHeader;
+ return getMetaData().getMessageHeader();
}
public boolean isPersistent()
{
- return (_deliveryProps != null) && (_deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT);
+ return getMetaData().isPersistent();
}
+
public boolean isRedelivered()
{
+ // The *Message* is never redelivered, only queue entries are... this is here so that filters
+ // can run against the message on entry to an exchange
return false;
}
public long getSize()
{
- return _xfr.getBodySize();
+ return getMetaData().getSize();
}
public boolean isImmediate()
{
- return _deliveryProps != null && _deliveryProps.getImmediate();
+ return getMetaData().isImmediate();
}
public long getExpiration()
{
- return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
+ return getMetaData().getExpiration();
}
public MessageReference newReference()
@@ -108,23 +98,43 @@
public Long getMessageNumber()
{
- return _messageNumber;
+ return _storeMessage.getMessageNumber();
}
public long getArrivalTime()
{
- return _arrivalTime;
+ return getMetaData().getArrivalTime();
}
- public Header getHeader()
+ public int getContent(ByteBuffer buf, int offset)
{
- return _xfr.getHeader();
+ return _storeMessage.getContent(offset, buf);
+ }
+ public Header getHeader()
+ {
+ return getMetaData().getHeader();
}
public ByteBuffer getBody()
{
- return _xfr.getBody();
+ ByteBuffer body = getMetaData().getBody();
+ if(body == null)
+ {
+ final int size = (int) getSize();
+ int pos = 0;
+ body = ByteBuffer.allocate(size);
+
+ while(pos < size)
+ {
+ pos += getContent(body, pos);
+ }
+
+ body.flip();
+
+ getMetaData().setBody(body.duplicate());
+ }
+ return body;
}
public Session getSession()
@@ -132,4 +142,5 @@
return _sessionRef == null ? null : _sessionRef.get();
}
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java Tue Oct 20 16:23:01 2009
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.message;
-public interface ServerMessage extends EnqueableMessage
+import java.nio.ByteBuffer;
+
+public interface ServerMessage extends EnqueableMessage, MessageContentSource
{
String getRoutingKey();
@@ -39,4 +41,7 @@
Long getMessageNumber();
long getArrivalTime();
+
+ public int getContent(ByteBuffer buf, int offset);
+
}
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,124 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.output;
+
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.AMQPInvalidClassException;
+
+import java.util.Map;
+
+public class HeaderPropertiesConverter
+{
+
+ public static BasicContentHeaderProperties convert(MessageTransferMessage messageTransferMessage)
+ {
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ Header header = messageTransferMessage.getHeader();
+ DeliveryProperties deliveryProps = header.get(DeliveryProperties.class);
+ MessageProperties messageProps = header.get(MessageProperties.class);
+
+ if(deliveryProps != null)
+ {
+ if(deliveryProps.hasDeliveryMode())
+ {
+ props.setDeliveryMode((byte)(deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT ? BasicContentHeaderProperties.PERSISTENT : BasicContentHeaderProperties.NON_PERSISTENT));
+ }
+ if(deliveryProps.hasExpiration())
+ {
+ props.setExpiration(deliveryProps.getExpiration());
+ }
+ if(deliveryProps.hasPriority())
+ {
+ props.setPriority((byte)deliveryProps.getPriority().getValue());
+ }
+ if(deliveryProps.hasTimestamp())
+ {
+ props.setTimestamp(deliveryProps.getTimestamp());
+ }
+ }
+ if(messageProps != null)
+ {
+ if(messageProps.hasAppId())
+ {
+ props.setAppId(new AMQShortString(messageProps.getAppId()));
+ }
+ if(messageProps.hasContentType())
+ {
+ props.setContentType(messageProps.getContentType());
+ }
+ if(messageProps.hasCorrelationId())
+ {
+ props.setCorrelationId(new AMQShortString(messageProps.getCorrelationId()));
+ }
+ if(messageProps.hasContentEncoding())
+ {
+ props.setEncoding(messageProps.getContentEncoding());
+ }
+ if(messageProps.hasMessageId())
+ {
+ props.setMessageId(messageProps.getMessageId().toString());
+ }
+
+ // TODO Reply-to
+
+ if(messageProps.hasUserId())
+ {
+ props.setUserId(new AMQShortString(messageProps.getUserId()));
+ }
+
+ if(messageProps.hasApplicationHeaders())
+ {
+ Map<String, Object> appHeaders = messageProps.getApplicationHeaders();
+ FieldTable ft = new FieldTable();
+ for(Map.Entry<String, Object> entry : appHeaders.entrySet())
+ {
+ try
+ {
+ ft.put(new AMQShortString(entry.getKey()), entry.getValue());
+ }
+ catch(AMQPInvalidClassException e)
+ {
+ // TODO
+ // log here, but ignore - just can;t convert
+ }
+ }
+ props.setHeaders(ft);
+
+ }
+ }
+
+
+
+
+
+
+
+ return props;
+ }
+}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java Tue Oct 20 16:23:01 2009
@@ -26,17 +26,15 @@
*/
package org.apache.qpid.server.output;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.AMQException;
-import java.util.Iterator;
-
public interface ProtocolOutputConverter
{
void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag);
@@ -55,7 +53,7 @@
byte getProtocolMajorVersion();
- void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, Iterator<AMQDataBlock> bodyFrameIterator, int channelId, int replyCode, AMQShortString replyText)
+ void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource msgContent, int channelId, int replyCode, AMQShortString replyText)
throws AMQException;
void writeFrame(AMQDataBlock block);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org