You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/08/27 16:39:52 UTC
svn commit: r808436 - in
/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server:
exchange/DefaultExchangeFactory.java transport/ServerSessionDelegate.java
Author: rgodfrey
Date: Thu Aug 27 14:39:51 2009
New Revision: 808436
URL: http://svn.apache.org/viewvc?rev=808436&view=rev
Log:
More updates
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=808436&r1=808435&r2=808436&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Thu Aug 27 14:39:51 2009
@@ -62,7 +62,7 @@
public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete)
throws AMQException
{
- ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(type);
+ ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(new AMQShortString(type));
if (exchType == null)
{
@@ -106,7 +106,7 @@
return;
}
Class<? extends ExchangeType> exchangeTypeClass = exchangeType.getClass();
- ExchangeType type = exchangeTypeClass.newInstance();
+ ExchangeType<? extends ExchangeType> type = exchangeTypeClass.newInstance();
registerExchangeType(type);
}
catch (ClassCastException classCastEx)
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=808436&r1=808435&r2=808436&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Thu Aug 27 14:39:51 2009
@@ -102,33 +102,65 @@
@Override
public void messageSubscribe(Session session, MessageSubscribe method)
{
- String destination = method.getDestination();
- String queueName = method.getQueue();
- QueueRegistry queueRegistry = getQueueRegistry(session);
- AMQQueue queue = queueRegistry.getQueue(queueName);
+ //TODO - work around broken Python tests
+ if(!method.hasAcceptMode())
+ {
+ method.setAcceptMode(MessageAcceptMode.EXPLICIT);
+ }
+ if(!method.hasAcquireMode())
+ {
+ method.setAcquireMode(MessageAcquireMode.PRE_ACQUIRED);
- //TODO null check
+ }
+ /* if(!method.hasAcceptMode())
+ {
+ exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "Accept-mode not supplied");
+ }
+ else if(!method.hasAcquireMode())
+ {
+ exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "Acquire-mode not supplied");
+ }
+ else */if(!method.hasQueue())
+ {
+ exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied");
+ }
+ else
+ {
+ String destination = method.getDestination();
+ String queueName = method.getQueue();
+ QueueRegistry queueRegistry = getQueueRegistry(session);
- FlowCreditManager_0_10 creditManager = new CreditCreditManager(0L,0L);
- // TODO filters
+ AMQQueue queue = queueRegistry.getQueue(queueName);
- Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null);
+ if(queue == null)
+ {
+ exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
+ }
+ else
+ {
- ((ServerSession)session).register(destination, sub);
- try
- {
- queue.registerSubscription(sub, method.getExclusive());
- }
- catch (AMQException e)
- {
- // TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- throw new RuntimeException(e);
- }
+ FlowCreditManager_0_10 creditManager = new CreditCreditManager(0L,0L);
+
+ // TODO filters
+ Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null);
+
+ ((ServerSession)session).register(destination, sub);
+ try
+ {
+ queue.registerSubscription(sub, method.getExclusive());
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
@@ -266,11 +298,13 @@
}
- else
+ else if(exchange == null)
{
ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+
+
try
{
@@ -293,6 +327,10 @@
}
}
+ else
+ {
+ // TODO check same as declared
+ }
}
}
@@ -371,7 +409,23 @@
@Override
public void exchangeQuery(Session session, ExchangeQuery method)
{
- super.exchangeQuery(session, method);
+
+ ExchangeQueryResult result = new ExchangeQueryResult();
+
+ Exchange exchange = getExchange(session, method.getName());
+
+ if(exchange != null)
+ {
+ result.setDurable(exchange.isDurable());
+ result.setType(exchange.getType().toString());
+ result.setNotFound(false);
+ }
+ else
+ {
+ result.setNotFound(true);
+ }
+
+ session.executionResult((int) method.getId(), result);
}
@Override
@@ -462,7 +516,7 @@
session.executionResult((int) method.getId(), result);
- super.exchangeBound(session, method);
+
}
private AMQQueue getQueue(Session session, String queue)
@@ -551,7 +605,7 @@
}
}
}
- else if (queue.getOwner() != null && !((ServerSession)session).getPrincipal().getName().equals(queue.getOwner()))
+ else if (method.getExclusive() && (queue.getOwner() != null && !queue.getOwner().equals(((ServerSession)session).getPrincipal().getName())))
{
String description = "Cannot declare queue('" + queueName + "'),"
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org