You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2010/01/31 01:31:57 UTC
svn commit: r904934 [8/11] - in /qpid/trunk/qpid/java:
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/ broker/src/main/java/org/apache/qpid/qmf/ br...
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java Sun Jan 31 00:31:49 2010
@@ -42,7 +42,7 @@
private static final Object CREATE_QUEUES_KEY = new Object();
private static final Object CREATE_EXCHANGES_KEY = new Object();
-
+
private static final Object CREATE_QUEUE_TEMPORARY_KEY = new Object();
private static final Object CREATE_QUEUE_QUEUES_KEY = new Object();
private static final Object CREATE_QUEUE_EXCHANGES_KEY = new Object();
@@ -64,9 +64,9 @@
}
/**
- *
+ *
* @param permission the type of permission to check
- *
+ *
* @param parameters vararg depending on what permission was passed in
* ACCESS: none
* BIND: none
@@ -113,7 +113,7 @@
{
_fullVHostAccess = true;
}
-
+
private void grantPublish(Permission permission, Object... parameters) {
Map publishRights = (Map) _permissions.get(permission);
@@ -344,9 +344,9 @@
}
/**
- *
+ *
* @param permission the type of permission to check
- *
+ *
* @param parameters vararg depending on what permission was passed in
* ACCESS: none
* BIND: QueueBindBody bindmethod, Exchange exchange, AMQQueue queue, AMQShortString routingKey
@@ -363,7 +363,7 @@
switch (permission)
{
- case ACCESS://No Parameters
+ case ACCESS://No Parameters
return AuthzResult.ALLOWED; // The existence of this user-specific PP implies some level of access is authorised
case BIND: // Parameters : QueueBindMethod , Exchange , AMQQueue, AMQShortString routingKey
return authoriseBind(parameters);
@@ -444,7 +444,7 @@
{
if ( new AMQShortString(queue.getPrincipalHolder().getPrincipal().getName()).equals(_user))
{
- return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ return (queues.size() == 0 || queues.contains(queue.getNameShortString())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
}
else
{
@@ -453,7 +453,7 @@
}
// If we are
- return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ return (queues.size() == 0 || queues.contains(queue.getNameShortString())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
}
}
@@ -486,7 +486,7 @@
// Otherwise exchange must be listed in the white list
// If the map doesn't have the exchange then it isn't allowed
- if (!exchanges.containsKey(((Exchange) parameters[0]).getName()))
+ if (!exchanges.containsKey(((Exchange) parameters[0]).getNameShortString()))
{
return AuthzResult.DENIED;
}
@@ -494,7 +494,7 @@
{
// Get valid routing keys
- HashSet routingKeys = (HashSet) exchanges.get(((Exchange)parameters[0]).getName());
+ HashSet routingKeys = (HashSet) exchanges.get(((Exchange)parameters[0]).getNameShortString());
// Having no routingKeys in the map means all are allowed.
if (routingKeys == null)
@@ -544,7 +544,7 @@
// check the valid exchanges
if (rights == null || rights.containsKey(exchangeName))
{
- return AuthzResult.ALLOWED;
+ return AuthzResult.ALLOWED;
}
else
{
@@ -587,13 +587,13 @@
// If there is a white list then check
if (create_queues_queues == null || create_queues_queues.containsKey(queueName))
{
- return AuthzResult.ALLOWED;
+ return AuthzResult.ALLOWED;
}
else
{
return AuthzResult.DENIED;
}
-
+
}
}
@@ -604,7 +604,7 @@
//user has been granted full access to the vhost
return AuthzResult.ALLOWED;
}
-
+
Exchange exchange = (Exchange) parameters[1];
AMQQueue bind_queueName = (AMQQueue) parameters[2];
@@ -631,7 +631,7 @@
}
// Check to see if we have a white list of routingkeys to check
- Map rkeys = (Map) exchangeDetails.get(exchange.getName());
+ Map rkeys = (Map) exchangeDetails.get(exchange.getNameShortString());
// if keys is null then any rkey is allowed on this exchange
if (rkeys == null)
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java (from r891203, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainInitialiser.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainInitialiser.java&r1=891203&r2=904934&rev=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainInitialiser.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java Sun Jan 31 00:31:49 2010
@@ -18,21 +18,22 @@
* under the License.
*
*/
-package org.apache.qpid.server.security.auth.sasl.amqplain;
+package org.apache.qpid.server.security.auth.sasl.anonymous;
import javax.security.sasl.SaslServerFactory;
import org.apache.qpid.server.security.auth.sasl.UsernamePasswordInitialiser;
+import org.apache.qpid.server.security.auth.sasl.amqplain.AmqPlainSaslServerFactory;
-public class AmqPlainInitialiser extends UsernamePasswordInitialiser
+public class AnonymousInitialiser extends UsernamePasswordInitialiser
{
public String getMechanismName()
{
- return "AMQPLAIN";
+ return "ANONYMOUS";
}
public Class<? extends SaslServerFactory> getServerFactoryClassForJCARegistration()
{
- return AmqPlainSaslServerFactory.class;
+ return AnonymousSaslServerFactory.class;
}
-}
+}
\ No newline at end of file
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java (from r891203, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java&r1=891203&r2=904934&rev=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java Sun Jan 31 00:31:49 2010
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.security.auth.sasl.amqplain;
+package org.apache.qpid.server.security.auth.sasl.anonymous;
import java.io.IOException;
@@ -36,19 +36,14 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
-public class AmqPlainSaslServer implements SaslServer
+public class AnonymousSaslServer implements SaslServer
{
- public static final String MECHANISM = "AMQPLAIN";
-
- private CallbackHandler _cbh;
-
- private String _authorizationId;
+ public static final String MECHANISM = "ANONYMOUS";
private boolean _complete = false;
- public AmqPlainSaslServer(CallbackHandler cbh)
+ public AnonymousSaslServer()
{
- _cbh = cbh;
}
public String getMechanismName()
@@ -58,46 +53,8 @@
public byte[] evaluateResponse(byte[] response) throws SaslException
{
- try
- {
- final FieldTable ft = FieldTableFactory.newFieldTable(ByteBuffer.wrap(response), response.length);
- String username = (String) ft.getString("LOGIN");
- // we do not care about the prompt but it throws if null
- NameCallback nameCb = new NameCallback("prompt", username);
- // we do not care about the prompt but it throws if null
- PasswordCallback passwordCb = new PasswordCallback("prompt", false);
- // TODO: should not get pwd as a String but as a char array...
- String pwd = (String) ft.getString("PASSWORD");
- AuthorizeCallback authzCb = new AuthorizeCallback(username, username);
- Callback[] callbacks = new Callback[]{nameCb, passwordCb, authzCb};
- _cbh.handle(callbacks);
- String storedPwd = new String(passwordCb.getPassword());
- if (storedPwd.equals(pwd))
- {
- _complete = true;
- }
- if (authzCb.isAuthorized() && _complete)
- {
- _authorizationId = authzCb.getAuthenticationID();
- return null;
- }
- else
- {
- throw new SaslException("Authentication failed");
- }
- }
- catch (AMQFrameDecodingException e)
- {
- throw new SaslException("Unable to decode response: " + e, e);
- }
- catch (IOException e)
- {
- throw new SaslException("Error processing data: " + e, e);
- }
- catch (UnsupportedCallbackException e)
- {
- throw new SaslException("Unable to obtain data from callback handler: " + e, e);
- }
+ _complete = true;
+ return null;
}
public boolean isComplete()
@@ -107,7 +64,7 @@
public String getAuthorizationID()
{
- return _authorizationId;
+ return null;
}
public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException
@@ -127,6 +84,5 @@
public void dispose() throws SaslException
{
- _cbh = null;
}
-}
+}
\ No newline at end of file
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java (from r891203, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java&r1=891203&r2=904934&rev=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java Sun Jan 31 00:31:49 2010
@@ -18,7 +18,9 @@
* under the License.
*
*/
-package org.apache.qpid.server.security.auth.sasl.amqplain;
+package org.apache.qpid.server.security.auth.sasl.anonymous;
+
+import org.apache.qpid.server.security.auth.sasl.amqplain.AmqPlainSaslServer;
import java.util.Map;
@@ -28,14 +30,14 @@
import javax.security.sasl.SaslServer;
import javax.security.sasl.SaslServerFactory;
-public class AmqPlainSaslServerFactory implements SaslServerFactory
+public class AnonymousSaslServerFactory implements SaslServerFactory
{
public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map props,
CallbackHandler cbh) throws SaslException
{
- if (AmqPlainSaslServer.MECHANISM.equals(mechanism))
+ if (AnonymousSaslServer.MECHANISM.equals(mechanism))
{
- return new AmqPlainSaslServer(cbh);
+ return new AnonymousSaslServer();
}
else
{
@@ -47,14 +49,15 @@
{
if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
props.containsKey(Sasl.POLICY_NODICTIONARY) ||
- props.containsKey(Sasl.POLICY_NOACTIVE))
+ props.containsKey(Sasl.POLICY_NOACTIVE) ||
+ props.containsKey(Sasl.POLICY_NOANONYMOUS))
{
// returned array must be non null according to interface documentation
return new String[0];
}
else
{
- return new String[]{AmqPlainSaslServer.MECHANISM};
+ return new String[]{AnonymousSaslServer.MECHANISM};
}
}
-}
+}
\ No newline at end of file
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Sun Jan 31 00:31:49 2010
@@ -20,22 +20,24 @@
*/
package org.apache.qpid.server.store;
+import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.commons.configuration.Configuration;
-
import java.io.ByteArrayInputStream;
import java.io.File;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.Driver;
@@ -49,8 +51,6 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.lang.ref.WeakReference;
-import java.nio.ByteBuffer;
public class DerbyMessageStore implements MessageStore
@@ -590,7 +590,10 @@
conn = newConnection();
PreparedStatement stmt = conn.prepareStatement(FIND_EXCHANGE);
- stmt.setString(1, exchange.getName().toString());
+ stmt.setString(1, exchange.getNameShortString().toString());
+ stmt.execute();
+ stmt.close();
+ conn.commit();
ResultSet rs = stmt.executeQuery();
@@ -617,7 +620,7 @@
}
catch (SQLException e)
{
- throw new AMQException("Error writing Exchange with name " + exchange.getName() + " to database: " + e, e);
+ throw new AMQException("Error writing Exchange with name " + exchange.getNameShortString() + " to database: " + e, e);
}
}
@@ -631,11 +634,11 @@
{
conn = newConnection();
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
- stmt.setString(1, exchange.getName().toString());
+ stmt.setString(1, exchange.getNameShortString().toString());
int results = stmt.executeUpdate();
if(results == 0)
{
- throw new AMQException("Exchange " + exchange.getName() + " not found");
+ throw new AMQException("Exchange " + exchange.getNameShortString() + " not found");
}
else
{
@@ -645,7 +648,7 @@
}
catch (SQLException e)
{
- throw new AMQException("Error writing deleting with name " + exchange.getName() + " from database: " + e, e);
+ throw new AMQException("Error writing deleting with name " + exchange.getNameShortString() + " from database: " + e, e);
}
finally
{
@@ -677,8 +680,8 @@
conn = newConnection();
PreparedStatement stmt = conn.prepareStatement(FIND_BINDING);
- stmt.setString(1, exchange.getName().toString() );
- stmt.setString(2, queue.getName().toString());
+ stmt.setString(1, exchange.getNameShortString().toString() );
+ stmt.setString(2, queue.getNameShortString().toString());
stmt.setString(3, routingKey == null ? null : routingKey.toString());
ResultSet rs = stmt.executeQuery();
@@ -687,8 +690,8 @@
if (!rs.next())
{
stmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
- stmt.setString(1, exchange.getName().toString() );
- stmt.setString(2, queue.getName().toString());
+ stmt.setString(1, exchange.getNameShortString().toString() );
+ stmt.setString(2, queue.getNameShortString().toString());
stmt.setString(3, routingKey == null ? null : routingKey.toString());
if(args != null)
{
@@ -713,8 +716,8 @@
}
catch (SQLException e)
{
- throw new AMQException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " to database: " + e, e);
+ throw new AMQException("Error writing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
+ + exchange.getNameShortString() + " to database: " + e, e);
}
finally
{
@@ -748,23 +751,23 @@
conn = newConnection();
// exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
- stmt.setString(1, exchange.getName().toString() );
- stmt.setString(2, queue.getName().toString());
+ stmt.setString(1, exchange.getNameShortString().toString() );
+ stmt.setString(2, queue.getNameShortString().toString());
stmt.setString(3, routingKey == null ? null : routingKey.toString());
if(stmt.executeUpdate() != 1)
{
- throw new AMQException("Queue binding for queue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " not found");
+ throw new AMQException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange "
+ + exchange.getNameShortString() + " not found");
}
conn.commit();
stmt.close();
}
catch (SQLException e)
{
- throw new AMQException("Error removing binding for AMQQueue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " in database: " + e, e);
+ throw new AMQException("Error removing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
+ + exchange.getNameShortString() + " in database: " + e, e);
}
finally
{
@@ -801,7 +804,7 @@
Connection conn = newConnection();
PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
- stmt.setString(1, queue.getName().toString());
+ stmt.setString(1, queue.getNameShortString().toString());
ResultSet rs = stmt.executeQuery();
@@ -816,7 +819,7 @@
? null
: queue.getPrincipalHolder().getPrincipal().getName();
- stmt.setString(1, queue.getName().toString());
+ stmt.setString(1, queue.getNameShortString().toString());
stmt.setString(2, owner);
stmt.execute();
@@ -830,7 +833,7 @@
}
catch (SQLException e)
{
- throw new AMQException("Error writing AMQQueue with name " + queue.getName() + " to database: " + e, e);
+ throw new AMQException("Error writing AMQQueue with name " + queue.getNameShortString() + " to database: " + e, e);
}
}
}
@@ -843,7 +846,7 @@
public void removeQueue(final AMQQueue queue) throws AMQException
{
- AMQShortString name = queue.getName();
+ AMQShortString name = queue.getNameShortString();
_logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
Connection conn = null;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Sun Jan 31 00:31:49 2010
@@ -31,6 +31,11 @@
public interface DurableConfigurationStore
{
+ public static interface Source
+ {
+ DurableConfigurationStore getDurableConfigurationStore();
+ }
+
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Sun Jan 31 00:31:49 2010
@@ -20,41 +20,49 @@
*/
package org.apache.qpid.server.subscription;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.Map;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.configuration.SubscriptionConfig;
+import org.apache.qpid.server.configuration.SubscriptionConfigType;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.SubscriptionActor;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
* Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
* that was given out by the broker and the channel id. <p/>
*/
-public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener
+public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener,
+ SubscriptionConfig
{
private StateListener _stateListener = new StateListener()
@@ -85,6 +93,7 @@
private final long _subscriptionID = idGenerator.getAndIncrement();
private LogSubject _logSubject;
private LogActor _logActor;
+ private UUID _id;
static final class BrowserSubscription extends SubscriptionImpl
@@ -152,6 +161,12 @@
return false;
}
+ @Override
+ public boolean isExplicitAcknowledge()
+ {
+ return false;
+ }
+
/**
* This method can be called by each of the publisher threads. As a result all changes to the channel object must be
* thread safe.
@@ -318,9 +333,12 @@
_autoClose = false;
}
-
}
+ public ConfigStore getConfigStore()
+ {
+ return getQueue().getConfigStore();
+ }
public synchronized void setQueue(AMQQueue queue, boolean exclusive)
@@ -331,6 +349,9 @@
}
_queue = queue;
+ _id = getConfigStore().createId();
+ getConfigStore().addConfiguredObject(this);
+
_logSubject = new SubscriptionLogSubject(this);
_logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
@@ -414,7 +435,7 @@
public boolean hasInterest(QueueEntry entry)
{
-
+
//check that the message hasn't been rejected
@@ -505,7 +526,7 @@
{
_stateChangeLock.unlock();
}
-
+ getConfigStore().removeConfiguredObject(this);
//Log Subscription closed
CurrentActor.get().message(_logSubject, SubscriptionMessages.SUB_CLOSE());
@@ -689,4 +710,60 @@
}
abstract boolean isBrowser();
+
+ public String getCreditMode()
+ {
+ return "WINDOW";
+ }
+
+ public SessionConfig getSessionConfig()
+ {
+ return getChannel();
+ }
+
+ public boolean isBrowsing()
+ {
+ return isBrowser();
+ }
+
+ public boolean isExplicitAcknowledge()
+ {
+ return true;
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public SubscriptionConfigType getConfigType()
+ {
+ return SubscriptionConfigType.getInstance();
+ }
+
+ public boolean isExclusive()
+ {
+ return getQueue().hasExclusiveSubscriber();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return getSessionConfig();
+ }
+
+ public String getName()
+ {
+ return String.valueOf(_consumerTag);
+ }
+
+ public Map<String, Object> getArguments()
+ {
+ return null;
+ }
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Sun Jan 31 00:31:49 2010
@@ -36,11 +36,12 @@
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderProperties;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQTypedValue;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.*;
@@ -50,12 +51,10 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
+import java.util.*;
import java.nio.ByteBuffer;
-public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener
+public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig
{
private static final AtomicLong idGenerator = new AtomicLong(0);
@@ -98,6 +97,9 @@
private LogSubject _logSubject;
private LogActor _logActor;
private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+ private UUID _id;
+ private String _traceExclude;
+ private String _trace;
public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
@@ -116,7 +118,6 @@
_creditManager.addStateListener(this);
_state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED);
-
}
public void setNoLocal(boolean noLocal)
@@ -146,6 +147,11 @@
throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
}
_queue = queue;
+ Map<String, Object> arguments = queue.getArguments() == null ? Collections.EMPTY_MAP : queue.getArguments();
+ _traceExclude = (String) arguments.get("qpid.trace.exclude");
+ _trace = (String) arguments.get("qpid.trace.id");
+ _id = getConfigStore().createId();
+ getConfigStore().addConfiguredObject(this);
_logSubject = new SubscriptionLogSubject(this);
_logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
@@ -235,6 +241,7 @@
}
}
_creditManager.removeListener(this);
+ getConfigStore().removeConfiguredObject(this);
}
finally
{
@@ -245,6 +252,11 @@
}
+ public ConfigStore getConfigStore()
+ {
+ return getQueue().getConfigStore();
+ }
+
public void creditStateChanged(boolean hasCredit)
{
@@ -290,6 +302,9 @@
MessageTransfer xfr;
+ DeliveryProperties deliveryProps;
+ MessageProperties messageProps = null;
+
if(serverMsg instanceof MessageTransferMessage)
{
@@ -316,11 +331,15 @@
}
else
{
+ if(header instanceof MessageProperties)
+ {
+ messageProps = (MessageProperties) header;
+ }
newHeaders.add(header);
}
}
- DeliveryProperties deliveryProps = new DeliveryProperties();
+ deliveryProps = new DeliveryProperties();
if(origDeliveryProps != null)
{
if(origDeliveryProps.hasDeliveryMode())
@@ -343,21 +362,33 @@
{
deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
}
+ if(origDeliveryProps.hasTimestamp())
+ {
+ deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
+ }
+
}
deliveryProps.setRedelivered(entry.isRedelivered());
newHeaders.add(deliveryProps);
+
+ if(_trace != null && messageProps == null)
+ {
+ messageProps = new MessageProperties();
+ newHeaders.add(messageProps);
+ }
+
Header header = new Header(newHeaders);
xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
}
- else
+ else if(serverMsg instanceof AMQMessage)
{
AMQMessage message_0_8 = (AMQMessage) serverMsg;
- DeliveryProperties deliveryProps = new DeliveryProperties();
- MessageProperties messageProps = new MessageProperties();
+ deliveryProps = new DeliveryProperties();
+ messageProps = new MessageProperties();
int size = (int) message_0_8.getSize();
ByteBuffer body = ByteBuffer.allocate(size);
@@ -399,9 +430,52 @@
messageProps.setUserId(properties.getUserId().getBytes());
}
+ FieldTable fieldTable = properties.getHeaders();
+
+ final Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
+
+
+ messageProps.setApplicationHeaders(appHeaders);
+
+ Header header = new Header(headers);
+ xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
+ }
+ else
+ {
+
+ deliveryProps = new DeliveryProperties();
+ messageProps = new MessageProperties();
+
+ int size = (int) serverMsg.getSize();
+ ByteBuffer body = ByteBuffer.allocate(size);
+ serverMsg.getContent(body, 0);
+ body.flip();
+
+ Struct[] headers = new Struct[] { deliveryProps, messageProps };
+
+
+ deliveryProps.setExpiration(serverMsg.getExpiration());
+ deliveryProps.setImmediate(serverMsg.isImmediate());
+ deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
+ deliveryProps.setRedelivered(entry.isRedelivered());
+ deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
+ deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
+
+ messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
+ messageProps.setContentLength(size);
+ messageProps.setContentType(serverMsg.getMessageHeader().getMimeType());
+ if(serverMsg.getMessageHeader().getCorrelationId() != null)
+ {
+ messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
+ }
+
+
+ // TODO - ReplyTo
+
+
final Map<String, Object> appHeaders = new HashMap<String, Object>();
- properties.getHeaders().processOverElements(
+ /*properties.getHeaders().processOverElements(
new FieldTable.FieldTableElementProcessor()
{
@@ -424,39 +498,98 @@
messageProps.setApplicationHeaders(appHeaders);
-
+*/
Header header = new Header(headers);
xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
}
- if(_acceptMode == MessageAcceptMode.NONE)
+ boolean excludeDueToFederation = false;
+
+ if(_trace != null)
{
- xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
+ if(!messageProps.hasApplicationHeaders())
+ {
+ messageProps.setApplicationHeaders(new HashMap<String,Object>());
+ }
+ Map<String,Object> appHeaders = messageProps.getApplicationHeaders();
+ String trace = (String) appHeaders.get("x-qpid.trace");
+ if(trace == null)
+ {
+ trace = _trace;
+ }
+ else
+ {
+ if(_traceExclude != null)
+ {
+ excludeDueToFederation = Arrays.asList(trace.split(",")).contains(_traceExclude);
+ }
+ trace+=","+_trace;
+ }
+ appHeaders.put("x-qpid.trace",trace);
}
- else if(_flowMode == MessageFlowMode.WINDOW)
+
+ if(!excludeDueToFederation)
{
- xfr.setCompletionListener(new Method.CompletionListener()
- {
- public void onComplete(Method method)
+ if(_acceptMode == MessageAcceptMode.NONE)
+ {
+ xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
+ }
+ else if(_flowMode == MessageFlowMode.WINDOW)
+ {
+ xfr.setCompletionListener(new Method.CompletionListener()
{
- restoreCredit(entry);
- }
- });
- }
+ public void onComplete(Method method)
+ {
+ restoreCredit(entry);
+ }
+ });
+ }
- _postIdSettingAction._xfr = xfr;
- if(_acceptMode == MessageAcceptMode.EXPLICIT)
- {
- _postIdSettingAction._action = new ExplicitAcceptDispositionChangeListener(entry, this);
+ _postIdSettingAction._xfr = xfr;
+ if(_acceptMode == MessageAcceptMode.EXPLICIT)
+ {
+ _postIdSettingAction._action = new ExplicitAcceptDispositionChangeListener(entry, this);
+ }
+ else
+ {
+ _postIdSettingAction._action = new ImplicitAcceptDispositionChangeListener(entry, this);
+ }
+
+ _session.sendMessage(xfr, _postIdSettingAction);
+
+ if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ forceDequeue(entry, false);
+ }
}
else
{
- _postIdSettingAction._action = new ImplicitAcceptDispositionChangeListener(entry, this);
+ forceDequeue(entry, _flowMode == MessageFlowMode.WINDOW);
+
}
+ }
+
+ private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
+ {
+ ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+ txn.dequeue(entry.getQueue(),entry.getMessage(),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ if(restoreCredit)
+ {
+ restoreCredit(entry);
+ }
+ entry.discard();
+ }
- _session.sendMessage(xfr, _postIdSettingAction);
+ public void onRollback()
+ {
+ }
+ });
}
void reject(QueueEntry entry)
@@ -660,4 +793,59 @@
}
+ public SessionConfig getSessionConfig()
+ {
+ return getSession();
+ }
+
+ public boolean isBrowsing()
+ {
+ return _acquireMode == MessageAcquireMode.NOT_ACQUIRED;
+ }
+
+ public boolean isExclusive()
+ {
+ return getQueue().hasExclusiveSubscriber();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return getSessionConfig();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public SubscriptionConfigType getConfigType()
+ {
+ return SubscriptionConfigType.getInstance();
+ }
+
+ public boolean isExplicitAcknowledge()
+ {
+ return _acceptMode == MessageAcceptMode.EXPLICIT;
+ }
+
+ public String getCreditMode()
+ {
+ return _flowMode.toString();
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public String getName()
+ {
+ return _destination;
+ }
+
+ public Map<String, Object> getArguments()
+ {
+ //TODO
+ return Collections.EMPTY_MAP;
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Sun Jan 31 00:31:49 2010
@@ -20,12 +20,20 @@
*/
package org.apache.qpid.server.transport;
+import org.apache.qpid.server.configuration.ConnectionConfig;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.Method;
-import org.apache.qpid.server.virtualhost.VirtualHost;
public class ServerConnection extends Connection
{
+ private ConnectionConfig _config;
+ private Runnable _onOpenTask;
+
+ public ServerConnection()
+ {
+ }
+
@Override
protected void invoke(Method method)
{
@@ -36,6 +44,10 @@
protected void setState(State state)
{
super.setState(state);
+ if(state == State.OPEN && _onOpenTask != null)
+ {
+ _onOpenTask.run();
+ }
}
@Override
@@ -61,4 +73,19 @@
{
_virtualHost = virtualHost;
}
+
+ public void setConnectionConfig(final ConnectionConfig config)
+ {
+ _config = config;
+ }
+
+ public ConnectionConfig getConfig()
+ {
+ return _config;
+ }
+
+ public void onOpen(final Runnable task)
+ {
+ _onOpenTask = task;
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Sun Jan 31 00:31:49 2010
@@ -40,7 +40,7 @@
public ServerConnectionDelegate(IApplicationRegistry appRegistry,
String localFQDN)
{
- this(Collections.EMPTY_MAP, Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
+ this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Sun Jan 31 00:31:49 2010
@@ -20,32 +20,55 @@
*/
package org.apache.qpid.server.transport;
-import org.apache.qpid.transport.*;
+import com.sun.security.auth.UserPrincipal;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.ConnectionConfig;
+import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.configuration.SessionConfigType;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription_0_10;
-import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
-import org.apache.qpid.server.security.PrincipalHolder;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionDelegate;
+import static org.apache.qpid.util.Serial.gt;
-import java.util.*;
+import java.lang.ref.WeakReference;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ConcurrentHashMap;
-import java.security.Principal;
-import java.lang.ref.WeakReference;
-import static org.apache.qpid.util.Serial.*;
-import com.sun.security.auth.UserPrincipal;
-
-public class ServerSession extends Session implements PrincipalHolder
+public class ServerSession extends Session implements PrincipalHolder, SessionConfig
{
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
+ private final UUID _id;
+ private ConnectionConfig _connectionConfig;
+
public static interface MessageDispositionChangeListener
{
public void onAccept();
@@ -78,37 +101,41 @@
private final WeakReference<Session> _reference;
-
- ServerSession(Connection connection, Binary name, long expiry)
+ ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
- super(connection, name, expiry);
-
- _transaction = new AutoCommitTransaction(this.getMessageStore());
- _principal = new UserPrincipal(connection.getAuthorizationID());
- _reference = new WeakReference(this);
+ this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
}
- ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
+ public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
{
super(connection, delegate, name, expiry);
+ _connectionConfig = connConfig;
_transaction = new AutoCommitTransaction(this.getMessageStore());
_principal = new UserPrincipal(connection.getAuthorizationID());
_reference = new WeakReference(this);
+ _id = getConfigStore().createId();
+ getConfigStore().addConfiguredObject(this);
+ }
+
+ private ConfigStore getConfigStore()
+ {
+ return getConnectionConfig().getConfigStore();
}
+
@Override
protected boolean isFull(int id)
{
return isCommandsFull(id);
}
- public void enqueue(final ServerMessage message, final ArrayList<AMQQueue> queues)
+ public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
{
_transaction.enqueue(queues,message, new ServerTransaction.Action()
{
- AMQQueue[] _queues = queues.toArray(new AMQQueue[queues.size()]);
+ BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
public void postCommit()
{
@@ -243,7 +270,7 @@
Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
Iterator<Range> rangeIter = ranges.iterator();
- if(rangeIter.hasNext())
+ if(rangeIter.hasNext())
{
Range range = rangeIter.next();
@@ -290,6 +317,8 @@
}
_messageDispositionListenerMap.clear();
+ getConfigStore().removeConfiguredObject(this);
+
for (Task task : _taskList)
{
task.doTask(this);
@@ -391,8 +420,61 @@
public MessageStore getMessageStore()
{
- return ((ServerConnection)getConnection()).getVirtualHost().getMessageStore();
+ return getVirtualHost().getMessageStore();
}
+ public VirtualHost getVirtualHost()
+ {
+ return (VirtualHost) _connectionConfig.getVirtualHost();
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public SessionConfigType getConfigType()
+ {
+ return SessionConfigType.getInstance();
+ }
+ public ConfiguredObject getParent()
+ {
+ return getVirtualHost();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public boolean isAttached()
+ {
+ return true;
+ }
+
+ public long getDetachedLifespan()
+ {
+ return 0;
+ }
+
+ public Long getExpiryTime()
+ {
+ return null;
+ }
+
+ public Long getMaxClientRate()
+ {
+ return null;
+ }
+
+ public ConnectionConfig getConnectionConfig()
+ {
+ return _connectionConfig;
+ }
+
+ public String getSessionName()
+ {
+ return getName().toString();
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Sun Jan 31 00:31:49 2010
@@ -28,6 +28,7 @@
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.MessageMetaData_0_10;
import org.apache.qpid.server.subscription.Subscription_0_10;
@@ -42,6 +43,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
+import java.nio.ByteBuffer;
public class ServerSessionDelegate extends SessionDelegate
{
@@ -218,11 +220,15 @@
MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
final MessageStore store = getVirtualHost(ssn).getMessageStore();
StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
- storeMessage.addContent(0,xfr.getBody());
+ ByteBuffer body = xfr.getBody();
+ if(body != null)
+ {
+ storeMessage.addContent(0, body);
+ }
storeMessage.flushToStore();
MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
- ArrayList<AMQQueue> queues = exchange.route(message);
+ ArrayList<? extends BaseQueue> queues = exchange.route(message);
@@ -355,7 +361,7 @@
else
{
// TODO - check exchange has same properties
- if(!exchange.getType().toString().equals(method.getType()))
+ if(!exchange.getTypeShortString().toString().equals(method.getType()))
{
exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
}
@@ -419,7 +425,7 @@
}
else
{
- if(!exchange.getType().toString().equals(method.getType()))
+ if(!exchange.getTypeShortString().toString().equals(method.getType()))
{
exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
}
@@ -525,7 +531,7 @@
if(exchange != null)
{
result.setDurable(exchange.isDurable());
- result.setType(exchange.getType().toString());
+ result.setType(exchange.getTypeShortString().toString());
result.setNotFound(false);
}
else
@@ -582,30 +588,23 @@
+ "' to Queue: '" + method.getQueue()
+ "' not allowed");
}
- else if(exchange.getType().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
+ else if(exchange.getTypeShortString().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
{
exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header");
}
else
{
- try
- {
- AMQShortString routingKey = new AMQShortString(method.getBindingKey());
- FieldTable fieldTable = FieldTable.convertToFieldTable(method.getArguments());
+ AMQShortString routingKey = new AMQShortString(method.getBindingKey());
+ FieldTable fieldTable = FieldTable.convertToFieldTable(method.getArguments());
- if (!exchange.isBound(routingKey, fieldTable, queue))
- {
- queue.bind(exchange, routingKey, fieldTable);
+ if (!exchange.isBound(routingKey, fieldTable, queue))
+ {
+ virtualHost.getBindingFactory().addBinding(method.getBindingKey(), queue, exchange, method.getArguments());
- }
- else
- {
- // todo
- }
}
- catch (AMQException e)
+ else
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ // todo
}
}
@@ -649,14 +648,7 @@
}
else
{
- try
- {
- queue.unBind(exchange, new AMQShortString(method.getBindingKey()), null);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
+ virtualHost.getBindingFactory().removeBinding(method.getBindingKey(), queue, exchange, null);
}
}
@@ -827,7 +819,7 @@
{
queue.setDeleteOnNoConsumers(true);
}
-
+
final String alternateExchangeName = method.getAlternateExchange();
if(alternateExchangeName != null && alternateExchangeName.length() != 0)
{
@@ -870,11 +862,12 @@
if (autoRegister)
{
+
ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
- queue.bind(defaultExchange, new AMQShortString(queueName), null);
+ virtualHost.getBindingFactory().addBinding(queueName, queue, defaultExchange, null);
}
@@ -1114,7 +1107,7 @@
if(queue != null)
{
- result.setQueue(queue.getName().toString());
+ result.setQueue(queue.getNameShortString().toString());
result.setDurable(queue.isDurable());
result.setExclusive(queue.isExclusive());
result.setAutoDelete(queue.isAutoDelete());
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Sun Jan 31 00:31:49 2010
@@ -22,6 +22,7 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLog;
@@ -46,7 +47,7 @@
postCommitAction.postCommit();
}
- public void dequeue(AMQQueue queue, EnqueableMessage message, Action postCommitAction)
+ public void dequeue(BaseQueue queue, EnqueableMessage message, Action postCommitAction)
{
try
@@ -105,7 +106,7 @@
}
- public void enqueue(AMQQueue queue, EnqueableMessage message, Action postCommitAction)
+ public void enqueue(BaseQueue queue, EnqueableMessage message, Action postCommitAction)
{
try
{
@@ -128,7 +129,7 @@
}
- public void enqueue(List<AMQQueue> queues, EnqueableMessage message, Action postCommitAction)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postCommitAction)
{
try
{
@@ -137,7 +138,7 @@
{
TransactionLog.Transaction txn = _transactionLog.newTransaction();
Long id = message.getMessageNumber();
- for(AMQQueue q : queues)
+ for(BaseQueue q : queues)
{
if(q.isDurable())
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Sun Jan 31 00:31:49 2010
@@ -2,6 +2,7 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLog;
@@ -28,7 +29,7 @@
_postCommitActions.add(postCommitAction);
}
- public void dequeue(AMQQueue queue, EnqueableMessage message, Action postCommitAction)
+ public void dequeue(BaseQueue queue, EnqueableMessage message, Action postCommitAction)
{
if(message.isPersistent() && queue.isDurable())
{
@@ -113,7 +114,7 @@
}
}
- public void enqueue(AMQQueue queue, EnqueableMessage message, Action postCommitAction)
+ public void enqueue(BaseQueue queue, EnqueableMessage message, Action postCommitAction)
{
if(message.isPersistent() && queue.isDurable())
{
@@ -132,7 +133,7 @@
}
- public void enqueue(List<AMQQueue> queues, EnqueableMessage message, Action postCommitAction)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postCommitAction)
{
@@ -140,7 +141,7 @@
{
if(_transaction == null)
{
- for(AMQQueue queue : queues)
+ for(BaseQueue queue : queues)
{
if(queue.isDurable())
{
@@ -155,7 +156,7 @@
try
{
- for(AMQQueue queue : queues)
+ for(BaseQueue queue : queues)
{
if(queue.isDurable())
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Sun Jan 31 00:31:49 2010
@@ -20,15 +20,12 @@
*/
package org.apache.qpid.server.txn;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
-import java.util.List;
-import java.util.SortedSet;
import java.util.Collection;
+import java.util.List;
public interface ServerTransaction
{
@@ -45,13 +42,13 @@
public void onRollback();
}
- void dequeue(AMQQueue queue, EnqueableMessage message, Action postCommitAction);
+ void dequeue(BaseQueue queue, EnqueableMessage message, Action postCommitAction);
void dequeue(Collection<QueueEntry> ackedMessages, Action postCommitAction);
- void enqueue(AMQQueue queue, EnqueableMessage message, Action postCommitAction);
+ void enqueue(BaseQueue queue, EnqueableMessage message, Action postCommitAction);
- void enqueue(List<AMQQueue> queues, EnqueableMessage message, Action postCommitAction);
+ void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postCommitAction);
void commit();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Sun Jan 31 00:31:49 2010
@@ -21,7 +21,10 @@
package org.apache.qpid.server.virtualhost;
import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfig;
+import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.ExchangeFactory;
@@ -31,8 +34,13 @@
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.binding.BindingFactory;
-public interface VirtualHost
+import java.util.UUID;
+import java.util.TimerTask;
+
+public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig
{
IConnectionRegistry getConnectionRegistry();
@@ -59,4 +67,24 @@
void close() throws Exception;
ManagedObject getManagedObject();
+
+ UUID getBrokerId();
+
+ void scheduleTask(long period, TimerTask task);
+
+
+ IApplicationRegistry getApplicationRegistry();
+
+ BindingFactory getBindingFactory();
+
+ void createBrokerConnection(String transport,
+ String host,
+ int port,
+ String vhost,
+ boolean durable,
+ String authMechanism, String username, String password);
+
+ ConfigStore getConfigStore();
+
+ void removeBrokerConnection(BrokerLink brokerLink);
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Sun Jan 31 00:31:49 2010
@@ -34,10 +34,10 @@
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
@@ -215,7 +215,7 @@
if (queue == null)
{
_logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: "
- + exchange.getName());
+ + exchange.getNameShortString());
}
else
{
@@ -227,10 +227,18 @@
argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit());
}
- _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName
- + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")");
+ BindingFactory bf = _virtualHost.getBindingFactory();
- queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT);
+ Map<String, Object> argumentMap = FieldTable.convertToMap(argumentsFT);
+
+ if(bf.getBinding(bindingKey, queue, exchange, argumentMap) == null)
+ {
+
+ _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queueName
+ + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")");
+
+ bf.restoreBinding(bindingKey, queue, exchange, argumentMap);
+ }
}
}
@@ -271,7 +279,7 @@
if (_logger.isDebugEnabled())
{
- _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getName());
+ _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getNameShortString());
}
Integer count = _queueRecoveries.get(queueName);
@@ -286,7 +294,7 @@
}
else
{
- _logger.warn("Message id " + messageId + " referenced in log as enqueue in queue " + queue.getName() + " is unknwon, entry will be discarded");
+ _logger.warn("Message id " + messageId + " referenced in log as enqueue in queue " + queue.getNameShortString() + " is unknwon, entry will be discarded");
TransactionLog.Transaction txn = _transactionLog.newTransaction();
txn.dequeueMessage(queue, messageId);
txn.commitTranAsync();
@@ -333,7 +341,7 @@
CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1005(entry.getValue(), entry.getKey()));
CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1006(entry.getKey(), true));
- }
+ }
CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1006(null, false));
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Sun Jan 31 00:31:49 2010
@@ -20,19 +20,21 @@
*/
package org.apache.qpid.server.virtualhost;
-import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQBrokerManagerMBean;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.VirtualHostMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.binding.BindingFactory;
+import org.apache.qpid.server.configuration.BrokerConfig;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfigType;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.ConnectionRegistry;
import org.apache.qpid.server.connection.IConnectionRegistry;
@@ -41,6 +43,11 @@
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.federation.BrokerLink;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.VirtualHostMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.AMQQueue;
@@ -48,14 +55,15 @@
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.Accessable;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import javax.management.NotCompliantMBeanException;
import java.util.Collections;
@@ -63,6 +71,9 @@
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
public class VirtualHostImpl implements Accessable, VirtualHost
{
@@ -88,9 +99,17 @@
private ACLManager _accessManager;
- private final Timer _houseKeepingTimer;
+ private final Timer _timer;
+ private final IApplicationRegistry _appRegistry;
private VirtualHostConfiguration _configuration;
private DurableConfigurationStore _durableConfigurationStore;
+ private BindingFactory _bindingFactory;
+ private BrokerConfig _broker;
+ private UUID _id;
+
+
+ private final long _createTime = System.currentTimeMillis();
+ private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
public void setAccessableName(String name)
{
@@ -113,6 +132,26 @@
return _configuration;
}
+ public UUID getId()
+ {
+ return _id; // id obtained from the application registry's ConfigStore (createId()) in the constructor
+ }
+
+ public VirtualHostConfigType getConfigType()
+ {
+ return VirtualHostConfigType.getInstance(); // type descriptor for virtual hosts (presumably a singleton - confirm in VirtualHostConfigType)
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return getBroker(); // the parent of a virtual host is whatever getBroker() currently returns
+ }
+
+ public boolean isDurable()
+ {
+ return false; // hard-coded: this implementation never reports itself as durable
+ }
+
/**
* Abstract MBean class. This has some of the methods implemented from management interface for exchanges. Any
* implementation of an Exchange MBean should extend this class.
@@ -141,23 +180,28 @@
} // End of MBean class
- /**
- * Normal Constructor
- *
- * @param hostConfig
- *
- * @throws Exception
- */
- public VirtualHostImpl(VirtualHostConfiguration hostConfig) throws Exception
+
+
+ public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
{
- this(hostConfig, null);
+ this(appRegistry, hostConfig, null);
}
+
public VirtualHostImpl(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
{
+ this(ApplicationRegistry.getInstance(),hostConfig,store);
+ }
+
+ private VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
+ {
+ _appRegistry = appRegistry;
+ _broker = appRegistry.getBroker();
_configuration = hostConfig;
_name = hostConfig.getName();
+ _id = appRegistry.getConfigStore().createId();
+
CurrentActor.get().message(VirtualHostMessages.VHT_CREATED(_name));
if (_name == null || _name.length() == 0)
@@ -169,7 +213,7 @@
_connectionRegistry = new ConnectionRegistry(this);
- _houseKeepingTimer = new Timer("Queue-housekeeping-" + _name, true);
+ _timer = new Timer("TimerThread-" + _name + ":", true);
_queueRegistry = new DefaultQueueRegistry(this);
@@ -178,6 +222,7 @@
_exchangeRegistry = new DefaultExchangeRegistry(this);
+
//Create a temporary RT to store the durable entries from the config file
// so we can replay them in to the real _RT after it has been loaded.
/// This should be removed after the _RT has been fully split from the TL
@@ -189,6 +234,8 @@
// This needs to be after the RT has been defined as it creates the default durable exchanges.
_exchangeRegistry.initialise();
+ _bindingFactory = new BindingFactory(this);
+
initialiseModel(hostConfig);
if (store != null)
@@ -205,9 +252,11 @@
initialiseMessageStore(hostConfig);
}
+
+
//Now that the RT has been initialised loop through the persistent queues/exchanges created from the config
// file and write them in to the new routing Table.
- for (StartupRoutingTable.CreateQueueTuple cqt : configFileRT.queue)
+/* for (StartupRoutingTable.CreateQueueTuple cqt : configFileRT.queue)
{
getDurableConfigurationStore().createQueue(cqt.queue, cqt.arguments);
}
@@ -220,7 +269,7 @@
for (StartupRoutingTable.CreateBindingTuple cbt : configFileRT.bindings)
{
getDurableConfigurationStore().bindQueue(cbt.exchange, cbt.routingKey, cbt.queue, cbt.arguments);
- }
+ }*/
_authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, hostConfig);
@@ -250,7 +299,7 @@
}
catch (Exception e)
{
- _logger.error("Exception in housekeeping for queue: " + q.getName().toString(), e);
+ _logger.error("Exception in housekeeping for queue: " + q.getNameShortString().toString(), e);
//Don't throw exceptions as this will stop the
// house keeping task from running.
}
@@ -258,9 +307,8 @@
}
}
- _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
- period / 2,
- period);
+ final TimerTask expiredMessagesTask = new RemoveExpiredMessagesTask();
+ scheduleTask(period, expiredMessagesTask);
class ForceChannelClosuresTask extends TimerTask
{
@@ -272,6 +320,12 @@
}
}
+ public void scheduleTask(final long period, final TimerTask task) // runs task repeatedly on this virtual host's shared Timer
+ {
+ _timer.scheduleAtFixedRate(task, period / 2, period); // first run after half a period, then fixed-rate every period (java.util.Timer takes milliseconds)
+ }
+
+
private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception
{
String messageStoreClass = hostConfig.getMessageStoreClass();
@@ -377,7 +431,7 @@
List routingKeys = queueConfiguration.getRoutingKeys();
if (routingKeys == null || routingKeys.isEmpty())
{
- routingKeys = Collections.singletonList(queue.getName());
+ routingKeys = Collections.singletonList(queue.getNameShortString());
}
for (Object routingKeyNameObj : routingKeys)
@@ -387,12 +441,12 @@
{
_logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this);
}
- queue.bind(exchange, routingKey, null);
+ _bindingFactory.addBinding(routingKey.toString(), queue, exchange, null);
}
if (exchange != _exchangeRegistry.getDefaultExchange())
{
- queue.bind(_exchangeRegistry.getDefaultExchange(), queue.getName(), null);
+ _bindingFactory.addBinding(queue.getNameShortString().toString(), queue, exchange, null);
}
}
@@ -401,6 +455,26 @@
return _name;
}
+ public BrokerConfig getBroker()
+ {
+ return _broker; // initialised from appRegistry.getBroker() in the constructor; may be replaced via setBroker()
+ }
+
+ public String getFederationTag()
+ {
+ return _broker.getFederationTag(); // delegates to the broker; NOTE(review): throws NullPointerException if _broker is null - confirm it is always set
+ }
+
+ public void setBroker(final BrokerConfig broker)
+ {
+ _broker = broker; // overrides the broker captured in the constructor; no null check is performed
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime; // System.currentTimeMillis() captured by the field initialiser when this instance was created
+ }
+
public QueueRegistry getQueueRegistry()
{
return _queueRegistry;
@@ -457,9 +531,9 @@
}
//Stop Housekeeping
- if (_houseKeepingTimer != null)
+ if (_timer != null)
{
- _houseKeepingTimer.cancel();
+ _timer.cancel();
}
//Close MessageStore
@@ -481,6 +555,59 @@
return _virtualHostMBean;
}
+ public UUID getBrokerId()
+ {
+ return _appRegistry.getBrokerId(); // read from the application registry, not from the _broker field
+ }
+
+ public IApplicationRegistry getApplicationRegistry()
+ {
+ return _appRegistry; // the registry handed to the constructor (ApplicationRegistry.getInstance() for the two-argument constructor)
+ }
+
+ public BindingFactory getBindingFactory()
+ {
+ return _bindingFactory; // created in the constructor right after the exchange registry is initialised
+ }
+
+    /**
+     * Creates a link to another broker and registers it with this virtual host.
+     *
+     * The new BrokerLink is recorded in the _links map, keyed by itself. It is only
+     * published to the ConfigStore when no equal link was already registered, so a
+     * repeated request for the same link no longer adds a second, untracked object
+     * to the store.
+     *
+     * NOTE(review): duplicate detection relies on BrokerLink.equals()/hashCode();
+     * which fields take part in equality is not visible here - confirm.
+     * NOTE(review): if the BrokerLink constructor acquires resources, the rejected
+     * duplicate instance may additionally need to be closed - confirm.
+     *
+     * @param transport     transport name handed to the BrokerLink constructor
+     * @param host          remote host handed to the BrokerLink constructor
+     * @param port          remote port handed to the BrokerLink constructor
+     * @param vhost         remote virtual host name handed to the BrokerLink constructor
+     * @param durable       durability flag handed to the BrokerLink constructor
+     * @param authMechanism authentication mechanism handed to the BrokerLink constructor
+     * @param username      user name handed to the BrokerLink constructor
+     * @param password      password handed to the BrokerLink constructor
+     */
+    public void createBrokerConnection(final String transport,
+                                       final String host,
+                                       final int port,
+                                       final String vhost,
+                                       final boolean durable,
+                                       final String authMechanism,
+                                       final String username,
+                                       final String password)
+    {
+        BrokerLink blink = new BrokerLink(this, transport, host, port, vhost, durable, authMechanism, username, password);
+        // putIfAbsent returns null only when this link was newly registered
+        if(_links.putIfAbsent(blink,blink) == null)
+        {
+            getConfigStore().addConfiguredObject(blink);
+        }
+    }
+
+ public void removeBrokerConnection(final String transport,
+ final String host,
+ final int port,
+ final String vhost)
+ {
+ removeBrokerConnection(new BrokerLink(this, transport, host, port, vhost, false, null,null,null));
+
+ }
+
+    /**
+     * Closes and unregisters the broker link equal to the one supplied.
+     *
+     * The registered instance is taken out of the _links map in one atomic step
+     * (ConcurrentHashMap.remove) before it is closed. Previously the link was only
+     * looked up, so the closed instance stayed in the map: a later
+     * createBrokerConnection for the same link found the stale entry, and a second
+     * removal closed the same link again.
+     *
+     * Does nothing when no equal link is registered.
+     *
+     * @param blink a link equal to the one to remove; need not be the registered instance
+     */
+    public void removeBrokerConnection(BrokerLink blink)
+    {
+        // remove() returns the registered instance, or null if there was none
+        blink = _links.remove(blink);
+        if(blink != null)
+        {
+            blink.close();
+            getConfigStore().removeConfiguredObject(blink);
+        }
+    }
+
+ public ConfigStore getConfigStore()
+ {
+ return getApplicationRegistry().getConfigStore(); // delegates to the application registry's ConfigStore
+ }
+
/**
* Temporary Startup RT class to record the creation of persistent queues / exchanges.
*
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java Sun Jan 31 00:31:49 2010
@@ -21,6 +21,7 @@
package org.apache.qpid.server.virtualhost;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.configuration.ConfigStore;
import java.util.ArrayList;
import java.util.Collection;
@@ -52,7 +53,7 @@
public VirtualHost getVirtualHost(String name)
{
- if(name == null || name.trim().length() == 0 )
+ if(name == null || name.trim().length() == 0 || "/".equals(name.trim()))
{
name = getDefaultVirtualHostName();
}
@@ -60,6 +61,11 @@
return _registry.get(name);
}
+ public VirtualHost getDefaultVirtualHost()
+ {
+ return getVirtualHost(getDefaultVirtualHostName()); // looks up the host registered under the configured default name
+ }
+
private String getDefaultVirtualHostName()
{
return _defaultVirtualHostName;
@@ -80,4 +86,9 @@
{
return _applicationRegistry;
}
+
+ public ConfigStore getConfigStore()
+ {
+ return _applicationRegistry.getConfigStore(); // delegates to the application registry held by this virtual host registry
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java Sun Jan 31 00:31:49 2010
@@ -219,7 +219,7 @@
_console.println("");
- _console.println(BOILER_PLATE);
+ _console.println(BOILER_PLATE);
runCLI();
}
@@ -495,13 +495,13 @@
if (_exchange != null)
{
status.append("[");
- status.append(_exchange.getName());
+ status.append(_exchange.getNameShortString());
status.append("]");
if (_queue != null)
{
status.append("->'");
- status.append(_queue.getName());
+ status.append(_queue.getNameShortString());
status.append("'");
if (_msgids != null)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org