You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/25 23:59:05 UTC
svn commit: r829675 [4/11] - in /qpid/trunk/qpid/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/ broker/bin/ broker/src/main/java/org/apac...
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Sun Oct 25 22:58:57 2009
@@ -81,13 +81,13 @@
message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
if(message != null)
{
- message.discard(channel.getStoreContext());
+ message.discard();
}
//sendtoDeadLetterQueue(msg)
return;
}
- if (!message.getMessage().isReferenced())
+ if (message.getMessage() == null)
{
_logger.warn("Message as already been purged, unable to Reject.");
return;
@@ -96,7 +96,7 @@
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
+ _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
": Requeue:" + body.getRequeue() +
//": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Sun Oct 25 22:58:57 2009
@@ -24,7 +24,6 @@
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -96,7 +95,7 @@
session.writeFrame(responseBody.generateFrame(channelId));
-
+
}
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Sun Oct 25 22:58:57 2009
@@ -50,5 +50,6 @@
}
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
session.initHeartbeats(body.getHeartbeat());
+ session.setMaxFrameSize(body.getFrameMax());
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Sun Oct 25 22:58:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -92,11 +92,11 @@
try
{
- exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(),
- body.getType() == null ? null : body.getType().intern(),
- body.getDurable(),
- body.getPassive(), body.getTicket());
- exchangeRegistry.registerExchange(exchange);
+ exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(),
+ body.getType() == null ? null : body.getType().intern(),
+ body.getDurable(),
+ body.getPassive(), body.getTicket());
+ exchangeRegistry.registerExchange(exchange);
}
catch(AMQUnknownExchangeType e)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Sun Oct 25 22:58:57 2009
@@ -40,7 +40,7 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
@@ -60,11 +60,11 @@
public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- MessageStore store = virtualHost.getMessageStore();
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
if (!body.getPassive())
@@ -109,11 +109,31 @@
else
{
queue = createQueue(queueName, body, virtualHost, session);
+ queue.setPrincipalHolder(session);
if (queue.isDurable() && !queue.isAutoDelete())
{
store.createQueue(queue, body.getArguments());
}
queueRegistry.registerQueue(queue);
+ if(queue.isExclusive() && !queue.isAutoDelete())
+ {
+ final AMQQueue q = queue;
+ queue.setExclusiveOwner(session);
+ final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
+ {
+ public void doTask(AMQProtocolSession session) throws AMQException
+ {
+ q.setExclusiveOwner(null);
+ }
+ };
+ session.addSessionCloseTask(sessionCloseTask);
+ queue.addQueueDeleteTask(new AMQQueue.Task() {
+ public void doTask(AMQQueue queue) throws AMQException
+ {
+ session.removeSessionCloseTask(sessionCloseTask);
+ }
+ });
+ }
if (autoRegister)
{
Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
@@ -123,14 +143,18 @@
}
}
}
- else if (queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
+ else if (queue.getPrincipalHolder() != null
+ && queue.getPrincipalHolder().getPrincipal() != null
+ && queue.getPrincipalHolder().getPrincipal().getName() != null
+ && (!queue.getPrincipalHolder().getPrincipal().getName().equals(session.getPrincipal().getName())
+ || ((!body.getPassive() && queue.getExclusiveOwner() != null && queue.getExclusiveOwner() != session))))
{
throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
+ "declared on another client ID('"
- + queue.getOwner() + "')");
- }
+ + queue.getPrincipalHolder().getPrincipal().getName() + "')");
+ }
AMQChannel channel = session.getChannel(channelId);
if (channel == null)
@@ -197,7 +221,7 @@
}
});
}// if exclusive and not durable
-
+
return queue;
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Sun Oct 25 22:58:57 2009
@@ -31,6 +31,7 @@
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.security.access.Permission;
@@ -62,7 +63,7 @@
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- MessageStore store = virtualHost.getMessageStore();
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
AMQQueue queue;
if (body.getQueue() == null)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Sun Oct 25 22:58:57 2009
@@ -33,7 +33,6 @@
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.security.access.Permission;
public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
{
@@ -106,7 +105,7 @@
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
- long purged = queue.clearQueue(channel.getStoreContext());
+ long purged = queue.clearQueue();
if(!body.getNowait())
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Sun Oct 25 22:58:57 2009
@@ -61,14 +61,12 @@
{
throw body.getChannelNotFoundException(channelId);
}
-
channel.commit();
MethodRegistry methodRegistry = session.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
session.writeFrame(responseBody.generateFrame(channelId));
-
- channel.processReturns();
+
}
catch (AMQException e)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Sun Oct 25 22:58:57 2009
@@ -44,9 +44,9 @@
{
}
- public void methodReceived(AMQStateManager stateManager, TxRollbackBody body, int channelId) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, TxRollbackBody body, final int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
try
{
@@ -57,17 +57,22 @@
throw body.getChannelNotFoundException(channelId);
}
- channel.rollback();
- MethodRegistry methodRegistry = session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
+ final MethodRegistry methodRegistry = session.getMethodRegistry();
+ final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
+
+ Runnable task = new Runnable()
+ {
+
+ public void run()
+ {
+ session.writeFrame(responseBody.generateFrame(channelId));
+ }
+ };
+
+ channel.rollback(task);
- //Now resend all the unacknowledged messages back to the original subscribers.
- //(Must be done after the TxnRollback-ok response).
- // Why, are we not allowed to send messages back to client before the ok method?
- channel.resend(false);
}
catch (AMQException e)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java Sun Oct 25 22:58:57 2009
@@ -68,7 +68,7 @@
*/
_logString = "[" + MessageFormat.format(ChannelLogSubject.CHANNEL_FORMAT,
session.getSessionID(),
- session.getAuthorizedID().getName(),
+ session.getPrincipal().getName(),
session.getRemoteAddress(),
session.getVirtualHost().getName(),
channel.getChannelId())
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java Sun Oct 25 22:58:57 2009
@@ -75,7 +75,7 @@
{
_logString = "[" + MessageFormat.format(USER_FORMAT,
session.getSessionID(),
- session.getAuthorizedID().getName(),
+ session.getPrincipal().getName(),
session.getRemoteAddress())
+ "] ";
@@ -105,7 +105,7 @@
*/
_logString = "[" + MessageFormat.format(ConnectionLogSubject.CONNECTION_FORMAT,
session.getSessionID(),
- session.getAuthorizedID().getName(),
+ session.getPrincipal().getName(),
session.getRemoteAddress(),
session.getVirtualHost().getName())
+ "] ";
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java Sun Oct 25 22:58:57 2009
@@ -21,6 +21,9 @@
package org.apache.qpid.server.logging.actors;
import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.RootMessageLogger;
import java.util.EmptyStackException;
import java.util.Stack;
@@ -66,6 +69,8 @@
}
};
+ private static LogActor _defaultActor;
+
/**
* Set a new LogActor to be the Current Actor
* <p/>
@@ -105,7 +110,12 @@
}
catch (EmptyStackException ese)
{
- return null;
+ return _defaultActor;
}
}
+
+ public static void setDefault(LogActor defaultActor)
+ {
+ _defaultActor = defaultActor;
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties Sun Oct 25 22:58:57 2009
@@ -61,13 +61,32 @@
# 0 - path
MST-1002 = Store location : {0}
MST-1003 = Closed
+MST-1004 = Recovery Start
+MST-1005 = Recovered {0,number} messages
+MST-1006 = Recovery Complete
+
+#ConfigStore
+# 0 - name
+CFG-1001 = Created : {0}
+# 0 - path
+CFG-1002 = Store location : {0}
+CFG-1003 = Closed
+CFG-1004 = Recovery Start
+CFG-1005 = Recovery Complete
+
+#TransactionLog
+# 0 - name
+TXN-1001 = Created : {0}
+# 0 - path
+TXN-1002 = Store location : {0}
+TXN-1003 = Closed
# 0 - queue name
-MST-1004 = Recovery Start[ : {0}]
+TXN-1004 = Recovery Start[ : {0}]
# 0 - count
# 1 - queue count
-MST-1005 = Recovered {0,number} messages for queue {1}
+TXN-1005 = Recovered {0,number} messages for queue {1}
# 0 - queue name
-MST-1006 = Recovery Complete[ : {0}]
+TXN-1006 = Recovery Complete[ : {0}]
#Connection
# 0 - Client id
@@ -83,12 +102,18 @@
# 0 - bytes allowed in prefetch
# 1 - number of messagse.
CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number}
+# 0 - queue causing flow control
+CHN-1005 = Flow Control Enforced (Queue {0})
+CHN-1006 = Flow Control Removed
#Queue
# 0 - owner
# 1 - priority
QUE-1001 = Create :[ Owner: {0}][ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}]
QUE-1002 = Deleted
+QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number}
+QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
+
#Exchange
# 0 - type
@@ -104,4 +129,4 @@
SUB-1001 = Create[ : Durable][ : Arguments : {0}]
SUB-1002 = Close
# 0 - The current subscription state
-SUB-1003 = State : {0}
\ No newline at end of file
+SUB-1003 = State : {0}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties Sun Oct 25 22:58:57 2009
@@ -16,173 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
-# LogMessages used within the Java Broker as originally defined on the wiki:
-#
-# http://cwiki.apache.org/confluence/display/qpid/Status+Update+Design#StatusUpdateDesign-InitialStatusMessages
-#
-# Technical Notes:
-# This is a standard Java Properties file so white space is respected at the
-# end of the lines. This file is processed in a number of ways.
-# 1) ResourceBundle
-# This file is loaded through a ResourceBundle named LogMessages. the en_US
-# addition to the file is the localisation. Additional localisations can be
-# provided and will automatically be selected based on the <locale> value in
-# the config.xml. The default is en_US.
-#
-# 2) MessasgeFormat
-# Each entry is prepared with the Java Core MessageFormat methods. Therefore
-# most functionality you can do via MessageFormat can be done here:
-#
-# http://java.sun.com/javase/6/docs/api/java/text/MessageFormat.html
-#
-# The cavet here is that only default String and number FormatTypes can be used.
-# This is due to the processing described in 3 below. If support for date, time
-# or choice is requried then the GenerateLogMessages class should be updated to
-# provide support.
-#
-# Format Note:
-# As mentioned earlier white space in this file is very important. One thing
-# in particular to note is the way MessageFormat peforms its replacements.
-# The replacement text will totally replace the {xxx} section so there will be
-# no addtion of white space or removal e.g.
-# MSG = Text----{0}----
-# When given parameter 'Hello' result in text:
-# Text----Hello----
-#
-# For simple arguments this is expected however when using Style formats then
-# it can be a little unexepcted. In particular a common pattern is used for
-# number replacements : {0,number,#}. This is used in the Broker to display an
-# Integer simply as the Integer with no formating. e.g new Integer(1234567)
-# becomes the String "1234567" which is can be contrasted with the pattern
-# without a style format field : {0,number} which becomes string "1,234,567".
-#
-# What you may not expect is that {0,number, #} would produce the String " 1234567"
-# note the space after the ',' here /\ has resulted in a space /\ in
-# the output.
-#
-# More details on the SubformatPattern can be found on the API link above.
-#
-# 3) GenerateLogMessage/Velocity Macro
-# This is the first and final stage of processing that this file goes through.
-# 1) Class Generation:
-# The GenerateLogMessage processes this file and uses the velocity Macro
-# to create classes with static methods to perform the logging and give us
-# compile time validation.
-#
-# 2) Property Processing:
-# During the class generation the message properties ({x}) are identified
-# and used to create the method signature.
-#
-# 3) Option Processing:
-# The Classes perform final formatting of the messages at runtime based on
-# optional parameters that are defined within the message. Optional
-# paramters are enclosed in square brackets e.g. [optional].
-#
-# To provide fixed log messages as required by the Technical Specification:
-# http://cwiki.apache.org/confluence/display/qpid/Operational+Logging+-+Status+Update+-+Technical+Specification#OperationalLogging-StatusUpdate-TechnicalSpecification-Howtoprovidefixedlogmessages
-#
-# This file is processed by Velocity to create a number of classes that contain
-# static methods that provide LogMessages in the code to provide compile time
-# validation.
-#
-# For details of what processing is done see GenerateLogMessages.
-#
-# What a localiser or developer need know is the following:
-#
-# The Property structure is important is it defines how the class and methods
-# will be built.
-#
-# Class Generation:
-# =================
-#
-# Each class of messages will be split in to their own <Class>Messages.java
-# Currently the following classes are created and are populated with the
-# messages that bear their 3-digit type identifier:
-#
-# Class | Type
-# ---------------------|--------
-# Broker | BKR
-# ManagementConsole | MNG
-# VirtualHost | VHT
-# MessageStore | MST
-# Connection | CON
-# Channel | CHN
-# Queue | QUE
-# Exchange | EXH
-# Binding | BND
-# Subscription | SUB
-#
-# Property Processing:
-# ====================
-#
-# Each property is then processed by the GenerateLogMessages class to identify
-# The number and type of parameters, {x} entries. Parameters are defaulted to
-# String types but the use of FormatType number (e.g.{0,number}) will result
-# in a Number type being used. These parameters are then used to build the
-# method parameter list. e.g:
-# Property:
-# BRK-1003 = Shuting down : {0} port {1,number,#}
-# becomes Method:
-# public static LogMessage BRK_1003(String param1, Number param2)
-#
-# This improves our compile time validation of log message content and
-# ensures that change in the message format does not accidentally cause
-# erroneous messages.
-#
-# Option Processing:
-# ====================
-#
-# Options are identified in the log message as being surrounded by square
-# brackets ([ ]). These optional values can themselves contain paramters
-# however nesting of options is not permitted. Identification is performed on
-# first matchings so give the message:
-# Msg = Log Message [option1] [option2]
-# Two options will be identifed and enabled to select text 'option1 and
-# 'option2'.
-#
-# The nesting of a options is not supported and will provide
-# unexpected results. e.g. Using Message:
-# Msg = Log Message [option1 [sub-option2]]
-#
-# The options will be 'option1 [sub-option2' and 'sub-option2'. The first
-# option includes the second option as the nesting is not detected.
-#
-# The detected options are presented in the method signature as boolean options
-# numerically identified by their position in the message. e.g.
-# Property:
-# CON-1001 = Open : Client ID {0} [: Protocol Version : {1}]
-# becomes Method:
-# public static LogMessage CON_1001(String param1, String param2, boolean opt1)
-#
-# The value of 'opt1' will show/hide the option in the message. Note that
-# 'param2' is still required however a null value can be used if the optional
-# section is not desired.
-#
-# Again here the importance of white space needs to be highlighted.
-# Looking at the QUE-1001 message as an example. The first thought on how this
-# would look would be as follows:
-# QUE-1001 = Create : Owner: {0} [AutoDelete] [Durable] [Transient] [Priority: {1,number,#}]
-# Each option is correctly defined so the text that is defined will appear when
-# selected. e.g. 'AutoDelete'. However, what may not be immediately apparent is
-# the white space. Using the above definition of QUE-1001 if we were to print
-# the message with only the Priority option displayed it would appear as this:
-# "Create : Owner: guest Priority: 1"
-# Note the spaces here /\ This is because only the text between the brackets
-# has been removed.
-#
-# Each option needs to include white space to correctly format the message. So
-# the correct definition of QUE-1001 is as follows:
-# QUE-1001 = Create : Owner: {0}[ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}]
-# Note that white space is included with each option and there is no extra
-# white space between the options. As a result the output with just Priority
-# enabled is as follows:
-# "Create : Owner: guest Priority: 1"
-#
-# The final processing that is done in the generation is the conversion of the
-# property name. As a '-' is an illegal character in the method name it is
-# converted to '_' This processing gives the final method signature as follows:
-# <Class>Message.<Type>_<Number>(<parmaters>,<options>)
-#
+# Default File used for all non-defined locales.
#Broker
# 0 - Version
# 1 = Build
@@ -227,13 +61,32 @@
# 0 - path
MST-1002 = Store location : {0}
MST-1003 = Closed
+MST-1004 = Recovery Start
+MST-1005 = Recovered {0,number} messages
+MST-1006 = Recovery Complete
+
+#ConfigStore
+# 0 - name
+CFG-1001 = Created : {0}
+# 0 - path
+CFG-1002 = Store location : {0}
+CFG-1003 = Closed
+CFG-1004 = Recovery Start
+CFG-1005 = Recovery Complete
+
+#TransactionLog
+# 0 - name
+TXN-1001 = Created : {0}
+# 0 - path
+TXN-1002 = Store location : {0}
+TXN-1003 = Closed
# 0 - queue name
-MST-1004 = Recovery Start[ : {0}]
+TXN-1004 = Recovery Start[ : {0}]
# 0 - count
# 1 - queue count
-MST-1005 = Recovered {0,number} messages for queue {1}
+TXN-1005 = Recovered {0,number} messages for queue {1}
# 0 - queue name
-MST-1006 = Recovery Complete[ : {0}]
+TXN-1006 = Recovery Complete[ : {0}]
#Connection
# 0 - Client id
@@ -247,8 +100,9 @@
CHN-1002 = Flow {0}
CHN-1003 = Close
# 0 - bytes allowed in prefetch
-# 1 - number of messagse.
+# 1 - number of messagse.
CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number}
+# 0 - queue causing flow control
CHN-1005 = Flow Control Enforced (Queue {0})
CHN-1006 = Flow Control Removed
@@ -260,6 +114,7 @@
QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number}
QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
+
#Exchange
# 0 - type
# 1 - name
@@ -273,4 +128,5 @@
#Subscription
SUB-1001 = Create[ : Durable][ : Arguments : {0}]
SUB-1002 = Close
+# 0 - The current subscription state
SUB-1003 = State : {0}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java Sun Oct 25 22:58:57 2009
@@ -46,7 +46,7 @@
// Provide the value for the 4th replacement.
setLogStringWithFormat(CHANNEL_FORMAT,
session.getSessionID(),
- session.getAuthorizedID().getName(),
+ session.getPrincipal().getName(),
session.getRemoteAddress(),
session.getVirtualHost().getName(),
channel.getChannelId());
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java Sun Oct 25 22:58:57 2009
@@ -41,7 +41,7 @@
public ConnectionLogSubject(AMQProtocolSession session)
{
setLogStringWithFormat(CONNECTION_FORMAT, session.getSessionID(),
- session.getAuthorizedID().getName(),
+ session.getPrincipal().getName(),
session.getRemoteAddress(),
session.getVirtualHost().getName());
}
Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 25 22:58:57 2009
@@ -1,3 +1,5 @@
/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management:757268
+/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management:795950-829653
+/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/management:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/management:787599
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java Sun Oct 25 22:58:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -65,16 +65,9 @@
return null;
}
- public void register() throws AMQException
+ public void register() throws JMException
{
- try
- {
- getManagedObjectRegistry().registerObject(this);
- }
- catch (JMException e)
- {
- throw new AMQException("Error registering managed object " + this + ": " + e, e);
- }
+ getManagedObjectRegistry().registerObject(this);
}
protected ManagedObjectRegistry getManagedObjectRegistry()
@@ -98,7 +91,7 @@
{
return getObjectInstanceName() + "[" + getType() + "]";
}
-
+
/**
* Created the ObjectName as per the JMX Specs
@@ -140,7 +133,7 @@
objectName.append(",");
objectName.append("version=").append(_version);
-
+
return new ObjectName(objectName.toString());
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java Sun Oct 25 22:58:57 2009
@@ -54,6 +54,7 @@
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.UnknownHostException;
import java.rmi.AlreadyBoundException;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
@@ -236,8 +237,17 @@
* The registry is exported on the defined management port 'port'. We will export the RMIConnectorServer
* on 'port +1'. Use of these two well-defined ports will ease any navigation through firewall's.
*/
- final RMIServerImpl rmiConnectorServerStub = new RMIJRMPServerImpl(port+PORT_EXPORT_OFFSET, csf, ssf, env);
- final String hostname = InetAddress.getLocalHost().getHostName();
+ final RMIServerImpl rmiConnectorServerStub = new RMIJRMPServerImpl(port+PORT_EXPORT_OFFSET, csf, ssf, env);
+ String localHost;
+ try
+ {
+ localHost = InetAddress.getLocalHost().getHostName();
+ }
+ catch(UnknownHostException ex)
+ {
+ localHost="127.0.0.1";
+ }
+ final String hostname = localHost;
final JMXServiceURL externalUrl = new JMXServiceURL(
"service:jmx:rmi://"+hostname+":"+(port+PORT_EXPORT_OFFSET)+"/jndi/rmi://"+hostname+":"+port+"/jmxrmi");
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java Sun Oct 25 22:58:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import javax.management.JMException;
import org.apache.qpid.AMQException;
@@ -45,7 +46,7 @@
ManagedObject getParentObject();
- void register() throws AMQException;
+ void register() throws AMQException, JMException;
void unregister() throws AMQException;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java Sun Oct 25 22:58:57 2009
@@ -26,10 +26,13 @@
*/
package org.apache.qpid.server.output;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.AMQException;
public interface ProtocolOutputConverter
@@ -41,16 +44,16 @@
ProtocolOutputConverter newInstance(AMQProtocolSession session);
}
- void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException;
- void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException;
+ void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException;
byte getProtocolMinorVersion();
byte getProtocolMajorVersion();
- void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+ void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource msgContent, int channelId, int replyCode, AMQShortString replyText)
throws AMQException;
void writeFrame(AMQDataBlock block);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Sun Oct 25 22:58:57 2009
@@ -27,28 +27,34 @@
package org.apache.qpid.server.output.amqp0_8;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
-import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.mina.common.ByteBuffer;
-
-import java.util.Iterator;
+import java.nio.ByteBuffer;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
+ private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+
+ private static final ProtocolVersionMethodConverter PROTOCOL_CONVERTER =
+ METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
{
return new Factory()
{
-
+
public ProtocolOutputConverter newInstance(AMQProtocolSession session)
{
return new ProtocolOutputConverterImpl(session);
@@ -69,71 +75,47 @@
return _protocolSession;
}
- public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
-
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
-
-
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ AMQDataBlock deliver = createEncodedDeliverFrame(entry, channelId, deliveryTag, consumerTag);
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+ }
- if(bodyCount == 0)
+ private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+ throws AMQException
+ {
+ if(entry.getMessage() instanceof AMQMessage)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
-
- writeFrame(compositeBlock);
+ return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
}
else
{
-
-
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
- writeFrame(compositeBlock);
-
- //
- // Now start writing out the other content bodies
- //
- for(int i = 1; i < bodyCount; i++)
- {
- cb = messageHandle.getContentChunk(storeContext, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
- }
-
-
+ final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
+ return chb;
}
-
-
}
- public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
+ AMQDataBlock deliver = createEncodedGetOkFrame(entry, channelId, deliveryTag, queueSize);
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+ }
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody chb, int channelId, AMQDataBlock deliver)
+ throws AMQException
+ {
- AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, chb);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
- final int bodyCount = messageHandle.getBodyCount(storeContext);
- if(bodyCount == 0)
+ final int bodySize = (int) message.getSize();
+ if(bodySize == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
contentHeader);
@@ -141,66 +123,97 @@
}
else
{
+ int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+ final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+ ByteBuffer buf = ByteBuffer.allocate(capacity);
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ int writtenSize = 0;
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ writtenSize += message.getContent(buf, writtenSize);
+ buf.flip();
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
- //
- // Now start writing out the other content bodies
- //
- for(int i = 1; i < bodyCount; i++)
+ while(writtenSize < bodySize)
{
- cb = messageHandle.getContentChunk(storeContext, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ buf = java.nio.ByteBuffer.allocate(capacity);
+ writtenSize += message.getContent(buf, writtenSize);
+ buf.flip();
+ writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
}
-
}
-
-
}
- private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ private AMQDataBlock createEncodedDeliverFrame(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicDeliverBody deliverBody =
- methodRegistry.createBasicDeliverBody(consumerTag,
+ METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
deliveryTag,
- messageHandle.isRedelivered(),
- pb.getExchange(),
- pb.getRoutingKey());
+ isRedelivered,
+ exchangeName,
+ routingKey);
+
AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
return deliverFrame;
}
- private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ private AMQDataBlock createEncodedGetOkFrame(QueueEntry entry, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicGetOkBody getOkBody =
- methodRegistry.createBasicGetOkBody(deliveryTag,
- messageHandle.isRedelivered(),
- pb.getExchange(),
- pb.getRoutingKey(),
+ METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey,
queueSize);
AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
@@ -217,54 +230,31 @@
return getProtocolSession().getProtocolMajorVersion();
}
- private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQDataBlock createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicReturnBody basicReturnBody =
- methodRegistry.createBasicReturnBody(replyCode,
+ METHOD_REGISTRY.createBasicReturnBody(replyCode,
replyText,
- message.getMessagePublishInfo().getExchange(),
- message.getMessagePublishInfo().getRoutingKey());
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
return returnFrame;
}
- public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+ public void writeReturn(MessagePublishInfo messagePublishInfo,
+ ContentHeaderBody header,
+ MessageContentSource content,
+ int channelId,
+ int replyCode,
+ AMQShortString replyText)
throws AMQException
{
- AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
- Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- if (bodyFrameIterator.hasNext())
- {
- AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
- writeFrame(compositeBlock);
- }
- else
- {
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
+ AMQDataBlock returnFrame = createEncodedReturnFrame(messagePublishInfo, channelId, replyCode, replyText);
- writeFrame(compositeBlock);
- }
+ writeMessageDelivery(content, header, channelId, returnFrame);
- //
- // Now start writing out the other content bodies
- // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
- //
- while (bodyFrameIterator.hasNext())
- {
- writeFrame(bodyFrameIterator.next());
- }
}
@@ -276,8 +266,7 @@
public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
- BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
+ BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Sun Oct 25 22:58:57 2009
@@ -1,46 +1,48 @@
package org.apache.qpid.server.output.amqp0_9;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
-import org.apache.mina.common.ByteBuffer;
-import java.util.Iterator;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
-import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.amqp_0_9.BasicGetBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
+ private static final ProtocolVersionMethodConverter
+ PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
@@ -68,20 +70,46 @@
return _protocolSession;
}
- public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
+ }
+
+
+ private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
throws AMQException
{
- AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
- final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+ }
+ else
+ {
+ final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
+ return chb;
+ }
+ }
+
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+ }
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ int bodySize = (int) message.getSize();
- if(bodyCount == 0)
+ if(bodySize == 0)
{
SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
contentHeaderBody);
@@ -90,101 +118,75 @@
}
else
{
+ int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
+ final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ int writtenSize = 0;
- AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
- CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+ writtenSize += message.getContent(buf, writtenSize);
+ buf.flip();
+ AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
+
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
- //
- // Now start writing out the other content bodies
- //
- for(int i = 1; i < bodyCount; i++)
+ while(writtenSize < bodySize)
{
- cb = messageHandle.getContentChunk(storeContext, i);
- writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
- }
-
+ buf = java.nio.ByteBuffer.allocate(capacity);
+ writtenSize += message.getContent(buf, writtenSize);
+ buf.flip();
+ writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ }
}
-
}
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
-
+
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
contentHeaderBody);
return contentHeader;
}
- public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
-
- AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
+ throws AMQException
+ {
- AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
- final int bodyCount = messageHandle.getBodyCount(storeContext);
- if(bodyCount == 0)
+ if(entry.getMessage() instanceof AMQMessage)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
- writeFrame(compositeBlock);
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
}
else
{
-
-
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
- AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
- writeFrame(compositeBlock);
-
- //
- // Now start writing out the other content bodies
- //
- for(int i = 1; i < bodyCount; i++)
- {
- cb = messageHandle.getContentChunk(storeContext, i);
- writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
- }
-
-
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
}
-
- }
-
-
- private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
- throws AMQException
- {
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
-
-
- final boolean isRedelivered = messageHandle.isRedelivered();
- final AMQShortString exchangeName = pb.getExchange();
- final AMQShortString routingKey = pb.getRoutingKey();
+ final boolean isRedelivered = entry.isRedelivered();
final AMQBody returnBlock = new AMQBody()
{
@@ -237,22 +239,37 @@
return returnBlock;
}
- private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
throws AMQException
{
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
BasicGetOkBody getOkBody =
METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
- messageHandle.isRedelivered(),
- pb.getExchange(),
- pb.getRoutingKey(),
+ isRedelivered,
+ exchangeName,
+ routingKey,
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame;
+ return getOkBody;
}
public byte getProtocolMinorVersion()
@@ -265,53 +282,28 @@
return getProtocolSession().getProtocolMajorVersion();
}
- private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
{
BasicReturnBody basicReturnBody =
METHOD_REGISTRY.createBasicReturnBody(replyCode,
replyText,
- message.getMessagePublishInfo().getExchange(),
- message.getMessagePublishInfo().getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
- return returnFrame;
+
+ return basicReturnBody;
}
- public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
- AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
- Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- if (bodyFrameIterator.hasNext())
- {
- AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
- writeFrame(compositeBlock);
- }
- else
- {
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
-
- writeFrame(compositeBlock);
- }
-
- //
- // Now start writing out the other content bodies
- // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
- //
- while (bodyFrameIterator.hasNext())
- {
- writeFrame(bodyFrameIterator.next());
- }
+ writeMessageDelivery(message, header, channelId, returnFrame);
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Sun Oct 25 22:58:57 2009
@@ -34,6 +34,8 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicMarkableReference;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
@@ -83,8 +85,8 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Sender;
@@ -124,7 +126,7 @@
private Object _lastSent;
- protected boolean _closed;
+ protected volatile boolean _closed;
// maximum number of channels this session should have
private long _maxNoOfChannels = 1000;
@@ -139,7 +141,7 @@
private Principal _authorizedID;
private MethodDispatcher _dispatcher;
private ProtocolSessionIdentifier _sessionIdentifier;
-
+
// Create a simple ID that increments for ever new Session
private final long _sessionID = idGenerator.getAndIncrement();
@@ -152,11 +154,13 @@
private long _writtenBytes;
private long _readBytes;
-
+
private Job _readJob;
private Job _writeJob;
private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+ private long _maxFrameSize;
+ private final AtomicBoolean _closing = new AtomicBoolean(false);
public ManagedObject getManagedObject()
{
@@ -167,7 +171,7 @@
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_networkDriver = driver;
-
+
_codecFactory = new AMQCodecFactory(true, this);
_poolReference.acquireExecutorService();
_readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
@@ -178,17 +182,9 @@
}
- private AMQProtocolSessionMBean createMBean() throws AMQException
+ private AMQProtocolSessionMBean createMBean() throws JMException
{
- try
- {
- return new AMQProtocolSessionMBean(this);
- }
- catch (JMException ex)
- {
- _logger.error("AMQProtocolSession MBean creation has failed ", ex);
- throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
- }
+ return new AMQProtocolSessionMBean(this);
}
public long getSessionID()
@@ -201,6 +197,21 @@
return _actor;
}
+ public void setMaxFrameSize(long frameMax)
+ {
+ _maxFrameSize = frameMax;
+ }
+
+ public long getMaxFrameSize()
+ {
+ return _maxFrameSize;
+ }
+
+ public boolean isClosing()
+ {
+ return _closing.get();
+ }
+
public void received(final ByteBuffer msg)
{
_lastIoTime = System.currentTimeMillis();
@@ -290,7 +301,7 @@
else
{
// The channel has been told to close, we don't process any more frames until
- // it's closed.
+ // it's closed.
return;
}
}
@@ -545,7 +556,7 @@
private void checkForNotification()
{
int channelsCount = _channelMap.size();
- if (channelsCount >= _maxNoOfChannels)
+ if (_managedObject != null && channelsCount >= _maxNoOfChannels)
{
_managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
}
@@ -560,7 +571,7 @@
{
_maxNoOfChannels = value;
}
-
+
public void commitTransactions(AMQChannel channel) throws AMQException
{
if ((channel != null) && channel.isTransactional())
@@ -576,7 +587,7 @@
channel.rollback();
}
}
-
+
/**
* Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
* subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
@@ -676,34 +687,58 @@
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
public void closeSession() throws AMQException
{
- // REMOVE THIS SHOULD NOT BE HERE.
- if (CurrentActor.get() == null)
- {
- CurrentActor.set(_actor);
- }
- if (!_closed)
+ if(_closing.compareAndSet(false,true))
{
- if (_virtualHost != null)
+ // REMOVE THIS SHOULD NOT BE HERE.
+ if (CurrentActor.get() == null)
{
- _virtualHost.getConnectionRegistry().deregisterConnection(this);
+ CurrentActor.set(_actor);
}
-
- closeAllChannels();
- if (_managedObject != null)
+ if (!_closed)
{
- _managedObject.unregister();
- // Ensure we only do this once.
- _managedObject = null;
- }
+ if (_virtualHost != null)
+ {
+ _virtualHost.getConnectionRegistry().deregisterConnection(this);
+ }
+
+ closeAllChannels();
+ if (_managedObject != null)
+ {
+ _managedObject.unregister();
+ // Ensure we only do this once.
+ _managedObject = null;
+ }
- for (Task task : _taskList)
+ for (Task task : _taskList)
+ {
+ task.doTask(this);
+ }
+
+ synchronized(this)
+ {
+ _closed = true;
+ notifyAll();
+ }
+ _poolReference.releaseExecutorService();
+ CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002());
+ }
+ }
+ else
+ {
+ synchronized(this)
{
- task.doTask(this);
+ while(!_closed)
+ {
+ try
+ {
+ wait(1000);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+ }
}
-
- _closed = true;
- _poolReference.releaseExecutorService();
- CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002());
}
}
@@ -778,7 +813,7 @@
throw new IllegalArgumentException("Unsupported socket address class: " + address);
}
}
-
+
public SaslServer getSaslServer()
{
return _saslServer;
@@ -868,8 +903,15 @@
_virtualHost.getConnectionRegistry().registerConnection(this);
- _managedObject = createMBean();
- _managedObject.register();
+ try
+ {
+ _managedObject = createMBean();
+ _managedObject.register();
+ }
+ catch (JMException e)
+ {
+ _logger.error(e);
+ }
}
public void addSessionCloseTask(Task task)
@@ -881,7 +923,7 @@
{
_taskList.remove(task);
}
-
+
public ProtocolOutputConverter getProtocolOutputConverter()
{
return _protocolOutputConverter;
@@ -900,10 +942,15 @@
return _authorizedID;
}
+ public Principal getPrincipal()
+ {
+ return _authorizedID;
+ }
+
public SocketAddress getRemoteAddress()
{
return _networkDriver.getRemoteAddress();
- }
+ }
public SocketAddress getLocalAddress()
{
@@ -939,7 +986,7 @@
public void setNetworkDriver(NetworkDriver driver)
{
- _networkDriver = driver;
+ _networkDriver = driver;
}
public void writerIdle()
@@ -968,7 +1015,7 @@
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
-
+
writeFrame(closeBody.generateFrame(0));
_networkDriver.close();
@@ -984,7 +1031,7 @@
{
// Do nothing
}
-
+
public long getReadBytes()
{
return _readBytes;
@@ -1004,7 +1051,7 @@
{
return _sessionIdentifier;
}
-
+
public String getClientVersion()
{
return (_clientVersion == null) ? null : _clientVersion.toString();
@@ -1022,5 +1069,5 @@
}
}
}
-
+
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org