You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/12/28 14:02:48 UTC
svn commit: r1225178 [2/8] - in /qpid/trunk/qpid/java: ./ bdbstore/src/main/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/
bdbstore/src/test/ bdbstore/src/test/jav...
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Wed Dec 28 13:02:41 2011
@@ -34,6 +34,8 @@ import org.apache.qpid.server.virtualhos
import javax.management.JMException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -41,8 +43,52 @@ public class DirectExchange extends Abst
{
private static final Logger _logger = Logger.getLogger(DirectExchange.class);
- private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey =
- new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>();
+ private static final class BindingSet
+ {
+ private CopyOnWriteArraySet<Binding> _bindings = new CopyOnWriteArraySet<Binding>();
+ private List<BaseQueue> _queues = new ArrayList<BaseQueue>();
+
+ public synchronized void addBinding(Binding binding)
+ {
+ _bindings.add(binding);
+ recalculateQueues();
+ }
+
+
+ public synchronized void removeBinding(Binding binding)
+ {
+ _bindings.remove(binding);
+ recalculateQueues();
+ }
+
+ private void recalculateQueues()
+ {
+ List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size());
+
+ for(Binding b : _bindings)
+ {
+ if(!queues.contains(b.getQueue()))
+ {
+ queues.add(b.getQueue());
+ }
+ }
+ _queues = queues;
+ }
+
+
+ public List<BaseQueue> getQueues()
+ {
+ return _queues;
+ }
+
+ public CopyOnWriteArraySet<Binding> getBindings()
+ {
+ return _bindings;
+ }
+ }
+
+ private final ConcurrentHashMap<String, BindingSet> _bindingsByKey =
+ new ConcurrentHashMap<String, BindingSet>();
public static final ExchangeType<DirectExchange> TYPE = new ExchangeType<DirectExchange>()
{
@@ -91,33 +137,20 @@ public class DirectExchange extends Abst
}
- public ArrayList<? extends BaseQueue> doRoute(InboundMessage payload)
+ public List<? extends BaseQueue> doRoute(InboundMessage payload)
{
final String routingKey = payload.getRoutingKey();
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey);
+ BindingSet bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey);
if(bindings != null)
{
- final ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>(bindings.size());
-
- for(Binding binding : bindings)
- {
- queues.add(binding.getQueue());
- binding.incrementMatches();
- }
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Publishing message to queue " + queues);
- }
-
- return queues;
+ return bindings.getQueues();
}
else
{
- return new ArrayList<BaseQueue>(0);
+ return Collections.emptyList();
}
@@ -132,16 +165,10 @@ public class DirectExchange extends Abst
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
+ BindingSet bindings = _bindingsByKey.get(bindingKey);
if(bindings != null)
{
- for(Binding binding : bindings)
- {
- if(binding.getQueue().equals(queue))
- {
- return true;
- }
- }
+ return bindings.getQueues().contains(queue);
}
return false;
@@ -150,22 +177,20 @@ public class DirectExchange extends Abst
public boolean isBound(AMQShortString routingKey)
{
String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
- return bindings != null && !bindings.isEmpty();
+ BindingSet bindings = _bindingsByKey.get(bindingKey);
+ return bindings != null && !bindings.getQueues().isEmpty();
}
public boolean isBound(AMQQueue queue)
{
- for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values())
+ for (BindingSet bindings : _bindingsByKey.values())
{
- for(Binding binding : bindings)
+ if(bindings.getQueues().contains(queue))
{
- if(binding.getQueue().equals(queue))
- {
- return true;
- }
+ return true;
}
+
}
return false;
}
@@ -184,19 +209,19 @@ public class DirectExchange extends Abst
assert queue != null;
assert routingKey != null;
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
+ BindingSet bindings = _bindingsByKey.get(bindingKey);
if(bindings == null)
{
- bindings = new CopyOnWriteArraySet<Binding>();
- CopyOnWriteArraySet<Binding> newBindings;
+ bindings = new BindingSet();
+ BindingSet newBindings;
if((newBindings = _bindingsByKey.putIfAbsent(bindingKey, bindings)) != null)
{
bindings = newBindings;
}
}
- bindings.add(binding);
+ bindings.addBinding(binding);
}
@@ -204,10 +229,10 @@ public class DirectExchange extends Abst
{
assert binding != null;
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(binding.getBindingKey());
+ BindingSet bindings = _bindingsByKey.get(binding.getBindingKey());
if(bindings != null)
{
- bindings.remove(binding);
+ bindings.removeBinding(binding);
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Wed Dec 28 13:02:41 2011
@@ -36,6 +36,7 @@ import org.apache.qpid.server.configurat
import javax.management.JMException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
public interface Exchange extends ExchangeReferrer, ExchangeConfig
{
@@ -70,7 +71,7 @@ public interface Exchange extends Exchan
*
* @return list of queues to which to route the message.
*/
- ArrayList<? extends BaseQueue> route(InboundMessage message);
+ List<? extends BaseQueue> route(InboundMessage message);
/**
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Wed Dec 28 13:02:41 2011
@@ -37,8 +37,11 @@ import org.apache.qpid.server.filter.JMS
import org.apache.qpid.server.message.InboundMessage;
import javax.management.JMException;
+import java.sql.Array;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
import java.lang.ref.WeakReference;
public class TopicExchange extends AbstractExchange
@@ -77,8 +80,6 @@ public class TopicExchange extends Abstr
private static final Logger _logger = Logger.getLogger(TopicExchange.class);
-
-
private final TopicParser _parser = new TopicParser();
private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
@@ -175,7 +176,6 @@ public class TopicExchange extends Abstr
_bindings.put(binding, args);
}
-
}
private JMSSelectorFilter createSelectorFilter(final FieldTable args) throws AMQInvalidArgumentException
@@ -201,14 +201,23 @@ public class TopicExchange extends Abstr
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
- final AMQShortString routingKey = payload.getRoutingKey() == null
+ final AMQShortString routingKey = payload.getRoutingKeyShortString() == null
? AMQShortString.EMPTY_STRING
- : new AMQShortString(payload.getRoutingKey());
+ : payload.getRoutingKeyShortString();
+
+ final Collection<AMQQueue> matchedQueues = getMatchedQueues(payload, routingKey);
- // The copy here is unfortunate, but not too bad relevant to the amount of
- // things created and copied in getMatchedQueues
- ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>();
- queues.addAll(getMatchedQueues(payload, routingKey));
+ ArrayList<BaseQueue> queues;
+
+ if(matchedQueues.getClass() == ArrayList.class)
+ {
+ queues = (ArrayList) matchedQueues;
+ }
+ else
+ {
+ queues = new ArrayList<BaseQueue>();
+ queues.addAll(matchedQueues);
+ }
if(queues == null || queues.isEmpty())
{
@@ -325,25 +334,28 @@ public class TopicExchange extends Abstr
{
Collection<TopicMatcherResult> results = _parser.parse(routingKey);
- if(results.isEmpty())
- {
- return Collections.EMPTY_SET;
- }
- else
+ switch(results.size())
{
- Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>();
- for(TopicMatcherResult result : results)
- {
- TopicExchangeResult res = (TopicExchangeResult)result;
+ case 0:
+ return Collections.EMPTY_SET;
+ case 1:
+ TopicMatcherResult[] resultQueues = new TopicMatcherResult[1];
+ results.toArray(resultQueues);
+ return ((TopicExchangeResult)resultQueues[0]).processMessage(message, null);
+ default:
+ Collection<AMQQueue> queues = new HashSet<AMQQueue>();
+ for(TopicMatcherResult result : results)
+ {
+ TopicExchangeResult res = (TopicExchangeResult)result;
+
+ for(Binding b : res.getBindings())
+ {
+ b.incrementMatches();
+ }
- for(Binding b : res.getBindings())
- {
- b.incrementMatches();
+ queues = res.processMessage(message, queues);
}
-
- queues = res.processMessage(message, queues);
- }
- return queues;
+ return queues;
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java Wed Dec 28 13:02:41 2011
@@ -39,6 +39,7 @@ public final class TopicExchangeResult i
private final List<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>();
+ private volatile ArrayList<AMQQueue> _unfilteredQueueList = new ArrayList<AMQQueue>(0);
public void addUnfilteredQueue(AMQQueue queue)
{
@@ -46,6 +47,9 @@ public final class TopicExchangeResult i
if(instances == null)
{
_unfilteredQueues.put(queue, 1);
+ ArrayList<AMQQueue> newList = new ArrayList<AMQQueue>(_unfilteredQueueList);
+ newList.add(queue);
+ _unfilteredQueueList = newList;
}
else
{
@@ -59,6 +63,10 @@ public final class TopicExchangeResult i
if(instances == 1)
{
_unfilteredQueues.remove(queue);
+ ArrayList<AMQQueue> newList = new ArrayList<AMQQueue>(_unfilteredQueueList);
+ newList.remove(queue);
+ _unfilteredQueueList = newList;
+
}
else
{
@@ -166,7 +174,7 @@ public final class TopicExchangeResult i
{
if(_filteredQueues.isEmpty())
{
- return new ArrayList<AMQQueue>(_unfilteredQueues.keySet());
+ return _unfilteredQueueList;
}
else
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java Wed Dec 28 13:02:41 2011
@@ -77,7 +77,7 @@ public class TopicMatcherDFAState
}
if(nextState == null)
{
- return Collections.EMPTY_SET;
+ return Collections.EMPTY_LIST;
}
// Shortcut if we are at a looping terminal state
if((nextState == this) && (_nextStateMap.size() == 1) && _nextStateMap.containsKey(TopicWord.ANY_WORD))
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java Wed Dec 28 13:02:41 2011
@@ -44,27 +44,9 @@ import org.apache.qpid.server.transport.
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageReject;
-import org.apache.qpid.transport.MessageRejectCode;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.SessionListener;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import org.apache.qpid.transport.*;
+
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -365,7 +347,8 @@ public class Bridge implements BridgeCon
// TODO - deal with exchange not existing
DeliveryProperties delvProps = null;
- if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+ if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() &&
+ !delvProps.hasExpiration())
{
delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
}
@@ -377,7 +360,7 @@ public class Bridge implements BridgeCon
storeMessage.flushToStore();
MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)_session).getReference());
- ArrayList<? extends BaseQueue> queues = exchange.route(message);
+ List<? extends BaseQueue> queues = exchange.route(message);
@@ -391,7 +374,7 @@ public class Bridge implements BridgeCon
{
if(xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
- RangeSet rejects = new RangeSet();
+ RangeSet rejects = RangeSetFactory.createRangeSet();
rejects.add(xfr.getId());
MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
ssn.invoke(reject);
@@ -428,7 +411,7 @@ public class Bridge implements BridgeCon
}
- private void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
+ private void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
{
_transaction.enqueue(queues,message, new ServerTransaction.Action()
{
@@ -456,8 +439,7 @@ public class Bridge implements BridgeCon
{
// NO-OP
}
- });
-
+ }, 0L);
}
public void exception(final Session session, final SessionException exception)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java Wed Dec 28 13:02:41 2011
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.flow;
+import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Set;
import java.util.HashSet;
@@ -27,13 +28,16 @@ import java.util.HashSet;
public abstract class AbstractFlowCreditManager implements FlowCreditManager
{
protected final AtomicBoolean _suspended = new AtomicBoolean(false);
- private final Set<FlowCreditManagerListener> _listeners = new HashSet<FlowCreditManagerListener>();
+ private final ArrayList<FlowCreditManagerListener> _listeners = new ArrayList<FlowCreditManagerListener>();
public final void addStateListener(FlowCreditManagerListener listener)
{
synchronized(_listeners)
{
- _listeners.add(listener);
+ if(!_listeners.contains(listener))
+ {
+ _listeners.add(listener);
+ }
}
}
@@ -49,9 +53,10 @@ public abstract class AbstractFlowCredit
{
synchronized(_listeners)
{
- for(FlowCreditManagerListener listener : _listeners)
+ final int size = _listeners.size();
+ for(int i = 0; i<size; i++)
{
- listener.creditStateChanged(!suspended);
+ _listeners.get(i).creditStateChanged(!suspended);
}
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Wed Dec 28 13:02:41 2011
@@ -52,7 +52,6 @@ public class BasicConsumeMethodHandler i
AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
AMQChannel channel = protocolConnection.getChannel(channelId);
-
VirtualHost vHost = protocolConnection.getVirtualHost();
if (channel == null)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Wed Dec 28 13:02:41 2011
@@ -65,7 +65,6 @@ public class ChannelCloseHandler impleme
{
throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel");
}
-
session.closeChannel(channelId);
// Client requested closure so we don't wait for ok we send it
stateManager.getProtocolSession().closeChannelOk(channelId);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java Wed Dec 28 13:02:41 2011
@@ -55,7 +55,6 @@ public class ChannelFlowHandler implemen
{
throw body.getChannelNotFoundException(channelId);
}
-
channel.setSuspended(!body.getActive());
_logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Wed Dec 28 13:02:41 2011
@@ -49,7 +49,6 @@ public class ConnectionCloseMethodHandle
public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
-
if (_logger.isInfoEnabled())
{
_logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Wed Dec 28 13:02:41 2011
@@ -65,7 +65,6 @@ public class ExchangeBoundHandler implem
public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
-
VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
MethodRegistry methodRegistry = session.getMethodRegistry();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Wed Dec 28 13:02:41 2011
@@ -85,6 +85,13 @@ public class QueueDeclareHandler impleme
//TODO: do we need to check that the queue already exists with exactly the same "configuration"?
+ AMQChannel channel = protocolConnection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
synchronized (queueRegistry)
{
queue = queueRegistry.getQueue(queueName);
@@ -183,12 +190,6 @@ public class QueueDeclareHandler impleme
}
- AMQChannel channel = protocolConnection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
//set this as the default queue on the channel:
channel.setDefaultQueue(queue);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Wed Dec 28 13:02:41 2011
@@ -64,15 +64,17 @@ public class QueueDeleteHandler implemen
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+
+ AMQChannel channel = protocolConnection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
AMQQueue queue;
if (body.getQueue() == null)
{
- AMQChannel channel = protocolConnection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
//get the default queue on the channel:
queue = channel.getDefaultQueue();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Wed Dec 28 13:02:41 2011
@@ -63,17 +63,14 @@ public class QueuePurgeHandler implement
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
AMQChannel channel = protocolConnection.getChannel(channelId);
-
-
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
AMQQueue queue;
if(body.getQueue() == null)
{
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
-
//get the default queue on the channel:
queue = channel.getDefaultQueue();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java Wed Dec 28 13:02:41 2011
@@ -66,14 +66,15 @@ public class QueueUnbindHandler implemen
final AMQQueue queue;
final AMQShortString routingKey;
- if (body.getQueue() == null)
+
+ AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
{
- AMQChannel channel = session.getChannel(channelId);
+ throw body.getChannelNotFoundException(channelId);
+ }
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
+ if (body.getQueue() == null)
+ {
queue = channel.getDefaultQueue();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java Wed Dec 28 13:02:41 2011
@@ -37,7 +37,7 @@ import java.nio.ByteBuffer;
/**
* A deliverable message.
*/
-public class AMQMessage extends AbstractServerMessageImpl
+public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -62,10 +62,6 @@ public class AMQMessage extends Abstract
private Object _sessionIdentifier;
private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
- private final StoredMessage<MessageMetaData> _handle;
-
- WeakReference<AMQChannel> _channelRef;
-
public AMQMessage(StoredMessage<MessageMetaData> handle)
{
this(handle, null);
@@ -75,7 +71,7 @@ public class AMQMessage extends Abstract
{
super(handle);
- _handle = handle;
+
final MessageMetaData metaData = handle.getMetaData();
_size = metaData.getContentSize();
final MessagePublishInfo messagePublishInfo = metaData.getMessagePublishInfo();
@@ -84,8 +80,6 @@ public class AMQMessage extends Abstract
{
_flags |= IMMEDIATE;
}
-
- _channelRef = channelRef;
}
public void setExpiration(final long expiration)
@@ -97,7 +91,7 @@ public class AMQMessage extends Abstract
public MessageMetaData getMessageMetaData()
{
- return _handle.getMetaData();
+ return getStoredMessage().getMetaData();
}
public ContentHeaderBody getContentHeaderBody() throws AMQException
@@ -107,7 +101,7 @@ public class AMQMessage extends Abstract
public Long getMessageId()
{
- return _handle.getMessageNumber();
+ return getStoredMessage().getMessageNumber();
}
/**
@@ -219,9 +213,9 @@ public class AMQMessage extends Abstract
return new AMQMessageReference(this);
}
- public Long getMessageNumber()
+ public long getMessageNumber()
{
- return getMessageId();
+ return getStoredMessage().getMessageNumber();
}
@@ -248,16 +242,13 @@ public class AMQMessage extends Abstract
public int getContent(ByteBuffer buf, int offset)
{
- return _handle.getContent(offset, buf);
+ return getStoredMessage().getContent(offset, buf);
}
- public StoredMessage<MessageMetaData> getStoredMessage()
+
+ public ByteBuffer getContent(int offset, int size)
{
- return _handle;
+ return getStoredMessage().getContent(offset, size);
}
- public SessionConfig getSessionConfig()
- {
- return _channelRef == null ? null : ((SessionConfig) _channelRef.get());
- }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Wed Dec 28 13:02:41 2011
@@ -21,19 +21,30 @@
package org.apache.qpid.server.message;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
-public abstract class AbstractServerMessageImpl implements ServerMessage
+public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaData> implements ServerMessage<T>
{
- private final AtomicInteger _referenceCount = new AtomicInteger(0);
- private final StoredMessage<?> _handle;
- public AbstractServerMessageImpl(StoredMessage<?> handle)
+ private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
+
+ private volatile int _referenceCount = 0;
+ private final StoredMessage<T> _handle;
+
+ public AbstractServerMessageImpl(StoredMessage<T> handle)
{
_handle = handle;
}
+ public StoredMessage<T> getStoredMessage()
+ {
+ return _handle;
+ }
+
public boolean incrementReference()
{
return incrementReference(1);
@@ -41,9 +52,9 @@ public abstract class AbstractServerMess
public boolean incrementReference(int count)
{
- if(_referenceCount.addAndGet(count) <= 0)
+ if(_refCountUpdater.addAndGet(this, count) <= 0)
{
- _referenceCount.addAndGet(-count);
+ _refCountUpdater.addAndGet(this, -count);
return false;
}
else
@@ -62,7 +73,7 @@ public abstract class AbstractServerMess
*/
public void decrementReference()
{
- int count = _referenceCount.decrementAndGet();
+ int count = _refCountUpdater.decrementAndGet(this);
// note that the operation of decrementing the reference count and then removing the message does not
// have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
@@ -73,7 +84,7 @@ public abstract class AbstractServerMess
// set the reference count way below 0 so that we can detect that the message has been deleted
// this is to guard against the message being spontaneously recreated (from the mgmt console)
// by copying from other queues at the same time as it is being removed.
- _referenceCount.set(Integer.MIN_VALUE/2);
+ _refCountUpdater.set(this,Integer.MIN_VALUE/2);
// must check if the handle is null since there may be cases where we decide to throw away a message
// and the handle has not yet been constructed
@@ -99,6 +110,6 @@ public abstract class AbstractServerMess
protected int getReferenceCount()
{
- return _referenceCount.get();
+ return _referenceCount;
}
}
\ No newline at end of file
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java Wed Dec 28 13:02:41 2011
@@ -20,8 +20,11 @@
*/
package org.apache.qpid.server.message;
+import org.apache.qpid.server.store.StoredMessage;
+
public interface EnqueableMessage
{
- Long getMessageNumber();
+ long getMessageNumber();
boolean isPersistent();
+ StoredMessage getStoredMessage();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java Wed Dec 28 13:02:41 2011
@@ -22,10 +22,12 @@ package org.apache.qpid.server.message;
import org.apache.qpid.server.queue.Filterable;
+import org.apache.qpid.framing.AMQShortString;
public interface InboundMessage extends Filterable
{
String getRoutingKey();
+ AMQShortString getRoutingKeyShortString();
AMQMessageHeader getMessageHeader();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java Wed Dec 28 13:02:41 2011
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
public interface MessageContentSource
{
public int getContent(ByteBuffer buf, int offset);
+ public ByteBuffer getContent(int offset, int size);
long getSize();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java Wed Dec 28 13:02:41 2011
@@ -30,9 +30,12 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBEncoder;
import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.framing.AMQShortString;
import java.nio.ByteBuffer;
import java.lang.ref.SoftReference;
+import java.util.ArrayList;
+import java.util.List;
public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage
{
@@ -42,7 +45,6 @@ public class MessageMetaData_0_10 implem
private MessageTransferHeader _messageHeader;
private long _arrivalTime;
private int _bodySize;
- private volatile SoftReference<ByteBuffer> _body;
private static final int ENCODER_SIZE = 1 << 10;
@@ -53,21 +55,16 @@ public class MessageMetaData_0_10 implem
public MessageMetaData_0_10(MessageTransfer xfr)
{
- this(xfr.getHeader(), xfr.getBodySize(), xfr.getBody(), System.currentTimeMillis());
+ this(xfr.getHeader(), xfr.getBodySize(), System.currentTimeMillis());
}
private MessageMetaData_0_10(Header header, int bodySize, long arrivalTime)
{
- this(header, bodySize, null, arrivalTime);
- }
-
- private MessageMetaData_0_10(Header header, int bodySize, ByteBuffer xfrBody, long arrivalTime)
- {
_header = header;
if(_header != null)
{
- _deliveryProps = _header.get(DeliveryProperties.class);
- _messageProps = _header.get(MessageProperties.class);
+ _deliveryProps = _header.getDeliveryProperties();
+ _messageProps = _header.getMessageProperties();
}
else
{
@@ -78,21 +75,6 @@ public class MessageMetaData_0_10 implem
_arrivalTime = arrivalTime;
_bodySize = bodySize;
-
-
- if(xfrBody == null)
- {
- _body = null;
- }
- else
- {
- ByteBuffer body = ByteBuffer.allocate(_bodySize);
- body.put(xfrBody);
- body.flip();
- _body = new SoftReference<ByteBuffer>(body);
- }
-
-
}
@@ -122,16 +104,39 @@ public class MessageMetaData_0_10 implem
encoder.writeInt64(_arrivalTime);
encoder.writeInt32(_bodySize);
- Struct[] headers = _header == null ? new Struct[0] : _header.getStructs();
- encoder.writeInt32(headers.length);
+ int headersLength = 0;
+ if(_header.getDeliveryProperties() != null)
+ {
+ headersLength++;
+ }
+ if(_header.getMessageProperties() != null)
+ {
+ headersLength++;
+ }
+ if(_header.getNonStandardProperties() != null)
+ {
+ headersLength += _header.getNonStandardProperties().size();
+ }
+ encoder.writeInt32(headersLength);
- for(Struct header : headers)
+ if(_header.getDeliveryProperties() != null)
{
- encoder.writeStruct32(header);
-
+ encoder.writeStruct32(_header.getDeliveryProperties());
+ }
+ if(_header.getMessageProperties() != null)
+ {
+ encoder.writeStruct32(_header.getMessageProperties());
}
+ if(_header.getNonStandardProperties() != null)
+ {
+ for(Struct header : _header.getNonStandardProperties())
+ {
+ encoder.writeStruct32(header);
+ }
+
+ }
ByteBuffer buf = encoder.buffer();
return buf;
}
@@ -173,6 +178,11 @@ public class MessageMetaData_0_10 implem
return _deliveryProps == null ? null : _deliveryProps.getRoutingKey();
}
+ public AMQShortString getRoutingKeyShortString()
+ {
+ return AMQShortString.valueOf(getRoutingKey());
+ }
+
public AMQMessageHeader getMessageHeader()
{
return _messageHeader;
@@ -210,17 +220,6 @@ public class MessageMetaData_0_10 implem
return _header;
}
- public ByteBuffer getBody()
- {
- ByteBuffer body = _body == null ? null : _body.get();
- return body;
- }
-
- public void setBody(ByteBuffer body)
- {
- _body = new SoftReference<ByteBuffer>(body);
- }
-
private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10>
{
public MessageMetaData_0_10 createMetaData(ByteBuffer buf)
@@ -232,14 +231,32 @@ public class MessageMetaData_0_10 implem
int bodySize = decoder.readInt32();
int headerCount = decoder.readInt32();
- Struct[] headers = new Struct[headerCount];
+ DeliveryProperties deliveryProperties = null;
+ MessageProperties messageProperties = null;
+ List<Struct> otherProps = null;
for(int i = 0 ; i < headerCount; i++)
{
- headers[i] = decoder.readStruct32();
+ Struct struct = decoder.readStruct32();
+ if(struct instanceof DeliveryProperties && deliveryProperties == null)
+ {
+ deliveryProperties = (DeliveryProperties) struct;
+ }
+ else if(struct instanceof MessageProperties && messageProperties == null)
+ {
+ messageProperties = (MessageProperties) struct;
+ }
+ else
+ {
+ if(otherProps == null)
+ {
+ otherProps = new ArrayList<Struct>();
+
+ }
+ otherProps.add(struct);
+ }
}
-
- Header header = new Header(headers);
+ Header header = new Header(deliveryProperties,messageProperties,otherProps);
return new MessageMetaData_0_10(header, bodySize, arrivalTime);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java Wed Dec 28 13:02:41 2011
@@ -24,32 +24,35 @@ import org.apache.qpid.transport.*;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.framing.AMQShortString;
import java.nio.ByteBuffer;
-import java.lang.ref.WeakReference;
-public class MessageTransferMessage extends AbstractServerMessageImpl implements InboundMessage
+public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMetaData_0_10> implements InboundMessage
{
- private StoredMessage<MessageMetaData_0_10> _storeMessage;
- private WeakReference<Session> _sessionRef;
- public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef)
+ private Object _connectionRef;
+
+ public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef)
{
super(storeMessage);
- _storeMessage = storeMessage;
- _sessionRef = sessionRef;
+ _connectionRef = connectionRef;
}
private MessageMetaData_0_10 getMetaData()
{
- return _storeMessage.getMetaData();
+ return getStoredMessage().getMetaData();
}
public String getRoutingKey()
{
return getMetaData().getRoutingKey();
+ }
+ public AMQShortString getRoutingKeyShortString()
+ {
+ return AMQShortString.valueOf(getRoutingKey());
}
public AMQMessageHeader getMessageHeader()
@@ -91,9 +94,9 @@ public class MessageTransferMessage exte
return new TransferMessageReference(this);
}
- public Long getMessageNumber()
+ public long getMessageNumber()
{
- return _storeMessage.getMessageNumber();
+ return getStoredMessage().getMessageNumber();
}
public long getArrivalTime()
@@ -103,7 +106,13 @@ public class MessageTransferMessage exte
public int getContent(ByteBuffer buf, int offset)
{
- return _storeMessage.getContent(offset, buf);
+ return getStoredMessage().getContent(offset, buf);
+ }
+
+
+ public ByteBuffer getContent(int offset, int size)
+ {
+ return getStoredMessage().getContent(offset,size);
}
public Header getHeader()
@@ -113,32 +122,13 @@ public class MessageTransferMessage exte
public ByteBuffer getBody()
{
- ByteBuffer body = getMetaData().getBody();
- if(body == null && getSize() != 0l)
- {
- final int size = (int) getSize();
- int pos = 0;
- body = ByteBuffer.allocate(size);
-
- while(pos < size)
- {
- pos += getContent(body, pos);
- }
-
- body.flip();
- getMetaData().setBody(body.duplicate());
- }
- return body;
+ return getContent(0, (int)getSize());
}
- public Session getSession()
+ public Object getConnectionReference()
{
- return _sessionRef == null ? null : _sessionRef.get();
+ return _connectionRef;
}
- public SessionConfig getSessionConfig()
- {
- return _sessionRef == null ? null : (ServerSession) _sessionRef.get();
- }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java Wed Dec 28 13:02:41 2011
@@ -23,13 +23,17 @@ package org.apache.qpid.server.message;
import java.nio.ByteBuffer;
import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
-public interface ServerMessage extends EnqueableMessage, MessageContentSource
+public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueableMessage, MessageContentSource
{
String getRoutingKey();
AMQMessageHeader getMessageHeader();
+ public StoredMessage<T> getStoredMessage();
+
boolean isPersistent();
long getSize();
@@ -40,11 +44,12 @@ public interface ServerMessage extends E
MessageReference newReference();
- Long getMessageNumber();
+ long getMessageNumber();
long getArrivalTime();
public int getContent(ByteBuffer buf, int offset);
- SessionConfig getSessionConfig();
+ public ByteBuffer getContent(int offset, int size);
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java Wed Dec 28 13:02:41 2011
@@ -40,8 +40,8 @@ public class HeaderPropertiesConverter
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
Header header = messageTransferMessage.getHeader();
- DeliveryProperties deliveryProps = header.get(DeliveryProperties.class);
- MessageProperties messageProps = header.get(MessageProperties.class);
+ DeliveryProperties deliveryProps = header.getDeliveryProperties();
+ MessageProperties messageProps = header.getMessageProperties();
if(deliveryProps != null)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Wed Dec 28 13:02:41 2011
@@ -1,420 +1,420 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/*
- * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
- * Supported AMQP versions:
- * 8-0
- */
-package org.apache.qpid.server.output.amqp0_8;
-
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.HeaderPropertiesConverter;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.DeliveryProperties;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
-
- private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
-
- public static Factory getInstanceFactory()
- {
- return new Factory()
- {
-
- public ProtocolOutputConverter newInstance(AMQProtocolSession session)
- {
- return new ProtocolOutputConverterImpl(session);
- }
- };
- }
-
-
- private final AMQProtocolSession _protocolSession;
-
- private ProtocolOutputConverterImpl(AMQProtocolSession session)
- {
- _protocolSession = session;
- }
-
-
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
- public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
- writeMessageDelivery(entry, channelId, deliverBody);
- }
-
-
- private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
- throws AMQException
- {
- if(entry.getMessage() instanceof AMQMessage)
- {
- return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
- }
- else
- {
- final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
- ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
- chb.bodySize = message.getSize();
- return chb;
- }
- }
-
-
- private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
- throws AMQException
- {
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
- }
-
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
- throws AMQException
- {
-
-
- int bodySize = (int) message.getSize();
-
- if(bodySize == 0)
- {
- SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
- contentHeaderBody);
-
- writeFrame(compositeBlock);
- }
- else
- {
- int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
- int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
- int writtenSize = capacity;
-
- AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
-
- CompositeAMQBodyBlock
- compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
- writeFrame(compositeBlock);
-
- while(writtenSize < bodySize)
- {
- capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
- MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
- writtenSize += capacity;
-
- writeFrame(new AMQFrame(channelId, body));
- }
- }
- }
-
- private class MessageContentSourceBody implements AMQBody
- {
- public static final byte TYPE = 3;
- private int _length;
- private MessageContentSource _message;
- private int _offset;
-
- public MessageContentSourceBody(MessageContentSource message, int offset, int length)
- {
- _message = message;
- _offset = offset;
- _length = length;
- }
-
- public byte getFrameType()
- {
- return TYPE;
- }
-
- public int getSize()
- {
- return _length;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- byte[] data = new byte[_length];
-
- _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
-
- buffer.write(data);
- }
-
- public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
- {
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
- return contentHeader;
- }
-
-
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
- writeMessageDelivery(entry, channelId, deliver);
- }
-
-
- private AMQBody createEncodedDeliverBody(QueueEntry entry,
- final long deliveryTag,
- final AMQShortString consumerTag)
- throws AMQException
- {
-
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- final AMQBody returnBlock = new AMQBody()
- {
-
- public AMQBody _underlyingBody;
-
- public AMQBody createAMQBody()
- {
- return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
-
-
-
-
-
- }
-
- public byte getFrameType()
- {
- return AMQMethodBody.TYPE;
- }
-
- public int getSize()
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- return _underlyingBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- _underlyingBody.writePayload(buffer);
- }
-
- public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
- throws AMQException
- {
- throw new AMQException("This block should never be dispatched!");
- }
- };
- return returnBlock;
- }
-
- private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
- throws AMQException
- {
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- BasicGetOkBody getOkBody =
- METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey,
- queueSize);
-
- return getOkBody;
- }
-
- public byte getProtocolMinorVersion()
- {
- return getProtocolSession().getProtocolMinorVersion();
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolSession().getProtocolMajorVersion();
- }
-
- private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
- int replyCode,
- AMQShortString replyText) throws AMQException
- {
-
- BasicReturnBody basicReturnBody =
- METHOD_REGISTRY.createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
-
-
- return basicReturnBody;
- }
-
- public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
-
- AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
-
- writeMessageDelivery(message, header, channelId, returnFrame);
- }
-
-
- public void writeFrame(AMQDataBlock block)
- {
- getProtocolSession().writeFrame(block);
- }
-
-
- public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
- {
-
- BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
- writeFrame(basicCancelOkBody.generateFrame(channelId));
-
- }
-
-
- public static final class CompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final AMQBody _contentBody;
- private final int _channel;
-
-
- public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
- _contentBody = contentBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
- }
- }
-
- public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final int _channel;
-
-
- public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
- }
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ * 8-0
+ */
+package org.apache.qpid.server.output.amqp0_8;
+
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+
+ private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+
+ public static Factory getInstanceFactory()
+ {
+ return new Factory()
+ {
+
+ public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+ {
+ return new ProtocolOutputConverterImpl(session);
+ }
+ };
+ }
+
+
+ private final AMQProtocolSession _protocolSession;
+
+ private ProtocolOutputConverterImpl(AMQProtocolSession session)
+ {
+ _protocolSession = session;
+ }
+
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
+ }
+
+
+ private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+ throws AMQException
+ {
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+ }
+ else
+ {
+ final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
+ return chb;
+ }
+ }
+
+
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+ }
+
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+
+
+ int bodySize = (int) message.getSize();
+
+ if(bodySize == 0)
+ {
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
+
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+ int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
+
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+ int writtenSize = capacity;
+
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
+
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+ writeFrame(compositeBlock);
+
+ while(writtenSize < bodySize)
+ {
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
+
+ writeFrame(new AMQFrame(channelId, body));
+ }
+ }
+ }
+
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
+ }
+
+
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
+
+
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
+ throws AMQException
+ {
+
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ final AMQBody returnBlock = new AMQBody()
+ {
+
+ public AMQBody _underlyingBody;
+
+ public AMQBody createAMQBody()
+ {
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
+
+
+
+
+
+ }
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
+ };
+ return returnBlock;
+ }
+
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ BasicGetOkBody getOkBody =
+ METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey,
+ queueSize);
+
+ return getOkBody;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return getProtocolSession().getProtocolMinorVersion();
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return getProtocolSession().getProtocolMajorVersion();
+ }
+
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
+ {
+
+ BasicReturnBody basicReturnBody =
+ METHOD_REGISTRY.createBasicReturnBody(replyCode,
+ replyText,
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
+
+
+ return basicReturnBody;
+ }
+
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
+ {
+
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+
+ writeMessageDelivery(message, header, channelId, returnFrame);
+ }
+
+
+ public void writeFrame(AMQDataBlock block)
+ {
+ getProtocolSession().writeFrame(block);
+ }
+
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+
+ BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
+ writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+ }
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
+}
Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org