You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/08/31 13:36:28 UTC
svn commit: r809544 [2/2] - in /qpid/branches/java-broker-0-10/qpid/java:
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/flow/ ...
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Mon Aug 31 11:36:26 2009
@@ -24,10 +24,7 @@
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeInUseException;
+import org.apache.qpid.server.exchange.*;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
@@ -39,14 +36,13 @@
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.framing.*;
import java.util.ArrayList;
import java.util.Map;
+import java.util.Collection;
public class ServerSessionDelegate extends SessionDelegate
{
@@ -90,7 +86,14 @@
@Override
public void messageAcquire(Session session, MessageAcquire method)
{
- super.messageAcquire(session, method);
+ RangeSet acquiredRanges = ((ServerSession)session).acquire(method.getTransfers());
+
+ Acquired result = new Acquired(acquiredRanges);
+
+
+ session.executionResult((int) method.getId(), result);
+
+
}
@Override
@@ -129,35 +132,60 @@
else
{
String destination = method.getDestination();
- String queueName = method.getQueue();
- QueueRegistry queueRegistry = getQueueRegistry(session);
-
-
- AMQQueue queue = queueRegistry.getQueue(queueName);
- if(queue == null)
+ if(((ServerSession)session).getSubscription(destination)!=null)
{
- exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Subscription already exists with destaination: '"+destination+"'");
}
else
{
+ String queueName = method.getQueue();
+ QueueRegistry queueRegistry = getQueueRegistry(session);
- FlowCreditManager_0_10 creditManager = new CreditCreditManager(0L,0L);
-
- // TODO filters
- Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null);
+ AMQQueue queue = queueRegistry.getQueue(queueName);
- ((ServerSession)session).register(destination, sub);
- try
+ if(queue == null)
{
- queue.registerSubscription(sub, method.getExclusive());
+ exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
}
- catch (AMQException e)
+ else if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
{
- // TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- throw new RuntimeException(e);
+ exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
+ }
+ else
+ {
+
+ FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
+
+ // TODO filters
+
+ Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
+ destination,
+ method.getAcceptMode(),
+ method.getAcquireMode(),
+ MessageFlowMode.WINDOW,
+ creditManager, null);
+
+ ((ServerSession)session).register(destination, sub);
+ try
+ {
+ queue.registerSubscription(sub, method.getExclusive());
+ }
+ catch (AMQQueue.ExistingExclusiveSubscription existing)
+ {
+ exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer");
+ }
+ catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive)
+ {
+ exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively");
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
+ }
}
}
}
@@ -172,18 +200,34 @@
if(xfr.hasDestination())
{
exchange = exchangeRegistry.getExchange(xfr.getDestination());
+ if(exchange == null)
+ {
+ exchange = exchangeRegistry.getDefaultExchange();
+ }
}
else
{
exchange = exchangeRegistry.getDefaultExchange();
}
- MessageTransferMessage message = new MessageTransferMessage(xfr);
+ MessageTransferMessage message = new MessageTransferMessage(xfr, ((ServerSession)ssn).getReference());
+
+ DeliveryProperties delvProps = null;
+ if(message.getHeader() != null && (delvProps = message.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+ {
+ delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
+ }
+
try
{
ArrayList<AMQQueue> queues = exchange.route(message);
- ((ServerSession) ssn).enqueue(message, queues);
+
+
+ if(queues != null)
+ {
+ ((ServerSession) ssn).enqueue(message, queues);
+ }
ssn.processed(xfr);
}
@@ -281,6 +325,10 @@
else
{
// TODO - check exchange has same properties
+ if(!exchange.getType().toString().equals(method.getType()))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
+ }
}
}
@@ -329,7 +377,10 @@
}
else
{
- // TODO check same as declared
+ if(!exchange.getType().toString().equals(method.getType()))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
+ }
}
}
@@ -343,7 +394,7 @@
ex.setDescription(description);
session.invoke(ex);
- session.close();
+ //session.close();
}
private Exchange getExchange(Session session, String exchangeName)
@@ -431,12 +482,128 @@
@Override
public void exchangeBind(Session session, ExchangeBind method)
{
- super.exchangeBind(session, method);
+
+ VirtualHost virtualHost = getVirtualHost(session);
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ if (!method.hasQueue())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
+ }
+ else if (!method.hasExchange())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set");
+ }
+/*
+ else if (!method.hasBindingKey())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set");
+ }
+*/
+ else
+ {
+ //TODO - here because of non-compliant python tests
+ if (!method.hasBindingKey())
+ {
+ method.setBindingKey(method.getQueue());
+ }
+ AMQQueue queue = queueRegistry.getQueue(method.getQueue());
+ Exchange exchange = exchangeRegistry.getExchange(method.getExchange());
+ if(queue == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
+ }
+ else if(exchange == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found");
+ }
+ else if (!virtualHost.getAccessManager().authoriseBind((ServerSession)session, exchange,
+ queue, new AMQShortString(method.getBindingKey())))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Bind Exchange: '" + method.getExchange()
+ + "' to Queue: '" + method.getQueue()
+ + "' not allowed");
+ }
+ else if(exchange.getType().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
+ {
+ exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header");
+ }
+ else
+ {
+ try
+ {
+ AMQShortString routingKey = new AMQShortString(method.getBindingKey());
+ FieldTable fieldTable = FieldTable.convertToFieldTable(method.getArguments());
+
+ if (!exchange.isBound(routingKey, fieldTable, queue))
+ {
+ queue.bind(exchange, routingKey, fieldTable);
+ }
+ else
+ {
+ // todo
+ }
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+
+
+ }
+
+
+
}
@Override
public void exchangeUnbind(Session session, ExchangeUnbind method)
{
+ VirtualHost virtualHost = getVirtualHost(session);
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ if (!method.hasQueue())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
+ }
+ else if (!method.hasExchange())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set");
+ }
+ else if (!method.hasBindingKey())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set");
+ }
+ else
+ {
+ AMQQueue queue = queueRegistry.getQueue(method.getQueue());
+ Exchange exchange = exchangeRegistry.getExchange(method.getExchange());
+ if(queue == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
+ }
+ else if(exchange == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found");
+ }
+ else
+ {
+ try
+ {
+ queue.unBind(exchange, new AMQShortString(method.getBindingKey()), null);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+
super.exchangeUnbind(session, method);
}
@@ -445,41 +612,65 @@
{
ExchangeBoundResult result = new ExchangeBoundResult();
+ Exchange exchange;
+ AMQQueue queue;
if(method.hasExchange())
{
- Exchange exchange = getExchange(session, method.getExchange());
+ exchange = getExchange(session, method.getExchange());
if(exchange == null)
{
result.setExchangeNotFound(true);
}
+ }
+ else
+ {
+ exchange = getExchangeRegistry(session).getDefaultExchange();
+ }
- if(method.hasQueue())
+
+ if(method.hasQueue())
+ {
+
+ queue = getQueue(session, method.getQueue());
+ if(queue == null)
{
+ result.setQueueNotFound(true);
+ }
+
- AMQQueue queue = getQueue(session, method.getQueue());
- if(queue == null)
- {
- result.setQueueNotFound(true);
- }
+ if(exchange != null && queue != null)
+ {
+
+ boolean queueMatched = exchange.isBound(queue);
+
+ result.setQueueNotMatched(!queueMatched);
- if(exchange != null && queue != null)
+
+ if(method.hasBindingKey())
{
- if(method.hasBindingKey())
+ if(method.hasArguments())
+ {
+ // TODO
+ }
+ if(queueMatched)
{
-
- if(method.hasArguments())
- {
- // TODO
- }
result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue));
-
}
+ else
+ {
+ result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
+ }
+ }
+ else if (method.hasArguments())
+ {
+ // TODO
+
+ }
- result.setQueueNotMatched(!exchange.isBound(queue));
+ result.setQueueNotMatched(!exchange.isBound(queue));
- }
}
else if(exchange != null && method.hasBindingKey())
{
@@ -492,31 +683,20 @@
}
}
- else if(method.hasQueue())
+ else if(exchange != null && method.hasBindingKey())
{
- AMQQueue queue = getQueue(session, method.getQueue());
- if(queue == null)
+ if(method.hasArguments())
{
- result.setQueueNotFound(true);
- }
- else
- {
- if(method.hasBindingKey())
- {
- if(method.hasArguments())
- {
- // TODO
- }
-
- // TODO
- }
+ // TODO
}
+ result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
}
session.executionResult((int) method.getId(), result);
+
}
private AMQQueue getQueue(Session session, String queue)
@@ -580,6 +760,11 @@
try
{
queue = createQueue(queueName, method, virtualHost, (ServerSession)session);
+ if(method.getExclusive())
+ {
+ queue.setPrincipalHolder((ServerSession)session);
+ }
+
if (queue.isDurable() && !queue.isAutoDelete())
{
@@ -597,6 +782,64 @@
queue.bind(defaultExchange, new AMQShortString(queueName), null);
}
+
+ if(method.hasAutoDelete()
+ && method.getAutoDelete()
+ && method.hasExclusive()
+ && method.getExclusive())
+ {
+ final AMQQueue q = queue;
+ final ServerSession.Task deleteQueueTask = new ServerSession.Task()
+ {
+
+ public void doTask(ServerSession session)
+ {
+ try
+ {
+ q.delete();
+ }
+ catch (AMQException e)
+ {
+ //TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ };
+ final ServerSession s = (ServerSession) session;
+ s.addSessionCloseTask(deleteQueueTask);
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+
+ public void doTask(AMQQueue queue) throws AMQException
+ {
+ s.removeSessionCloseTask(deleteQueueTask);
+ }
+ });
+ }
+ else if(method.getExclusive())
+ {
+ {
+ final AMQQueue q = queue;
+ final ServerSession.Task removeExclusive = new ServerSession.Task()
+ {
+
+ public void doTask(ServerSession session)
+ {
+ q.setPrincipalHolder(null);
+ }
+ };
+ final ServerSession s = (ServerSession) session;
+ s.addSessionCloseTask(removeExclusive);
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+
+ public void doTask(AMQQueue queue) throws AMQException
+ {
+ s.removeSessionCloseTask(removeExclusive);
+ }
+ });
+ }
+ }
}
catch (AMQException e)
{
@@ -605,13 +848,12 @@
}
}
}
- else if (method.getExclusive() && (queue.getOwner() != null && !queue.getOwner().equals(((ServerSession)session).getPrincipal().getName())))
+ else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session)))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
- + "declared on another client ID('"
- + queue.getOwner() + "')";
+ + "declared on another session";
ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED;
exception(session, method, errorCode, description);
@@ -695,7 +937,11 @@
}
else
{
- if (method.getIfEmpty() && !queue.isEmpty())
+ if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
+ {
+ exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
+ }
+ else if (method.getIfEmpty() && !queue.isEmpty())
{
exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " not empty");
}
@@ -746,7 +992,7 @@
String queueName = method.getQueue();
if(queueName == null || queueName.length()==0)
{
- exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "No queue name supplied");
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "No queue name supplied");
}
else
@@ -778,7 +1024,25 @@
@Override
public void queueQuery(Session session, QueueQuery method)
{
- super.queueQuery(session, method);
+ QueueQueryResult result = new QueueQueryResult();
+
+ AMQQueue queue = getQueue(session, method.getQueue());
+
+ if(queue != null)
+ {
+ result.setQueue(queue.getName().toString());
+ result.setDurable(queue.isDurable());
+ result.setExclusive(queue.isExclusive());
+ result.setAutoDelete(queue.isAutoDelete());
+ result.setArguments(queue.getArguments());
+ result.setMessageCount(queue.getMessageCount());
+ result.setSubscriberCount(queue.getConsumerCount());
+
+ }
+
+
+ session.executionResult((int) method.getId(), result);
+
}
@Override
@@ -835,15 +1099,14 @@
public void closed(Session session)
{
super.closed(session);
- for(Subscription_0_10 sub : getSubscriptions(session).values())
+ for(Subscription_0_10 sub : getSubscriptions(session))
{
- sub.close();
+ ((ServerSession)session).unregister(sub);
}
((ServerSession)session).onClose();
- ((ServerSession)session).onClose();
}
- public Map<String, Subscription_0_10> getSubscriptions(Session session)
+ public Collection<Subscription_0_10> getSubscriptions(Session session)
{
return ((ServerSession)session).getSubscriptions();
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Mon Aug 31 11:36:26 2009
@@ -40,7 +40,10 @@
try
{
-
+ for(Action action : _postCommitActions)
+ {
+ action.onRollback();
+ }
}
finally
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java Mon Aug 31 11:36:26 2009
@@ -30,6 +30,8 @@
public static interface Action
{
public void postCommit();
+
+ public void onRollback();
}
void dequeue(AMQQueue queue, ServerMessage message, Action postCommitAction);
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Mon Aug 31 11:36:26 2009
@@ -318,6 +318,11 @@
return false; //To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean isAcquiredBy(Subscription subscription)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void setDeliveredToSubscription()
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -368,6 +373,11 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public void requeue(Subscription subscription)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void dequeue(final StoreContext storeContext) throws FailedDequeueException
{
//To change body of implemented methods use File | Settings | File Templates.
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Mon Aug 31 11:36:26 2009
@@ -32,6 +32,7 @@
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.PrincipalHolder;
import org.apache.qpid.AMQException;
import org.apache.commons.configuration.Configuration;
@@ -46,6 +47,8 @@
private boolean _deleted = false;
private AMQShortString _name;
+ private PrincipalHolder _principalHolder;
+
public MockAMQQueue(String name)
{
_name = new AMQShortString(name);
@@ -171,6 +174,11 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public void requeue(QueueEntryImpl storeContext, Subscription subscription)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -312,6 +320,16 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean isExclusive()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Map<String, Object> getArguments()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public ManagedObject getManagedObject()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -333,4 +351,14 @@
}
+ public PrincipalHolder getPrincipalHolder()
+ {
+ return _principalHolder;
+ }
+
+ public void setPrincipalHolder(PrincipalHolder principalHolder)
+ {
+ _principalHolder = principalHolder;
+ }
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Mon Aug 31 11:36:26 2009
@@ -44,6 +44,11 @@
return false;
}
+ public boolean isAcquiredBy(Subscription subscription)
+ {
+ return false;
+ }
+
public void addStateChangeListener(StateChangeListener listener)
{
@@ -163,7 +168,12 @@
}
-
+ public void requeue(Subscription subscription)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
public void setDeliveredToSubscription()
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Mon Aug 31 11:36:26 2009
@@ -186,7 +186,7 @@
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
- assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+ assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
// Check removing the subscription removes it's information from the queue
_queue.unregisterSubscription(_subscription);
@@ -197,7 +197,7 @@
AMQMessage messageB = createMessage(new Long (25));
_queue.enqueue(messageB);
- QueueEntry entry = _subscription.getLastSeenEntry();
+ QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
assertNull(entry);
}
@@ -207,7 +207,7 @@
_queue.enqueue(messageA);
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
- assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+ assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
}
public void testExclusiveConsumer() throws AMQException
@@ -224,7 +224,7 @@
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
- assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+ assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
Subscription subB = new MockSubscription();
@@ -273,7 +273,7 @@
Long id = new Long(26);
AMQMessage message = createMessage(id);
_queue.enqueue(message);
- QueueEntry entry = _subscription.getLastSeenEntry();
+ QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
entry.setRedelivered(true);
_queue.resend(entry, _subscription);
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Mon Aug 31 11:36:26 2009
@@ -39,7 +39,7 @@
private AMQShortString tag = new AMQShortString("mocktag");
private AMQQueue queue = null;
private StateListener _listener = null;
- private QueueEntry lastSeen = null;
+ private AMQQueue.Context _queueContext = null;
private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
@@ -69,9 +69,9 @@
return tag ;
}
- public QueueEntry getLastSeenEntry()
+ public AMQQueue.Context getQueueContext()
{
- return lastSeen;
+ return _queueContext;
}
public SubscriptionAcquiredState getOwningState()
@@ -147,25 +147,23 @@
{
}
+ public void onDequeue(QueueEntry queueEntry)
+ {
+ }
+
public void restoreCredit(QueueEntry queueEntry)
{
+ //To change body of implemented methods use File | Settings | File Templates.
}
public void send(QueueEntry msg) throws AMQException
{
- lastSeen = msg;
messages.add(msg);
}
- public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
+ public void setQueueContext(AMQQueue.Context queueContext)
{
- boolean result = false;
- if (expected != null)
- {
- result = (expected.equals(lastSeen));
- }
- lastSeen = newValue;
- return result;
+ _queueContext = queueContext;
}
public void setQueue(AMQQueue queue)
Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Mon Aug 31 11:36:26 2009
@@ -22,6 +22,10 @@
import org.apache.mina.common.ByteBuffer;
+import java.util.Date;
+import java.util.Map;
+import java.math.BigDecimal;
+
/**
* AMQTypedValue combines together a native Java Object value, and an {@link AMQType}, as a fully typed AMQP parameter
* value. It provides the ability to read and write fully typed parameters to and from byte buffers. It also provides
@@ -113,4 +117,63 @@
return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
}
+
+ public static AMQTypedValue toTypedValue(Object val)
+ {
+ if(val == null)
+ {
+ return AMQType.VOID.asTypedValue(null);
+ }
+
+ Class klass = val.getClass();
+ if(klass == String.class)
+ {
+ return AMQType.ASCII_STRING.asTypedValue(val);
+ }
+ else if(klass == Character.class)
+ {
+ return AMQType.ASCII_CHARACTER.asTypedValue(val);
+ }
+ else if(klass == Integer.class)
+ {
+ return AMQType.INT.asTypedValue(val);
+ }
+ else if(klass == Long.class)
+ {
+ return AMQType.LONG.asTypedValue(val);
+ }
+ else if(klass == Float.class)
+ {
+ return AMQType.FLOAT.asTypedValue(val);
+ }
+ else if(klass == Double.class)
+ {
+ return AMQType.DOUBLE.asTypedValue(val);
+ }
+ else if(klass == Date.class)
+ {
+ return AMQType.TIMESTAMP.asTypedValue(val);
+ }
+ else if(klass == Byte.class)
+ {
+ return AMQType.BYTE.asTypedValue(val);
+ }
+ else if(klass == Boolean.class)
+ {
+ return AMQType.BOOLEAN.asTypedValue(val);
+ }
+ else if(klass == byte[].class)
+ {
+ return AMQType.BINARY.asTypedValue(val);
+ }
+ else if(klass == BigDecimal.class)
+ {
+ return AMQType.DECIMAL.asTypedValue(val);
+ }
+ else if(val instanceof Map)
+ {
+ return AMQType.FIELD_TABLE.asTypedValue(FieldTable.convertToFieldTable((Map)val));
+ }
+ return null;
+ }
}
Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Mon Aug 31 11:36:26 2009
@@ -828,6 +828,7 @@
recalculateEncodedSize();
}
+
public static interface FieldTableElementProcessor
{
public boolean processElement(String propertyName, AMQTypedValue value);
@@ -1187,4 +1188,24 @@
return _properties.equals(f._properties);
}
+
+ public static FieldTable convertToFieldTable(Map<String, Object> map)
+ {
+ if (map != null)
+ {
+ FieldTable table = new FieldTable();
+ for(Map.Entry<String,Object> entry : map.entrySet())
+ {
+ table.put(new AMQShortString(entry.getKey()), entry.getValue());
+ }
+
+ return table;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Mon Aug 31 11:36:26 2009
@@ -316,7 +316,14 @@
public void dispatch(Method method)
{
Session ssn = getSession(method.getChannel());
- ssn.received(method);
+ if(ssn != null)
+ {
+ ssn.received(method);
+ }
+ else
+ {
+ // TODO
+ }
}
public int getChannelMax()
Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java Mon Aug 31 11:36:26 2009
@@ -179,6 +179,11 @@
}
}
+ public boolean hasCompletionListener()
+ {
+ return completionListener != null;
+ }
+
public String toString()
{
StringBuilder str = new StringBuilder();
Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Mon Aug 31 11:36:26 2009
@@ -617,7 +617,7 @@
{
sessionCommandPoint(0, 0);
}
- if (expiry > 0 && !m.isUnreliable())
+ if ((expiry > 0 && !m.isUnreliable()) || m.hasCompletionListener())
{
commands[mod(next, commands.length)] = m;
commandBytes += m.getBodySize();
Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Mon Aug 31 11:36:26 2009
@@ -137,6 +137,7 @@
}
catch (Throwable t)
{
+ t.printStackTrace();
if (!(shutdownBroken &&
t instanceof SocketException &&
t.getMessage().equalsIgnoreCase("socket closed") &&
Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Mon Aug 31 11:36:26 2009
@@ -33,6 +33,7 @@
private final List<QueueEntry> messages;
private final Object key;
private boolean isSuspended;
+ private AMQQueue.Context _queueContext;
public SubscriptionTestHelper(Object key)
{
@@ -101,11 +102,16 @@
//To change body of implemented methods use File | Settings | File Templates.
}
- public void restoreCredit(final QueueEntry queueEntry)
+ public void onDequeue(final QueueEntry queueEntry)
{
}
+ public void restoreCredit(QueueEntry queueEntry)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void setStateListener(final StateListener listener)
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -116,9 +122,14 @@
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public QueueEntry getLastSeenEntry()
+ public AMQQueue.Context getQueueContext()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _queueContext;
+ }
+
+ public void setQueueContext(AMQQueue.Context queueContext)
+ {
+ _queueContext = queueContext;
}
public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org