You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/08 18:02:37 UTC
svn commit: r494121 [2/5] - in /incubator/qpid/trunk/qpid:
gentools/src/org/apache/qpid/gentools/
java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/ack/
java/broker/src/main/java/org/apache/qpid/server/c...
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Mon Jan 8 09:02:26 2007
@@ -562,7 +562,7 @@
}
}
- public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, String consumerTag)
+ public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
ByteBuffer deliver = createEncodedDeliverFrame(channelId, deliveryTag, consumerTag);
@@ -598,7 +598,7 @@
}
- private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, String consumerTag)
+ private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
BasicPublishBody pb = getPublishBody();
@@ -611,7 +611,7 @@
return buf;
}
- private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, String replyText) throws AMQException
+ private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, (byte) 8, (byte) 0, getPublishBody().exchange,
replyCode, replyText,
@@ -622,7 +622,7 @@
return buf;
}
- public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, String replyText)
+ public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
ByteBuffer returnFrame = createEncodedReturnFrame(channelId, replyCode, replyText);
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Jan 8 09:02:26 2007
@@ -23,6 +23,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -42,12 +43,12 @@
{
private static final Logger _logger = Logger.getLogger(AMQQueue.class);
- private final String _name;
+ private final AMQShortString _name;
/**
* null means shared
*/
- private final String _owner;
+ private final AMQShortString _owner;
private final boolean _durable;
@@ -111,7 +112,7 @@
return _name.compareTo(((AMQQueue) o).getName());
}
- public AMQQueue(String name, boolean durable, String owner,
+ public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
boolean autoDelete, QueueRegistry queueRegistry)
throws AMQException
{
@@ -119,7 +120,7 @@
AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionImpl.Factory());
}
- public AMQQueue(String name, boolean durable, String owner,
+ public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
boolean autoDelete, QueueRegistry queueRegistry, SubscriptionFactory subscriptionFactory)
throws AMQException
{
@@ -127,7 +128,7 @@
AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscriptionFactory);
}
- public AMQQueue(String name, boolean durable, String owner,
+ public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery,
SubscriptionFactory subscriptionFactory)
throws AMQException
@@ -136,7 +137,7 @@
this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(), subscriptionFactory);
}
- public AMQQueue(String name, boolean durable, String owner,
+ public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
throws AMQException
{
@@ -145,7 +146,7 @@
new SubscriptionImpl.Factory());
}
- protected AMQQueue(String name, boolean durable, String owner,
+ protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
boolean autoDelete, QueueRegistry queueRegistry,
SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory)
throws AMQException
@@ -154,7 +155,7 @@
AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, subscriptionFactory);
}
- protected AMQQueue(String name, boolean durable, String owner,
+ protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
boolean autoDelete, QueueRegistry queueRegistry,
SubscriptionSet subscribers)
throws AMQException
@@ -163,7 +164,7 @@
AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, new SubscriptionImpl.Factory());
}
- protected AMQQueue(String name, boolean durable, String owner,
+ protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
boolean autoDelete, QueueRegistry queueRegistry,
Executor asyncDelivery, SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory)
throws AMQException
@@ -225,7 +226,7 @@
}
}
- public String getName()
+ public AMQShortString getName()
{
return _name;
}
@@ -240,7 +241,7 @@
return _durable;
}
- public String getOwner()
+ public AMQShortString getOwner()
{
return _owner;
}
@@ -356,17 +357,17 @@
_deliveryMgr.clearAllMessages(storeContext);
}
- public void bind(String routingKey, Exchange exchange)
+ public void bind(AMQShortString routingKey, Exchange exchange)
{
_bindings.addBinding(routingKey, exchange);
}
- public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, FieldTable filters) throws AMQException
{
registerProtocolSession(ps, channel, consumerTag, acks, filters, false);
}
- public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal)
throws AMQException
{
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
@@ -384,7 +385,7 @@
_subscribers.addSubscriber(subscription);
}
- public void unregisterProtocolSession(AMQProtocolSession ps, int channel, String consumerTag) throws AMQException
+ public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException
{
debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
this);
@@ -475,7 +476,7 @@
}
catch (AMQException e)
{
- throw new FailedDequeueException(_name, e);
+ throw new FailedDequeueException(_name.toString(), e);
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Mon Jan 8 09:02:26 2007
@@ -112,7 +112,7 @@
public String getOwner()
{
- return _queue.getOwner();
+ return String.valueOf(_queue.getOwner());
}
public boolean isAutoDelete()
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java Mon Jan 8 09:02:26 2007
@@ -22,6 +22,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.store.StoreContext;
@@ -294,7 +295,7 @@
}
}
- public void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException
+ public void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg) throws FailedDequeueException, AMQException
{
// first check whether we are queueing, and enqueue if we are
if (!enqueue(msg))
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Mon Jan 8 09:02:26 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.store.StoreContext;
@@ -280,7 +281,7 @@
return _messages.poll();
}
- public void deliver(StoreContext context, String name, AMQMessage msg) throws AMQException
+ public void deliver(StoreContext context, AMQShortString name, AMQMessage msg) throws AMQException
{
if (_log.isDebugEnabled())
{
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Mon Jan 8 09:02:26 2007
@@ -21,13 +21,14 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
public class DefaultQueueRegistry implements QueueRegistry
{
- private ConcurrentMap<String, AMQQueue> _queueMap = new ConcurrentHashMap<String, AMQQueue>();
+ private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
public DefaultQueueRegistry()
{
@@ -38,12 +39,12 @@
_queueMap.put(queue.getName(), queue);
}
- public void unregisterQueue(String name) throws AMQException
+ public void unregisterQueue(AMQShortString name) throws AMQException
{
_queueMap.remove(name);
}
- public AMQQueue getQueue(String name)
+ public AMQQueue getQueue(AMQShortString name)
{
return _queueMap.get(name);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Mon Jan 8 09:02:26 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.store.StoreContext;
import java.util.concurrent.Executor;
@@ -67,7 +68,7 @@
* @param msg the message to deliver
* @throws org.apache.qpid.server.queue.FailedDequeueException if the message could not be dequeued
*/
- void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException;
+ void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg) throws FailedDequeueException, AMQException;
void removeAMessageFromTop(StoreContext storeContext) throws AMQException;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java Mon Jan 8 09:02:26 2007
@@ -22,6 +22,7 @@
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import java.util.List;
import java.util.HashSet;
@@ -37,9 +38,9 @@
static class ExchangeBinding
{
private final Exchange exchange;
- private final String routingKey;
+ private final AMQShortString routingKey;
- ExchangeBinding(String routingKey, Exchange exchange)
+ ExchangeBinding(AMQShortString routingKey, Exchange exchange)
{
this.routingKey = routingKey;
this.exchange = exchange;
@@ -55,7 +56,7 @@
return exchange;
}
- public String getRoutingKey()
+ public AMQShortString getRoutingKey()
{
return routingKey;
}
@@ -87,7 +88,7 @@
* are being tracked by the instance has been bound to the exchange
* @param exchange the exchange bound to
*/
- void addBinding(String routingKey, Exchange exchange)
+ void addBinding(AMQShortString routingKey, Exchange exchange)
{
_bindings.add(new ExchangeBinding(routingKey, exchange));
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Mon Jan 8 09:02:26 2007
@@ -21,13 +21,14 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
public interface QueueRegistry
{
void registerQueue(AMQQueue queue) throws AMQException;
- void unregisterQueue(String name) throws AMQException;
+ void unregisterQueue(AMQShortString name) throws AMQException;
- AMQQueue getQueue(String name);
+ AMQQueue getQueue(AMQShortString name);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java Mon Jan 8 09:02:26 2007
@@ -23,6 +23,7 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
/**
* Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
@@ -33,10 +34,10 @@
*/
public interface SubscriptionFactory
{
- Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks,
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, boolean acks,
FieldTable filters, boolean noLocal) throws AMQException;
- Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag)
throws AMQException;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Mon Jan 8 09:02:26 2007
@@ -25,10 +25,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -53,7 +50,7 @@
public final AMQProtocolSession protocolSession;
- public final String consumerTag;
+ public final AMQShortString consumerTag;
private final Object sessionKey;
@@ -72,12 +69,12 @@
public static class Factory implements SubscriptionFactory
{
- public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException
+ public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException
{
return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal);
}
- public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
+ public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag)
throws AMQException
{
return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false);
@@ -85,14 +82,14 @@
}
public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
- String consumerTag, boolean acks)
+ AMQShortString consumerTag, boolean acks)
throws AMQException
{
this(channelId, protocolSession, consumerTag, acks, null, false);
}
public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
- String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+ AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal)
throws AMQException
{
AMQChannel channel = protocolSession.getChannel(channelId);
@@ -162,7 +159,7 @@
public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
- String consumerTag)
+ AMQShortString consumerTag)
throws AMQException
{
this(channel, protocolSession, consumerTag, false);
@@ -304,8 +301,8 @@
if (_noLocal)
{
// We don't want local messages so check to see if message is one we sent
- if (protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
- msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())))
+ if (protocolSession.getClientProperties().getObject(ClientProperties.instance.toString()).equals(
+ msg.getPublisher().getClientProperties().getObject(ClientProperties.instance.toString())))
{
if (_logger.isTraceEnabled())
{
@@ -395,7 +392,7 @@
}
- private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
+ private ByteBuffer createEncodedDeliverFrame(long deliveryTag, AMQShortString routingKey, AMQShortString exchange)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java Mon Jan 8 09:02:26 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.store.StoreContext;
import org.apache.log4j.Logger;
@@ -234,7 +235,7 @@
* @throws NoConsumersException if there are no active subscribers to deliver
* the message to
*/
- public void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException
+ public void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg) throws FailedDequeueException, AMQException
{
// first check whether we are queueing, and enqueue if we are
if (!enqueue(msg))
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Mon Jan 8 09:02:26 2007
@@ -180,11 +180,11 @@
public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
{
- _messageStore.enqueueMessage(storeContext, queue.getName(), messageId);
+ _messageStore.enqueueMessage(storeContext, queue.getName().toString(), messageId);
}
public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
{
- _messageStore.dequeueMessage(storeContext, queue.getName(), messageId);
+ _messageStore.dequeueMessage(storeContext, queue.getName().toString(), messageId);
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java Mon Jan 8 09:02:26 2007
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.security.auth;
+import org.apache.qpid.framing.AMQShortString;
+
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java Mon Jan 8 09:02:26 2007
@@ -22,6 +22,7 @@
import org.apache.qpid.server.security.auth.AuthenticationManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.framing.AMQShortString;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java Mon Jan 8 09:02:26 2007
@@ -24,6 +24,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.configuration.PropertyUtils;
+import org.apache.qpid.framing.AMQShortString;
import javax.security.auth.callback.CallbackHandler;
import javax.security.sasl.Sasl;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java Mon Jan 8 09:02:26 2007
@@ -56,13 +56,13 @@
try
{
final FieldTable ft = FieldTableFactory.newFieldTable(ByteBuffer.wrap(response), response.length);
- String username = (String) ft.get("LOGIN");
+ String username = (String) ft.getString("LOGIN");
// we do not care about the prompt but it throws if null
NameCallback nameCb = new NameCallback("prompt", username);
// we do not care about the prompt but it throws if null
PasswordCallback passwordCb = new PasswordCallback("prompt", false);
// TODO: should not get pwd as a String but as a char array...
- String pwd = (String) ft.get("PASSWORD");
+ String pwd = (String) ft.getString("PASSWORD");
passwordCb.setPassword(pwd.toCharArray());
AuthorizeCallback authzCb = new AuthorizeCallback(username, username);
Callback[] callbacks = new Callback[]{nameCb, passwordCb, authzCb};
Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Mon Jan 8 09:02:26 2007
@@ -37,157 +37,157 @@
{
bindHeaders.setString("A", "Value of A");
- matchHeaders.put("A", "Value of A");
+ matchHeaders.setString("A", "Value of A");
assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
}
public void testDefault_2()
{
- bindHeaders.put("A", "Value of A");
+ bindHeaders.setString("A", "Value of A");
- matchHeaders.put("A", "Value of A");
- matchHeaders.put("B", "Value of B");
+ matchHeaders.setString("A", "Value of A");
+ matchHeaders.setString("B", "Value of B");
assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
}
public void testDefault_3()
{
- bindHeaders.put("A", "Value of A");
+ bindHeaders.setString("A", "Value of A");
- matchHeaders.put("A", "Altered value of A");
+ matchHeaders.setString("A", "Altered value of A");
assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
}
public void testAll_1()
{
- bindHeaders.put("X-match", "all");
- bindHeaders.put("A", "Value of A");
+ bindHeaders.setString("X-match", "all");
+ bindHeaders.setString("A", "Value of A");
- matchHeaders.put("A", "Value of A");
+ matchHeaders.setString("A", "Value of A");
assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
}
public void testAll_2()
{
- bindHeaders.put("X-match", "all");
- bindHeaders.put("A", "Value of A");
- bindHeaders.put("B", "Value of B");
+ bindHeaders.setString("X-match", "all");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
- matchHeaders.put("A", "Value of A");
+ matchHeaders.setString("A", "Value of A");
assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
}
public void testAll_3()
{
- bindHeaders.put("X-match", "all");
- bindHeaders.put("A", "Value of A");
- bindHeaders.put("B", "Value of B");
+ bindHeaders.setString("X-match", "all");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
- matchHeaders.put("A", "Value of A");
- matchHeaders.put("B", "Value of B");
+ matchHeaders.setString("A", "Value of A");
+ matchHeaders.setString("B", "Value of B");
assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
}
public void testAll_4()
{
- bindHeaders.put("X-match", "all");
- bindHeaders.put("A", "Value of A");
- bindHeaders.put("B", "Value of B");
-
- matchHeaders.put("A", "Value of A");
- matchHeaders.put("B", "Value of B");
- matchHeaders.put("C", "Value of C");
+ bindHeaders.setString("X-match", "all");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
+
+ matchHeaders.setString("A", "Value of A");
+ matchHeaders.setString("B", "Value of B");
+ matchHeaders.setString("C", "Value of C");
assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
}
public void testAll_5()
{
- bindHeaders.put("X-match", "all");
- bindHeaders.put("A", "Value of A");
- bindHeaders.put("B", "Value of B");
-
- matchHeaders.put("A", "Value of A");
- matchHeaders.put("B", "Altered value of B");
- matchHeaders.put("C", "Value of C");
+ bindHeaders.setString("X-match", "all");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
+
+ matchHeaders.setString("A", "Value of A");
+ matchHeaders.setString("B", "Altered value of B");
+ matchHeaders.setString("C", "Value of C");
assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
}
public void testAny_1()
{
- bindHeaders.put("X-match", "any");
- bindHeaders.put("A", "Value of A");
+ bindHeaders.setString("X-match", "any");
+ bindHeaders.setString("A", "Value of A");
- matchHeaders.put("A", "Value of A");
+ matchHeaders.setString("A", "Value of A");
assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
}
public void testAny_2()
{
- bindHeaders.put("X-match", "any");
- bindHeaders.put("A", "Value of A");
- bindHeaders.put("B", "Value of B");
+ bindHeaders.setString("X-match", "any");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
- matchHeaders.put("A", "Value of A");
+ matchHeaders.setString("A", "Value of A");
assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
}
public void testAny_3()
{
- bindHeaders.put("X-match", "any");
- bindHeaders.put("A", "Value of A");
- bindHeaders.put("B", "Value of B");
+ bindHeaders.setString("X-match", "any");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
- matchHeaders.put("A", "Value of A");
- matchHeaders.put("B", "Value of B");
+ matchHeaders.setString("A", "Value of A");
+ matchHeaders.setString("B", "Value of B");
assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
}
public void testAny_4()
{
- bindHeaders.put("X-match", "any");
- bindHeaders.put("A", "Value of A");
- bindHeaders.put("B", "Value of B");
-
- matchHeaders.put("A", "Value of A");
- matchHeaders.put("B", "Value of B");
- matchHeaders.put("C", "Value of C");
+ bindHeaders.setString("X-match", "any");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
+
+ matchHeaders.setString("A", "Value of A");
+ matchHeaders.setString("B", "Value of B");
+ matchHeaders.setString("C", "Value of C");
assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
}
public void testAny_5()
{
- bindHeaders.put("X-match", "any");
- bindHeaders.put("A", "Value of A");
- bindHeaders.put("B", "Value of B");
-
- matchHeaders.put("A", "Value of A");
- matchHeaders.put("B", "Altered value of B");
- matchHeaders.put("C", "Value of C");
+ bindHeaders.setString("X-match", "any");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
+
+ matchHeaders.setString("A", "Value of A");
+ matchHeaders.setString("B", "Altered value of B");
+ matchHeaders.setString("C", "Value of C");
assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
}
public void testAny_6()
{
- bindHeaders.put("X-match", "any");
- bindHeaders.put("A", "Value of A");
- bindHeaders.put("B", "Value of B");
-
- matchHeaders.put("A", "Altered value of A");
- matchHeaders.put("B", "Altered value of B");
- matchHeaders.put("C", "Value of C");
+ bindHeaders.setString("X-match", "any");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
+
+ matchHeaders.setString("A", "Altered value of A");
+ matchHeaders.setString("B", "Altered value of B");
+ matchHeaders.setString("C", "Value of C");
assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java Mon Jan 8 09:02:26 2007
@@ -308,7 +308,7 @@
}
}
- //remove the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options
+ //removeKey the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options
optionsURL.deleteCharAt(optionsURL.length() - 1);
return optionsURL.toString();
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Jan 8 09:02:26 2007
@@ -965,7 +965,7 @@
public void resubscribeSessions() throws JMSException, AMQException
{
ArrayList sessions = new ArrayList(_sessions.values());
- _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: remove?
+ _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
for (Iterator it = sessions.iterator(); it.hasNext();)
{
AMQSession s = (AMQSession) it.next();
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Mon Jan 8 09:02:26 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.url.URLHelper;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import javax.naming.Reference;
import javax.naming.NamingException;
@@ -35,19 +36,27 @@
public abstract class AMQDestination implements Destination, Referenceable
{
- protected final String _exchangeName;
+ protected final AMQShortString _exchangeName;
- protected final String _exchangeClass;
+ protected final AMQShortString _exchangeClass;
- protected final String _destinationName;
+ protected final AMQShortString _destinationName;
- protected boolean _isDurable;
+ protected final boolean _isDurable;
protected final boolean _isExclusive;
protected final boolean _isAutoDelete;
- protected String _queueName;
+ private AMQShortString _queueName;
+
+ private String _url;
+ private AMQShortString _urlAsShortString;
+
+ private byte[] _byteEncoding;
+ private static final int IS_DURABLE_MASK = 0x1;
+ private static final int IS_EXCLUSIVE_MASK = 0x2;
+ private static final int IS_AUTODELETE_MASK = 0x4;
protected AMQDestination(String url) throws URLSyntaxException
{
@@ -63,27 +72,27 @@
_isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
_isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
_isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
- _queueName = binding.getQueueName();
+ _queueName = new AMQShortString(binding.getQueueName());
}
- protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, String queueName)
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, AMQShortString queueName)
{
this(exchangeName, exchangeClass, destinationName, false, false, queueName);
}
- protected AMQDestination(String exchangeName, String exchangeClass, String destinationName)
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName)
{
this(exchangeName, exchangeClass, destinationName, false, false, null);
}
- protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
- boolean isAutoDelete, String queueName)
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName)
{
this(exchangeName, exchangeClass, destinationName, isExclusive, isAutoDelete, queueName, false);
}
- protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
- boolean isAutoDelete, String queueName, boolean isDurable)
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
{
if (destinationName == null)
{
@@ -106,9 +115,13 @@
_isDurable = isDurable;
}
- public String getEncodedName()
+ public AMQShortString getEncodedName()
{
- return toURL();
+ if(_urlAsShortString == null)
+ {
+ toURL();
+ }
+ return _urlAsShortString;
}
public boolean isDurable()
@@ -116,12 +129,12 @@
return _isDurable;
}
- public String getExchangeName()
+ public AMQShortString getExchangeName()
{
return _exchangeName;
}
- public String getExchangeClass()
+ public AMQShortString getExchangeClass()
{
return _exchangeClass;
}
@@ -136,22 +149,34 @@
return ExchangeDefaults.DIRECT_EXCHANGE_NAME.equals(_exchangeName);
}
- public String getDestinationName()
+ public AMQShortString getDestinationName()
{
return _destinationName;
}
public String getQueueName()
{
+ return _queueName == null ? null : _queueName.toString();
+ }
+
+ public AMQShortString getAMQQueueName()
+ {
return _queueName;
}
- public void setQueueName(String queueName)
+
+
+ public void setQueueName(AMQShortString queueName)
{
+
_queueName = queueName;
+ // calculated URL now out of date
+ _url = null;
+ _urlAsShortString = null;
+ _byteEncoding = null;
}
- public abstract String getRoutingKey();
+ public abstract AMQShortString getRoutingKey();
public boolean isExclusive()
{
@@ -179,53 +204,114 @@
public String toURL()
{
- StringBuffer sb = new StringBuffer();
-
- sb.append(_exchangeClass);
- sb.append("://");
- sb.append(_exchangeName);
-
- sb.append("/");
-
- if (_destinationName != null)
+ String url = _url;
+ if(url == null)
{
- sb.append(_destinationName);
- }
- sb.append("/");
- if (_queueName != null)
- {
- sb.append(_queueName);
- }
+ StringBuffer sb = new StringBuffer();
- sb.append("?");
+ sb.append(_exchangeClass);
+ sb.append("://");
+ sb.append(_exchangeName);
+
+ sb.append('/');
+
+ if (_destinationName != null)
+ {
+ sb.append(_destinationName);
+ }
+
+ sb.append('/');
+
+ if (_queueName != null)
+ {
+ sb.append(_queueName);
+ }
+
+ sb.append('?');
+
+ if (_isDurable)
+ {
+ sb.append(BindingURL.OPTION_DURABLE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
+ if (_isExclusive)
+ {
+ sb.append(BindingURL.OPTION_EXCLUSIVE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
+ if (_isAutoDelete)
+ {
+ sb.append(BindingURL.OPTION_AUTODELETE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
+ //removeKey the last char '?' if there is no options , ',' if there are.
+ sb.deleteCharAt(sb.length() - 1);
+ url = sb.toString();
+ _url = url;
+ _urlAsShortString = new AMQShortString(url);
+ }
+ return url;
+ }
+
+ public byte[] toByteEncoding()
+ {
+ byte[] encoding = _byteEncoding;
+ if(encoding == null)
+ {
+ int size = _exchangeClass.length() + 1 +
+ _exchangeName.length() + 1 +
+ (_destinationName == null ? 0 : _destinationName.length()) + 1 +
+ (_queueName == null ? 0 : _queueName.length()) + 1 +
+ 1;
+ encoding = new byte[size];
+ int pos = 0;
+
+ pos = _exchangeClass.writeToByteArray(encoding, pos);
+ pos = _exchangeName.writeToByteArray(encoding, pos);
+ if(_destinationName == null)
+ {
+ encoding[pos++] = (byte)0;
+ }
+ else
+ {
+ pos = _destinationName.writeToByteArray(encoding,pos);
+ }
+ if(_queueName == null)
+ {
+ encoding[pos++] = (byte)0;
+ }
+ else
+ {
+ pos = _queueName.writeToByteArray(encoding,pos);
+ }
+ byte options = 0;
+ if(_isDurable)
+ {
+ options |= IS_DURABLE_MASK;
+ }
+ if(_isExclusive)
+ {
+ options |= IS_EXCLUSIVE_MASK;
+ }
+ if(_isAutoDelete)
+ {
+ options |= IS_AUTODELETE_MASK;
+ }
+ encoding[pos] = options;
- if (_isDurable)
- {
- sb.append(BindingURL.OPTION_DURABLE);
- sb.append("='true'");
- sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
- }
- if (_isExclusive)
- {
- sb.append(BindingURL.OPTION_EXCLUSIVE);
- sb.append("='true'");
- sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
- }
+ _byteEncoding = encoding;
- if (_isAutoDelete)
- {
- sb.append(BindingURL.OPTION_AUTODELETE);
- sb.append("='true'");
- sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
}
-
- //remove the last char '?' if there is no options , ',' if there are.
- sb.deleteCharAt(sb.length() - 1);
-
- return sb.toString();
+ return encoding;
}
public boolean equals(Object o)
@@ -293,9 +379,55 @@
null); // factory location
}
+
+ public static Destination createDestination(byte[] byteEncodedDestination)
+ {
+ AMQShortString exchangeClass;
+ AMQShortString exchangeName;
+ AMQShortString destinationName;
+ AMQShortString queueName;
+ boolean isDurable;
+ boolean isExclusive;
+ boolean isAutoDelete;
+
+ int pos = 0;
+ exchangeClass = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+ pos+= exchangeClass.length() + 1;
+ exchangeName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+ pos+= exchangeName.length() + 1;
+ destinationName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+ pos+= (destinationName == null ? 0 : destinationName.length()) + 1;
+ queueName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+ pos+= (queueName == null ? 0 : queueName.length()) + 1;
+ int options = byteEncodedDestination[pos];
+ isDurable = (options & IS_DURABLE_MASK) != 0;
+ isExclusive = (options & IS_EXCLUSIVE_MASK) != 0;
+ isAutoDelete = (options & IS_AUTODELETE_MASK) != 0;
+
+ if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
+ {
+ return new AMQQueue(destinationName,queueName,isExclusive,isAutoDelete,isDurable);
+ }
+ else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+ {
+ return new AMQTopic(destinationName,isAutoDelete,queueName,isDurable);
+ }
+ else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))
+ {
+ return new AMQHeadersExchange(destinationName);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unknown Exchange Class:" + exchangeClass);
+ }
+
+
+
+ }
+
public static Destination createDestination(BindingURL binding)
{
- String type = binding.getExchangeClass();
+ AMQShortString type = binding.getExchangeClass();
if (type.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java Mon Jan 8 09:02:26 2007
@@ -22,6 +22,7 @@
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.framing.AMQShortString;
/**
* A destination backed by a headers exchange
@@ -33,12 +34,17 @@
this(binding.getExchangeName());
}
- public AMQHeadersExchange(String queueName)
+ public AMQHeadersExchange(String name)
+ {
+ this(new AMQShortString(name));
+ }
+
+ public AMQHeadersExchange(AMQShortString queueName)
{
super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName, true, true, null);
}
- public String getRoutingKey()
+ public AMQShortString getRoutingKey()
{
return getDestinationName();
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java Mon Jan 8 09:02:26 2007
@@ -22,6 +22,7 @@
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.Queue;
@@ -42,12 +43,22 @@
* Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
* @param name the name of the queue
*/
- public AMQQueue(String name)
+ public AMQQueue(AMQShortString name)
{
this(name, false);
}
/**
+ * Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
+ * @param name the name of the queue
+ */
+ public AMQQueue(String name)
+ {
+ this(new AMQShortString(name), false);
+ }
+
+
+ /**
* Create a queue with a specified name.
*
* @param name the destination name (used in the routing key)
@@ -56,10 +67,23 @@
*/
public AMQQueue(String name, boolean temporary)
{
+ this(new AMQShortString(name),temporary);
+ }
+
+
+ /**
+ * Create a queue with a specified name.
+ *
+ * @param name the destination name (used in the routing key)
+ * @param temporary if true the broker will generate a queue name, also if true then the queue is autodeleted
+ * and exclusive
+ */
+ public AMQQueue(AMQShortString name, boolean temporary)
+ {
// queue name is set to null indicating that the broker assigns a name in the case of temporary queues
// temporary queues are typically used as response queues
- this(name, temporary?null:name, temporary, temporary);
- _isDurable = !temporary;
+ this(name, temporary?null:name, temporary, temporary, !temporary);
+
}
/**
@@ -69,16 +93,22 @@
* @param exclusive true if the queue should only permit a single consumer
* @param autoDelete true if the queue should be deleted automatically when the last consumers detaches
*/
- public AMQQueue(String destinationName, String queueName, boolean exclusive, boolean autoDelete)
+ public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete)
+ {
+ this(destinationName, queueName, exclusive, autoDelete, false);
+ }
+
+
+ public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable)
{
super(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive,
- autoDelete, queueName);
+ autoDelete, queueName, durable);
}
- public String getRoutingKey()
+ public AMQShortString getRoutingKey()
{
- return getQueueName();
+ return getAMQQueueName();
}
public boolean isNameRequired()
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Jan 8 09:02:26 2007
@@ -26,10 +26,7 @@
import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.client.failover.FailoverSupport;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.JMSStreamMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
@@ -104,7 +101,7 @@
/**
* Maps from consumer tag (String) to JMSMessageConsumer instance
*/
- private Map<String, BasicMessageConsumer> _consumers = new ConcurrentHashMap<String, BasicMessageConsumer>();
+ private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
/**
* Maps from destination to count of JMSMessageConsumers
@@ -205,7 +202,7 @@
message.bodies);
int errorCode = message.bounceBody.replyCode;
- String reason = message.bounceBody.replyText;
+ AMQShortString reason = message.bounceBody.replyText;
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
//@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
@@ -322,14 +319,7 @@
synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
- try
- {
- return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
+ return new JMSBytesMessage();
}
}
@@ -338,31 +328,13 @@
synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
- try
- {
- return (MapMessage) _messageFactoryRegistry.createMessage("jms/map-message");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
+ return new JMSMapMessage();
}
}
public javax.jms.Message createMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
- try
- {
- return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
- }
+ return createBytesMessage();
}
public ObjectMessage createObjectMessage() throws JMSException
@@ -370,33 +342,15 @@
synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
- try
- {
- return (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
+ return (ObjectMessage) new JMSObjectMessage();
}
}
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
- try
- {
- ObjectMessage msg = (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream");
- msg.setObject(object);
- return msg;
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
- }
+ ObjectMessage msg = createObjectMessage();
+ msg.setObject(object);
+ return msg;
}
public StreamMessage createStreamMessage() throws JMSException
@@ -405,14 +359,7 @@
{
checkNotClosed();
- try
- {
- return (StreamMessage) _messageFactoryRegistry.createMessage(JMSStreamMessage.MIME_TYPE);
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create text message: " + e);
- }
+ return new JMSStreamMessage();
}
}
@@ -422,33 +369,16 @@
{
checkNotClosed();
- try
- {
- return (TextMessage) _messageFactoryRegistry.createMessage("text/plain");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create text message: " + e);
- }
+ return new JMSTextMessage();
}
}
public TextMessage createTextMessage(String text) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
- try
- {
- TextMessage msg = (TextMessage) _messageFactoryRegistry.createMessage("text/plain");
- msg.setText(text);
- return msg;
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create text message: " + e);
- }
- }
+
+ TextMessage msg = createTextMessage();
+ msg.setText(text);
+ return msg;
}
public boolean getTransacted() throws JMSException
@@ -530,7 +460,7 @@
0, // classId
0, // methodId
AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- "JMS client closing channel"); // replyText
+ new AMQShortString("JMS client closing channel")); // replyText
_connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
@@ -1050,12 +980,12 @@
}
- public void declareExchange(String name, String type)
+ public void declareExchange(AMQShortString name, AMQShortString type)
{
declareExchange(name, type, _connection.getProtocolHandler());
}
- public void declareExchangeSynch(String name, String type) throws AMQException
+ public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -1079,7 +1009,7 @@
declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
}
- private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler)
+ private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -1106,7 +1036,7 @@
* @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
* @throws AMQException
*/
- private String declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
+ private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
{
// For queues (but not topics) we generate the name in the client rather than the
// server. This allows the name to be reused on failover if required. In general,
@@ -1127,14 +1057,14 @@
amqd.isExclusive(), // exclusive
true, // nowait
false, // passive
- amqd.getQueueName(), // queue
+ amqd.getAMQQueueName(), // queue
0); // ticket
protocolHandler.writeFrame(queueDeclare);
- return amqd.getQueueName();
+ return amqd.getAMQQueueName();
}
- private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
+ private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -1157,12 +1087,12 @@
* @param queueName
* @return the consumer tag generated by the broker
*/
- private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
+ private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
boolean nowait, String messageSelector) throws AMQException
{
//fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
- String tag = Integer.toString(_nextTag++);
+ AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
FieldTable arguments = FieldTableFactory.newFieldTable();
if (messageSelector != null && !messageSelector.equals(""))
@@ -1282,7 +1212,7 @@
if (topicName.indexOf('/') == -1)
{
- return new AMQTopic(topicName);
+ return new AMQTopic(new AMQShortString(topicName));
}
else
{
@@ -1352,12 +1282,21 @@
}
else
{
+ AMQShortString topicName;
+ if(topic instanceof AMQTopic)
+ {
+ topicName = ((AMQTopic)topic).getDestinationName();
+ }
+ else
+ {
+ topicName = new AMQShortString(topic.getTopicName());
+ }
// if the queue is bound to the exchange but NOT for this topic, then the JMS spec
// says we must trash the subscription.
- if (isQueueBound(dest.getQueueName()) &&
- !isQueueBound(dest.getQueueName(), topic.getTopicName()))
+ if (isQueueBound(dest.getAMQQueueName()) &&
+ !isQueueBound(dest.getAMQQueueName(), topicName))
{
- deleteQueue(dest.getQueueName());
+ deleteQueue(dest.getAMQQueueName());
}
}
@@ -1369,7 +1308,7 @@
return subscriber;
}
- void deleteQueue(String queueName) throws JMSException
+ void deleteQueue(AMQShortString queueName) throws JMSException
{
try
{
@@ -1461,12 +1400,12 @@
}
}
- boolean isQueueBound(String queueName) throws JMSException
+ boolean isQueueBound(AMQShortString queueName) throws JMSException
{
return isQueueBound(queueName, null);
}
- boolean isQueueBound(String queueName, String routingKey) throws JMSException
+ boolean isQueueBound(AMQShortString queueName, AMQShortString routingKey) throws JMSException
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -1606,7 +1545,7 @@
declareExchange(amqd, protocolHandler);
- String queueName = declareQueue(amqd, protocolHandler);
+ AMQShortString queueName = declareQueue(amqd, protocolHandler);
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
@@ -1674,7 +1613,7 @@
private void resubscribeProducers() throws AMQException
{
ArrayList producers = new ArrayList(_producers.values());
- _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: remove
+ _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
for (Iterator it = producers.iterator(); it.hasNext();)
{
BasicMessageProducer producer = (BasicMessageProducer) it.next();
@@ -1718,7 +1657,7 @@
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
- public void confirmConsumerCancelled(String consumerTag)
+ public void confirmConsumerCancelled(AMQShortString consumerTag)
{
BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
if((consumer != null) && (consumer.isAutoClose()))
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Mon Jan 8 09:02:26 2007
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.AMQShortString;
+
import javax.jms.JMSException;
import javax.jms.TemporaryQueue;
@@ -38,7 +40,7 @@
*/
public AMQTemporaryQueue(AMQSession session)
{
- super("TempQueue" + Long.toString(System.currentTimeMillis()), true);
+ super(new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true);
_session = session;
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Mon Jan 8 09:02:26 2007
@@ -22,6 +22,7 @@
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
import javax.jms.Topic;
@@ -40,10 +41,15 @@
public AMQTopic(String name)
{
+ this(new AMQShortString(name));
+ }
+
+ public AMQTopic(AMQShortString name)
+ {
this(name, true, null, false);
}
- public AMQTopic(String name, boolean isAutoDelete, String queueName, boolean isDurable)
+ public AMQTopic(AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
{
super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete,
queueName, isDurable);
@@ -56,17 +62,17 @@
true);
}
- public static String getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
+ public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
{
- return connection.getClientID() + ":" + subscriptionName;
+ return new AMQShortString(connection.getClientID() + ":" + subscriptionName);
}
public String getTopicName() throws JMSException
{
- return super.getDestinationName();
+ return super.getDestinationName().toString();
}
- public String getRoutingKey()
+ public AMQShortString getRoutingKey()
{
return getDestinationName();
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Mon Jan 8 09:02:26 2007
@@ -22,16 +22,11 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
@@ -74,7 +69,7 @@
* The consumer tag allows us to close the consumer by sending a jmsCancel method to the
* broker
*/
- private String _consumerTag;
+ private AMQShortString _consumerTag;
/**
* We need to know the channel id when constructing frames
@@ -255,17 +250,10 @@
if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
_unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
- String url = jmsMsg.getStringProperty(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString());
- try
- {
- Destination dest = AMQDestination.createDestination(new AMQBindingURL(url));
- jmsMsg.setJMSDestination(dest);
- }
- catch (URLSyntaxException e)
- {
- _logger.warn("Unable to parse the supplied destination header: " + url);
- }
-
+ byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName());
+ Destination dest = AMQDestination.createDestination(url);
+ jmsMsg.setJMSDestination(dest);
+
}
_session.setInRecovery(false);
}
@@ -498,7 +486,9 @@
*/
void notifyMessage(UnprocessedMessage messageFrame, int channelId)
{
- if (_logger.isDebugEnabled())
+ final boolean debug = _logger.isDebugEnabled();
+
+ if (debug)
{
_logger.debug("notifyMessage called with message number " + messageFrame.deliverBody.deliveryTag);
}
@@ -509,7 +499,10 @@
messageFrame.contentHeader,
messageFrame.bodies);
- _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
+ if(debug)
+ {
+ _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
+ }
jmsMessage.setConsumer(this);
preDeliver(jmsMessage);
@@ -642,12 +635,12 @@
_session.deregisterConsumer(this);
}
- public String getConsumerTag()
+ public AMQShortString getConsumerTag()
{
return _consumerTag;
}
- public void setConsumerTag(String consumerTag)
+ public void setConsumerTag(AMQShortString consumerTag)
{
_consumerTag = consumerTag;
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Mon Jan 8 09:02:26 2007
@@ -522,7 +522,7 @@
AbstractJMSMessage message = convertToNativeMessage(origMessage);
- message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL());
+ message.getJmsContentHeaderProperties().setBytes(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName(), destination.toByteEncoding());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
@@ -534,26 +534,22 @@
destination.getRoutingKey(), // routingKey
0); // ticket
- long currentTime = 0;
- if (!_disableTimestamps)
- {
- currentTime = System.currentTimeMillis();
- message.setJMSTimestamp(currentTime);
- }
+
+
message.prepareForSending();
ByteBuffer payload = message.getData();
BasicContentHeaderProperties contentHeaderProperties = message.getJmsContentHeaderProperties();
- if (timeToLive > 0)
+ if (!_disableTimestamps)
{
- if (!_disableTimestamps)
+ final long currentTime = System.currentTimeMillis();
+ contentHeaderProperties.setTimestamp(currentTime);
+
+ if (timeToLive > 0)
{
contentHeaderProperties.setExpiration(currentTime + timeToLive);
}
- }
- else
- {
- if (!_disableTimestamps)
+ else
{
contentHeaderProperties.setExpiration(0);
}
@@ -561,14 +557,16 @@
contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
contentHeaderProperties.setPriority((byte) priority);
- int size = (payload != null) ? payload.limit() : 0;
- ContentBody[] contentBodies = createContentBodies(payload);
- AMQFrame[] frames = new AMQFrame[2 + contentBodies.length];
- for (int i = 0; i < contentBodies.length; i++)
+ final int size = (payload != null) ? payload.limit() : 0;
+ final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
+ final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
+
+ if(payload != null)
{
- frames[2 + i] = ContentBody.createAMQFrame(_channelId, contentBodies[i]);
+ createContentBodies(payload, frames, 2, _channelId);
}
- if (contentBodies.length > 0 && _logger.isDebugEnabled())
+
+ if (contentBodyFrameCount != 0 && _logger.isDebugEnabled())
{
_logger.debug("Sending content body frames to " + destination);
}
@@ -592,10 +590,10 @@
if (message != origMessage)
{
- _logger.warn("Updating original message");
+ _logger.debug("Updating original message");
origMessage.setJMSPriority(message.getJMSPriority());
origMessage.setJMSTimestamp(message.getJMSTimestamp());
- _logger.warn("Setting JMSExpiration:" + message.getJMSExpiration());
+ _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
origMessage.setJMSExpiration(message.getJMSExpiration());
origMessage.setJMSMessageID(message.getJMSMessageID());
}
@@ -625,42 +623,52 @@
* maximum frame size.
*
* @param payload
- * @return the array of content bodies
+ * @param frames
+ * @param offset
+ * @param channelId @return the array of content bodies
*/
- private ContentBody[] createContentBodies(ByteBuffer payload)
+ private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
{
- if (payload == null || payload.remaining() == 0)
- {
- return NO_CONTENT_BODIES;
- }
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
- int dataLength = payload.remaining();
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
- int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
- int frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
- final ContentBody[] bodies = new ContentBody[frameCount];
-
- if (frameCount == 1)
+ if (frames.length == offset + 1)
{
- bodies[0] = new ContentBody();
- bodies[0].payload = payload;
+ frames[offset] = ContentBody.createAMQFrame(channelId,new ContentBody(payload));
}
else
{
- long remaining = dataLength;
- for (int i = 0; i < bodies.length; i++)
+
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ long remaining = payload.remaining();
+ for (int i = offset; i < frames.length; i++)
{
- bodies[i] = new ContentBody();
- payload.position((int) framePayloadMax * i);
+ payload.position((int) framePayloadMax * (i-offset));
int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
payload.limit(payload.position() + length);
- bodies[i].payload = payload.slice();
+ frames[i] = ContentBody.createAMQFrame(channelId,new ContentBody(payload.slice()));
+
remaining -= length;
}
}
- return bodies;
+
+ }
+
+ private int calculateContentBodyFrameCount(ByteBuffer payload)
+ {
+ // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+ // (0xCE byte).
+ int frameCount;
+ if(payload == null || payload.remaining() == 0)
+ {
+ frameCount = 0;
+ }
+ else
+ {
+ int dataLength = payload.remaining();
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
+ frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
+ }
+ return frameCount;
}
public void setMimeType(String mimeType) throws JMSException
Copied: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java (from r494042, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java?view=diff&rev=494121&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java&r1=494042&p2=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java Mon Jan 8 09:02:26 2007
@@ -20,23 +20,38 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.AMQShortString;
+
import java.util.*;
-public enum CustomJMXProperty
+public enum CustomJMSXProperty
{
JMSX_QPID_JMSDESTINATIONURL,
JMSXGroupID,
JMSXGroupSeq;
+
+ private final AMQShortString _nameAsShortString;
+
+ CustomJMSXProperty()
+ {
+ _nameAsShortString = new AMQShortString(toString());
+ }
+
+ public AMQShortString getShortStringName()
+ {
+ return _nameAsShortString;
+ }
+
private static Enumeration _names;
public static synchronized Enumeration asEnumeration()
{
if(_names == null)
{
- CustomJMXProperty[] properties = values();
+ CustomJMSXProperty[] properties = values();
ArrayList<String> nameList = new ArrayList<String>(properties.length);
- for(CustomJMXProperty property : properties)
+ for(CustomJMSXProperty property : properties)
{
nameList.add(property.toString());
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java?view=diff&rev=494121&r1=494120&r2=494121
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java Mon Jan 8 09:02:26 2007
@@ -56,7 +56,7 @@
public Enumeration getJMSXPropertyNames() throws JMSException
{
- return CustomJMXProperty.asEnumeration();
+ return CustomJMSXProperty.asEnumeration();
}
public int getProviderMajorVersion() throws JMSException