You are viewing a plain text version of this content. The canonical link to the original is not included in this plain text copy.
Posted to commits@qpid.apache.org by rg...@apache.org on 2010/01/31 01:31:57 UTC

svn commit: r904934 [8/11] - in /qpid/trunk/qpid/java: broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/ broker/src/main/java/org/apache/qpid/qmf/ br...

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java Sun Jan 31 00:31:49 2010
@@ -42,7 +42,7 @@
     private static final Object CREATE_QUEUES_KEY = new Object();
     private static final Object CREATE_EXCHANGES_KEY = new Object();
 
-    
+
     private static final Object CREATE_QUEUE_TEMPORARY_KEY = new Object();
     private static final Object CREATE_QUEUE_QUEUES_KEY = new Object();
     private static final Object CREATE_QUEUE_EXCHANGES_KEY = new Object();
@@ -64,9 +64,9 @@
     }
 
     /**
-     * 
+     *
      * @param permission the type of permission to check
-     * 
+     *
      * @param parameters vararg depending on what permission was passed in
      *  ACCESS: none
      *  BIND: none
@@ -113,7 +113,7 @@
     {
         _fullVHostAccess = true;
     }
-    
+
 	private void grantPublish(Permission permission, Object... parameters) {
 		Map publishRights = (Map) _permissions.get(permission);
 
@@ -344,9 +344,9 @@
     }
 
     /**
-     * 
+     *
      * @param permission the type of permission to check
-     * 
+     *
      * @param parameters vararg depending on what permission was passed in
      *  ACCESS: none
      *  BIND: QueueBindBody bindmethod, Exchange exchange, AMQQueue queue, AMQShortString routingKey
@@ -363,7 +363,7 @@
 
         switch (permission)
         {
-            case ACCESS://No Parameters 
+            case ACCESS://No Parameters
                 return AuthzResult.ALLOWED; // The existence of this user-specific PP infers some level of access is authorised
             case BIND: // Parameters : QueueBindMethod , Exchange , AMQQueue, AMQShortString routingKey
                 return authoriseBind(parameters);
@@ -444,7 +444,7 @@
                 {
                     if ( new AMQShortString(queue.getPrincipalHolder().getPrincipal().getName()).equals(_user))
                     {
-                        return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+                        return (queues.size() == 0 || queues.contains(queue.getNameShortString())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
                     }
                     else
                     {
@@ -453,7 +453,7 @@
                 }
 
                 // If we are
-                return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+                return (queues.size() == 0 || queues.contains(queue.getNameShortString())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
             }
         }
 
@@ -486,7 +486,7 @@
 		// Otherwise exchange must be listed in the white list
 
 		// If the map doesn't have the exchange then it isn't allowed
-		if (!exchanges.containsKey(((Exchange) parameters[0]).getName()))
+		if (!exchanges.containsKey(((Exchange) parameters[0]).getNameShortString()))
 		{
 		    return AuthzResult.DENIED;
 		}
@@ -494,7 +494,7 @@
 		{
 
 		    // Get valid routing keys
-		    HashSet routingKeys = (HashSet) exchanges.get(((Exchange)parameters[0]).getName());
+		    HashSet routingKeys = (HashSet) exchanges.get(((Exchange)parameters[0]).getNameShortString());
 
 		    // Having no routingKeys in the map then all are allowed.
 		    if (routingKeys == null)
@@ -544,7 +544,7 @@
 		// check the valid exchanges
 		if (rights == null || rights.containsKey(exchangeName))
 		{
-		    return AuthzResult.ALLOWED; 
+		    return AuthzResult.ALLOWED;
 		}
 		else
 		{
@@ -587,13 +587,13 @@
 		    // If there is a white list then check
 		    if (create_queues_queues == null || create_queues_queues.containsKey(queueName))
 		    {
-		        return AuthzResult.ALLOWED; 
+		        return AuthzResult.ALLOWED;
 		    }
 		    else
 		    {
 		        return AuthzResult.DENIED;
 		    }
-		        
+
 		}
 	}
 
@@ -604,7 +604,7 @@
             //user has been granted full access to the vhost
             return AuthzResult.ALLOWED;
         }
-	    
+
         Exchange exchange = (Exchange) parameters[1];
 
 		AMQQueue bind_queueName = (AMQQueue) parameters[2];
@@ -631,7 +631,7 @@
 		    }
 
 		    // Check to see if we have a white list of routingkeys to check
-		    Map rkeys = (Map) exchangeDetails.get(exchange.getName());
+		    Map rkeys = (Map) exchangeDetails.get(exchange.getNameShortString());
 
 		    // if keys is null then any rkey is allowed on this exchange
 		    if (rkeys == null)

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java (from r891203, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainInitialiser.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainInitialiser.java&r1=891203&r2=904934&rev=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainInitialiser.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java Sun Jan 31 00:31:49 2010
@@ -18,21 +18,22 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.security.auth.sasl.amqplain;
+package org.apache.qpid.server.security.auth.sasl.anonymous;
 
 import javax.security.sasl.SaslServerFactory;
 
 import org.apache.qpid.server.security.auth.sasl.UsernamePasswordInitialiser;
+import org.apache.qpid.server.security.auth.sasl.amqplain.AmqPlainSaslServerFactory;
 
-public class AmqPlainInitialiser extends UsernamePasswordInitialiser
+public class AnonymousInitialiser extends UsernamePasswordInitialiser
 {
     public String getMechanismName()
     {
-        return "AMQPLAIN";
+        return "ANONYMOUS";
     }
 
     public Class<? extends SaslServerFactory> getServerFactoryClassForJCARegistration()
     {
-        return AmqPlainSaslServerFactory.class;
+        return AnonymousSaslServerFactory.class;
     }
-}
+}
\ No newline at end of file

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java (from r891203, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java&r1=891203&r2=904934&rev=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java Sun Jan 31 00:31:49 2010
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.security.auth.sasl.amqplain;
+package org.apache.qpid.server.security.auth.sasl.anonymous;
 
 import java.io.IOException;
 
@@ -36,19 +36,14 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
 
-public class AmqPlainSaslServer implements SaslServer
+public class AnonymousSaslServer implements SaslServer
 {
-    public static final String MECHANISM = "AMQPLAIN";
-
-    private CallbackHandler _cbh;
-
-    private String _authorizationId;
+    public static final String MECHANISM = "ANONYMOUS";
 
     private boolean _complete = false;
 
-    public AmqPlainSaslServer(CallbackHandler cbh)
+    public AnonymousSaslServer()
     {
-        _cbh = cbh;
     }
 
     public String getMechanismName()
@@ -58,46 +53,8 @@
 
     public byte[] evaluateResponse(byte[] response) throws SaslException
     {
-        try
-        {
-            final FieldTable ft = FieldTableFactory.newFieldTable(ByteBuffer.wrap(response), response.length);
-            String username = (String) ft.getString("LOGIN");
-            // we do not care about the prompt but it throws if null
-            NameCallback nameCb = new NameCallback("prompt", username);
-            // we do not care about the prompt but it throws if null
-            PasswordCallback passwordCb = new PasswordCallback("prompt", false);
-            // TODO: should not get pwd as a String but as a char array...
-            String pwd = (String) ft.getString("PASSWORD");
-            AuthorizeCallback authzCb = new AuthorizeCallback(username, username);
-            Callback[] callbacks = new Callback[]{nameCb, passwordCb, authzCb};
-            _cbh.handle(callbacks);
-            String storedPwd = new String(passwordCb.getPassword());
-            if (storedPwd.equals(pwd))
-            {
-                _complete = true;
-            }
-            if (authzCb.isAuthorized() && _complete)
-            {
-                _authorizationId = authzCb.getAuthenticationID();
-                return null;
-            }
-            else
-            {
-                throw new SaslException("Authentication failed");
-            }
-        }
-        catch (AMQFrameDecodingException e)
-        {
-            throw new SaslException("Unable to decode response: " + e, e);
-        }
-        catch (IOException e)
-        {
-            throw new SaslException("Error processing data: " + e, e);
-        }
-        catch (UnsupportedCallbackException e)
-        {
-            throw new SaslException("Unable to obtain data from callback handler: " + e, e);
-        }
+        _complete = true;
+        return null;
     }
 
     public boolean isComplete()
@@ -107,7 +64,7 @@
 
     public String getAuthorizationID()
     {
-        return _authorizationId;
+        return null;
     }
 
     public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException
@@ -127,6 +84,5 @@
 
     public void dispose() throws SaslException
     {
-        _cbh = null;
     }
-}
+}
\ No newline at end of file

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java (from r891203, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java&r1=891203&r2=904934&rev=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java Sun Jan 31 00:31:49 2010
@@ -18,7 +18,9 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.security.auth.sasl.amqplain;
+package org.apache.qpid.server.security.auth.sasl.anonymous;
+
+import org.apache.qpid.server.security.auth.sasl.amqplain.AmqPlainSaslServer;
 
 import java.util.Map;
 
@@ -28,14 +30,14 @@
 import javax.security.sasl.SaslServer;
 import javax.security.sasl.SaslServerFactory;
 
-public class AmqPlainSaslServerFactory implements SaslServerFactory
+public class AnonymousSaslServerFactory implements SaslServerFactory
 {
     public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map props,
                                        CallbackHandler cbh) throws SaslException
     {
-        if (AmqPlainSaslServer.MECHANISM.equals(mechanism))
+        if (AnonymousSaslServer.MECHANISM.equals(mechanism))
         {
-            return new AmqPlainSaslServer(cbh);
+            return new AnonymousSaslServer();
         }
         else
         {
@@ -47,14 +49,15 @@
     {
         if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
             props.containsKey(Sasl.POLICY_NODICTIONARY) ||
-            props.containsKey(Sasl.POLICY_NOACTIVE))
+            props.containsKey(Sasl.POLICY_NOACTIVE) ||
+            props.containsKey(Sasl.POLICY_NOANONYMOUS))
         {
             // returned array must be non null according to interface documentation
             return new String[0];
         }
         else
         {
-            return new String[]{AmqPlainSaslServer.MECHANISM};
+            return new String[]{AnonymousSaslServer.MECHANISM};
         }
     }
-}
+}
\ No newline at end of file

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Sun Jan 31 00:31:49 2010
@@ -20,22 +20,24 @@
 */
 package org.apache.qpid.server.store;
 
+import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
 import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
 import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.commons.configuration.Configuration;
-
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
 import java.sql.Blob;
 import java.sql.Connection;
 import java.sql.Driver;
@@ -49,8 +51,6 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.lang.ref.WeakReference;
-import java.nio.ByteBuffer;
 
 
 public class DerbyMessageStore implements MessageStore
@@ -590,7 +590,10 @@
                     conn = newConnection();
 
                     PreparedStatement stmt = conn.prepareStatement(FIND_EXCHANGE);
-                    stmt.setString(1, exchange.getName().toString());
+                    stmt.setString(1, exchange.getNameShortString().toString());
+                    stmt.execute();
+                    stmt.close();
+                    conn.commit();
 
                     ResultSet rs = stmt.executeQuery();
 
@@ -617,7 +620,7 @@
             }
             catch (SQLException e)
             {
-                throw new AMQException("Error writing Exchange with name " + exchange.getName() + " to database: " + e, e);
+                throw new AMQException("Error writing Exchange with name " + exchange.getNameShortString() + " to database: " + e, e);
             }
         }
 
@@ -631,11 +634,11 @@
         {
             conn = newConnection();
             PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
-            stmt.setString(1, exchange.getName().toString());
+            stmt.setString(1, exchange.getNameShortString().toString());
             int results = stmt.executeUpdate();
             if(results == 0)
             {
-                throw new AMQException("Exchange " + exchange.getName() + " not found");
+                throw new AMQException("Exchange " + exchange.getNameShortString() + " not found");
             }
             else
             {
@@ -645,7 +648,7 @@
         }
         catch (SQLException e)
         {
-            throw new AMQException("Error writing deleting with name " + exchange.getName() + " from database: " + e, e);
+            throw new AMQException("Error writing deleting with name " + exchange.getNameShortString() + " from database: " + e, e);
         }
         finally
         {
@@ -677,8 +680,8 @@
                 conn = newConnection();
 
                 PreparedStatement stmt = conn.prepareStatement(FIND_BINDING);
-                stmt.setString(1, exchange.getName().toString() );
-                stmt.setString(2, queue.getName().toString());
+                stmt.setString(1, exchange.getNameShortString().toString() );
+                stmt.setString(2, queue.getNameShortString().toString());
                 stmt.setString(3, routingKey == null ? null : routingKey.toString());
 
                 ResultSet rs = stmt.executeQuery();
@@ -687,8 +690,8 @@
                 if (!rs.next())
                 {
                     stmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
-                    stmt.setString(1, exchange.getName().toString() );
-                    stmt.setString(2, queue.getName().toString());
+                    stmt.setString(1, exchange.getNameShortString().toString() );
+                    stmt.setString(2, queue.getNameShortString().toString());
                     stmt.setString(3, routingKey == null ? null : routingKey.toString());
                     if(args != null)
                     {
@@ -713,8 +716,8 @@
             }
             catch (SQLException e)
             {
-                throw new AMQException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange "
-                    + exchange.getName() + " to database: " + e, e);
+                throw new AMQException("Error writing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
+                    + exchange.getNameShortString() + " to database: " + e, e);
             }
             finally
             {
@@ -748,23 +751,23 @@
             conn = newConnection();
             // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
             PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
-            stmt.setString(1, exchange.getName().toString() );
-            stmt.setString(2, queue.getName().toString());
+            stmt.setString(1, exchange.getNameShortString().toString() );
+            stmt.setString(2, queue.getNameShortString().toString());
             stmt.setString(3, routingKey == null ? null : routingKey.toString());
 
 
             if(stmt.executeUpdate() != 1)
             {
-                 throw new AMQException("Queue binding for queue with name " + queue.getName() + " to exchange "
-                + exchange.getName() + "  not found");
+                 throw new AMQException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange "
+                + exchange.getNameShortString() + "  not found");
             }
             conn.commit();
             stmt.close();
         }
         catch (SQLException e)
         {
-            throw new AMQException("Error removing binding for AMQQueue with name " + queue.getName() + " to exchange "
-                + exchange.getName() + " in database: " + e, e);
+            throw new AMQException("Error removing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
+                + exchange.getNameShortString() + " in database: " + e, e);
         }
         finally
         {
@@ -801,7 +804,7 @@
                 Connection conn = newConnection();
 
                 PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
-                stmt.setString(1, queue.getName().toString());
+                stmt.setString(1, queue.getNameShortString().toString());
 
                 ResultSet rs = stmt.executeQuery();
 
@@ -816,7 +819,7 @@
                                      ? null
                                      : queue.getPrincipalHolder().getPrincipal().getName();
 
-                    stmt.setString(1, queue.getName().toString());
+                    stmt.setString(1, queue.getNameShortString().toString());
                     stmt.setString(2, owner);
 
                     stmt.execute();
@@ -830,7 +833,7 @@
             }
             catch (SQLException e)
             {
-                throw new AMQException("Error writing AMQQueue with name " + queue.getName() + " to database: " + e, e);
+                throw new AMQException("Error writing AMQQueue with name " + queue.getNameShortString() + " to database: " + e, e);
             }
         }
     }
@@ -843,7 +846,7 @@
 
     public void removeQueue(final AMQQueue queue) throws AMQException
     {
-        AMQShortString name = queue.getName();
+        AMQShortString name = queue.getNameShortString();
         _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
         Connection conn = null;
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Sun Jan 31 00:31:49 2010
@@ -31,6 +31,11 @@
 public interface DurableConfigurationStore
 {
 
+    public static interface Source
+    {
+        DurableConfigurationStore getDurableConfigurationStore();
+    }
+
     /**
      * Called after instantiation in order to configure the message store. A particular implementation can define
      * whatever parameters it wants.

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Sun Jan 31 00:31:49 2010
@@ -20,41 +20,49 @@
  */
 package org.apache.qpid.server.subscription;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.Map;
-
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.configuration.SubscriptionConfig;
+import org.apache.qpid.server.configuration.SubscriptionConfigType;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.SubscriptionActor;
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
  * that was given out by the broker and the channel id. <p/>
  */
-public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener
+public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener,
+                                                  SubscriptionConfig
 {
 
     private StateListener _stateListener = new StateListener()
@@ -85,6 +93,7 @@
     private final long _subscriptionID = idGenerator.getAndIncrement();
     private LogSubject _logSubject;
     private LogActor _logActor;
+    private UUID _id;
 
 
     static final class BrowserSubscription extends SubscriptionImpl
@@ -152,6 +161,12 @@
             return false;
         }
 
+        @Override
+        public boolean isExplicitAcknowledge()
+        {
+            return false;
+        }
+
         /**
          * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
          * thread safe.
@@ -318,9 +333,12 @@
             _autoClose = false;
         }
 
-
     }
 
+    public ConfigStore getConfigStore()
+    {
+        return getQueue().getConfigStore();
+    }
 
 
     public synchronized void setQueue(AMQQueue queue, boolean exclusive)
@@ -331,6 +349,9 @@
         }
         _queue = queue;
 
+        _id = getConfigStore().createId();
+        getConfigStore().addConfiguredObject(this);
+
         _logSubject = new SubscriptionLogSubject(this);
         _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
 
@@ -414,7 +435,7 @@
     public boolean hasInterest(QueueEntry entry)
     {
 
-        
+
 
 
         //check that the message hasn't been rejected
@@ -505,7 +526,7 @@
         {
             _stateChangeLock.unlock();
         }
-
+        getConfigStore().removeConfiguredObject(this);
 
         //Log Subscription closed
         CurrentActor.get().message(_logSubject, SubscriptionMessages.SUB_CLOSE());
@@ -689,4 +710,60 @@
     }
 
     abstract boolean isBrowser();
+
+    public String getCreditMode()
+    {
+        return "WINDOW";
+    }
+
+    public SessionConfig getSessionConfig()
+    {
+        return getChannel();
+    }
+
+    public boolean isBrowsing()
+    {
+        return isBrowser();
+    }
+
+    public boolean isExplicitAcknowledge()
+    {
+        return true;
+    }
+
+    public UUID getId()
+    {
+        return _id;
+    }
+
+    public boolean isDurable()
+    {
+        return false;
+    }
+
+    public SubscriptionConfigType getConfigType()
+    {
+        return SubscriptionConfigType.getInstance();
+    }
+
+    public boolean isExclusive()
+    {
+        return getQueue().hasExclusiveSubscriber();
+    }
+
+    public ConfiguredObject getParent()
+    {
+        return getSessionConfig();
+    }
+
+    public String getName()
+    {
+        return String.valueOf(_consumerTag);
+    }
+
+    public Map<String, Object> getArguments()
+    {
+        return null;
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Sun Jan 31 00:31:49 2010
@@ -36,11 +36,12 @@
 import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderProperties;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQTypedValue;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.transport.*;
 
@@ -50,12 +51,10 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
+import java.util.*;
 import java.nio.ByteBuffer;
 
-public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener
+public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig
 {
 
     private static final AtomicLong idGenerator = new AtomicLong(0);
@@ -98,6 +97,9 @@
     private LogSubject _logSubject;
     private LogActor _logActor;
     private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+    private UUID _id;
+    private String _traceExclude;
+    private String _trace;
 
 
     public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
@@ -116,7 +118,6 @@
         _creditManager.addStateListener(this);
         _state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED);
 
-
     }
 
     public void setNoLocal(boolean noLocal)
@@ -146,6 +147,11 @@
             throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
         }
         _queue = queue;
+        Map<String, Object> arguments = queue.getArguments() == null ? Collections.EMPTY_MAP : queue.getArguments();
+        _traceExclude = (String) arguments.get("qpid.trace.exclude");
+        _trace = (String) arguments.get("qpid.trace.id");
+        _id = getConfigStore().createId();
+        getConfigStore().addConfiguredObject(this);
         _logSubject = new SubscriptionLogSubject(this);
         _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
 
@@ -235,6 +241,7 @@
                 }
             }
             _creditManager.removeListener(this);
+            getConfigStore().removeConfiguredObject(this);
         }
         finally
         {
@@ -245,6 +252,11 @@
 
     }
 
+    public ConfigStore getConfigStore()
+    {
+        return getQueue().getConfigStore();
+    }
+
     public void creditStateChanged(boolean hasCredit)
     {
 
@@ -290,6 +302,9 @@
 
         MessageTransfer xfr;
 
+        DeliveryProperties deliveryProps;
+        MessageProperties messageProps = null;
+
         if(serverMsg instanceof MessageTransferMessage)
         {
 
@@ -316,11 +331,15 @@
                 }
                 else
                 {
+                    if(header instanceof MessageProperties)
+                    {
+                        messageProps = (MessageProperties) header;
+                    }
                     newHeaders.add(header);
                 }
             }
 
-            DeliveryProperties deliveryProps = new DeliveryProperties();
+            deliveryProps = new DeliveryProperties();
             if(origDeliveryProps != null)
             {
                 if(origDeliveryProps.hasDeliveryMode())
@@ -343,21 +362,33 @@
                 {
                     deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
                 }
+                if(origDeliveryProps.hasTimestamp())
+                {
+                    deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
+                }
+
 
             }
 
             deliveryProps.setRedelivered(entry.isRedelivered());
 
             newHeaders.add(deliveryProps);
+
+            if(_trace != null && messageProps == null)
+            {
+                messageProps = new MessageProperties();
+                newHeaders.add(messageProps);
+            }
+
             Header header = new Header(newHeaders);
 
             xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
         }
-        else
+        else if(serverMsg instanceof AMQMessage)
         {
             AMQMessage message_0_8 = (AMQMessage) serverMsg;
-            DeliveryProperties deliveryProps = new DeliveryProperties();
-            MessageProperties messageProps = new MessageProperties();
+            deliveryProps = new DeliveryProperties();
+            messageProps = new MessageProperties();
 
             int size = (int) message_0_8.getSize();
             ByteBuffer body = ByteBuffer.allocate(size);
@@ -399,9 +430,52 @@
                 messageProps.setUserId(properties.getUserId().getBytes());
             }
 
+            FieldTable fieldTable = properties.getHeaders();
+
+            final Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
+
+
+            messageProps.setApplicationHeaders(appHeaders);
+
+            Header header = new Header(headers);
+            xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
+        }
+        else
+        {
+
+            deliveryProps = new DeliveryProperties();
+            messageProps = new MessageProperties();
+
+            int size = (int) serverMsg.getSize();
+            ByteBuffer body = ByteBuffer.allocate(size);
+            serverMsg.getContent(body, 0);
+            body.flip();
+
+            Struct[] headers = new Struct[] { deliveryProps, messageProps };
+
+
+            deliveryProps.setExpiration(serverMsg.getExpiration());
+            deliveryProps.setImmediate(serverMsg.isImmediate());
+            deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
+            deliveryProps.setRedelivered(entry.isRedelivered());
+            deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
+            deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
+
+            messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
+            messageProps.setContentLength(size);
+            messageProps.setContentType(serverMsg.getMessageHeader().getMimeType());
+            if(serverMsg.getMessageHeader().getCorrelationId() != null)
+            {
+                messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
+            }
+
+
+            // TODO - ReplyTo
+
+
             final Map<String, Object> appHeaders = new HashMap<String, Object>();
 
-            properties.getHeaders().processOverElements(
+            /*properties.getHeaders().processOverElements(
                     new FieldTable.FieldTableElementProcessor()
                     {
 
@@ -424,39 +498,98 @@
 
 
             messageProps.setApplicationHeaders(appHeaders);
-
+*/
             Header header = new Header(headers);
             xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
         }
 
-        if(_acceptMode == MessageAcceptMode.NONE)
+        boolean excludeDueToFederation = false;
+
+        if(_trace != null)
         {
-            xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
+            if(!messageProps.hasApplicationHeaders())
+            {
+                messageProps.setApplicationHeaders(new HashMap<String,Object>());
+            }
+            Map<String,Object> appHeaders = messageProps.getApplicationHeaders();
+            String trace = (String) appHeaders.get("x-qpid.trace");
+            if(trace == null)
+            {
+                trace = _trace;
+            }
+            else
+            {
+                if(_traceExclude != null)
+                {
+                    excludeDueToFederation = Arrays.asList(trace.split(",")).contains(_traceExclude);
+                }
+                trace+=","+_trace;
+            }
+            appHeaders.put("x-qpid.trace",trace);
         }
-        else if(_flowMode == MessageFlowMode.WINDOW)
+
+        if(!excludeDueToFederation)
         {
-            xfr.setCompletionListener(new Method.CompletionListener()
-                                        {
-                                            public void onComplete(Method method)
+            if(_acceptMode == MessageAcceptMode.NONE)
+            {
+                xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
+            }
+            else if(_flowMode == MessageFlowMode.WINDOW)
+            {
+                xfr.setCompletionListener(new Method.CompletionListener()
                                             {
-                                                restoreCredit(entry);
-                                            }
-                                        });
-        }
+                                                public void onComplete(Method method)
+                                                {
+                                                    restoreCredit(entry);
+                                                }
+                                            });
+            }
 
 
-        _postIdSettingAction._xfr = xfr;
-        if(_acceptMode == MessageAcceptMode.EXPLICIT)
-        {
-            _postIdSettingAction._action = new ExplicitAcceptDispositionChangeListener(entry, this);
+            _postIdSettingAction._xfr = xfr;
+            if(_acceptMode == MessageAcceptMode.EXPLICIT)
+            {
+                _postIdSettingAction._action = new ExplicitAcceptDispositionChangeListener(entry, this);
+            }
+            else
+            {
+                _postIdSettingAction._action = new ImplicitAcceptDispositionChangeListener(entry, this);
+            }
+
+            _session.sendMessage(xfr, _postIdSettingAction);
+
+            if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+            {
+                forceDequeue(entry, false);
+            }
         }
         else
         {
-            _postIdSettingAction._action = new ImplicitAcceptDispositionChangeListener(entry, this);
+            forceDequeue(entry, _flowMode == MessageFlowMode.WINDOW);
+
         }
+    }
+
+    private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
+    {
+        ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+        txn.dequeue(entry.getQueue(),entry.getMessage(),
+                                new ServerTransaction.Action()
+                            {
+                                public void postCommit()
+                                {
+                                    if(restoreCredit)
+                                    {
+                                        restoreCredit(entry);
+                                    }
+                                    entry.discard();
+                                }
 
-        _session.sendMessage(xfr, _postIdSettingAction);
+                                public void onRollback()
+                                {
 
+                                }
+                            });
     }
 
     void reject(QueueEntry entry)
@@ -660,4 +793,59 @@
     }
 
 
+    public SessionConfig getSessionConfig()
+    {
+        return getSession();
+    }
+
+    public boolean isBrowsing()
+    {
+        return _acquireMode == MessageAcquireMode.NOT_ACQUIRED;
+    }
+
+    public boolean isExclusive()
+    {
+        return getQueue().hasExclusiveSubscriber();
+    }
+
+    public ConfiguredObject getParent()
+    {
+        return getSessionConfig();
+    }
+
+    public boolean isDurable()
+    {
+        return false;
+    }
+
+    public SubscriptionConfigType getConfigType()
+    {
+        return SubscriptionConfigType.getInstance();
+    }
+
+    public boolean isExplicitAcknowledge()
+    {
+        return _acceptMode == MessageAcceptMode.EXPLICIT;
+    }
+
+    public String getCreditMode()
+    {
+        return _flowMode.toString();
+    }
+
+    public UUID getId()
+    {
+        return _id;
+    }
+
+    public String getName()
+    {
+        return _destination;
+    }
+
+    public Map<String, Object> getArguments()
+    {
+        //TODO
+        return Collections.EMPTY_MAP;
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Sun Jan 31 00:31:49 2010
@@ -20,12 +20,20 @@
  */
 package org.apache.qpid.server.transport;
 
+import org.apache.qpid.server.configuration.ConnectionConfig;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.Method;
-import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class ServerConnection extends Connection
 {
+    private ConnectionConfig _config;
+    private Runnable _onOpenTask;
+
+    public ServerConnection()
+    {
+    }
+
     @Override
     protected void invoke(Method method)
     {
@@ -36,6 +44,10 @@
     protected void setState(State state)
     {
         super.setState(state);
+        if(state == State.OPEN && _onOpenTask != null)
+        {
+            _onOpenTask.run();
+        }
     }
 
     @Override
@@ -61,4 +73,19 @@
     {
         _virtualHost = virtualHost;
     }
+
+    public void setConnectionConfig(final ConnectionConfig config)
+    {
+        _config = config;
+    }
+
+    public ConnectionConfig getConfig()
+    {
+        return _config;
+    }
+
+    public void onOpen(final Runnable task)
+    {
+        _onOpenTask = task;
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Sun Jan 31 00:31:49 2010
@@ -40,7 +40,7 @@
     public ServerConnectionDelegate(IApplicationRegistry appRegistry,
                                     String localFQDN)
     {
-        this(Collections.EMPTY_MAP, Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
+        this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
     }
 
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Sun Jan 31 00:31:49 2010
@@ -20,32 +20,55 @@
  */
 package org.apache.qpid.server.transport;
 
-import org.apache.qpid.transport.*;
+import com.sun.security.auth.UserPrincipal;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.ConnectionConfig;
+import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.configuration.SessionConfigType;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.subscription.Subscription_0_10;
-import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
-import org.apache.qpid.server.security.PrincipalHolder;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionDelegate;
+import static org.apache.qpid.util.Serial.gt;
 
-import java.util.*;
+import java.lang.ref.WeakReference;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ConcurrentHashMap;
-import java.security.Principal;
-import java.lang.ref.WeakReference;
 
-import static org.apache.qpid.util.Serial.*;
-import com.sun.security.auth.UserPrincipal;
-
-public class ServerSession extends Session implements PrincipalHolder
+public class ServerSession extends Session implements PrincipalHolder, SessionConfig
 {
     private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
 
+    private final UUID _id;
+    private ConnectionConfig _connectionConfig;
+
     public static interface MessageDispositionChangeListener
     {
         public void onAccept();
@@ -78,37 +101,41 @@
 
     private final WeakReference<Session> _reference;
 
-
-    ServerSession(Connection connection, Binary name, long expiry)
+    ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
     {
-        super(connection, name, expiry);
-
-        _transaction = new AutoCommitTransaction(this.getMessageStore());
-        _principal = new UserPrincipal(connection.getAuthorizationID());
-        _reference = new WeakReference(this);
+        this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
     }
 
-    ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
+    public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
     {
         super(connection, delegate, name, expiry);
+        _connectionConfig = connConfig;        
         _transaction = new AutoCommitTransaction(this.getMessageStore());
         _principal = new UserPrincipal(connection.getAuthorizationID());
         _reference = new WeakReference(this);
+        _id = getConfigStore().createId();
+        getConfigStore().addConfiguredObject(this);
+    }
+
+    private ConfigStore getConfigStore()
+    {
+        return getConnectionConfig().getConfigStore();
     }
 
+
     @Override
     protected boolean isFull(int id)
     {
         return isCommandsFull(id);
     }
 
-    public void enqueue(final ServerMessage message, final ArrayList<AMQQueue> queues)
+    public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
     {
 
             _transaction.enqueue(queues,message, new ServerTransaction.Action()
             {
 
-                AMQQueue[] _queues = queues.toArray(new AMQQueue[queues.size()]);
+                BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
 
                 public void postCommit()
                 {
@@ -243,7 +270,7 @@
             Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
             Iterator<Range> rangeIter = ranges.iterator();
 
-            if(rangeIter.hasNext())                               
+            if(rangeIter.hasNext())
             {
                 Range range = rangeIter.next();
 
@@ -290,6 +317,8 @@
         }
         _messageDispositionListenerMap.clear();
 
+        getConfigStore().removeConfiguredObject(this);
+
         for (Task task : _taskList)
         {
             task.doTask(this);
@@ -391,8 +420,61 @@
 
     public MessageStore getMessageStore()
     {
-        return ((ServerConnection)getConnection()).getVirtualHost().getMessageStore();
+        return getVirtualHost().getMessageStore();
     }
 
+    public VirtualHost getVirtualHost()
+    {
+        return (VirtualHost) _connectionConfig.getVirtualHost();
+    }
+
+    public UUID getId()
+    {
+        return _id;
+    }
+
+    public SessionConfigType getConfigType()
+    {
+        return SessionConfigType.getInstance();
+    }
 
+    public ConfiguredObject getParent()
+    {
+        return getVirtualHost();
+    }
+
+    public boolean isDurable()
+    {
+        return false;
+    }
+
+    public boolean isAttached()
+    {
+        return true;
+    }
+
+    public long getDetachedLifespan()
+    {
+        return 0;
+    }
+
+    public Long getExpiryTime()
+    {
+        return null;
+    }
+
+    public Long getMaxClientRate()
+    {
+        return null;
+    }
+
+    public ConnectionConfig getConnectionConfig()
+    {
+        return _connectionConfig;
+    }
+
+    public String getSessionName()
+    {
+        return getName().toString();
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Sun Jan 31 00:31:49 2010
@@ -28,6 +28,7 @@
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.message.MessageMetaData_0_10;
 import org.apache.qpid.server.subscription.Subscription_0_10;
@@ -42,6 +43,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
+import java.nio.ByteBuffer;
 
 public class ServerSessionDelegate extends SessionDelegate
 {
@@ -218,11 +220,15 @@
         MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
         final MessageStore store = getVirtualHost(ssn).getMessageStore();
         StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
-        storeMessage.addContent(0,xfr.getBody());
+        ByteBuffer body = xfr.getBody();
+        if(body != null)
+        {
+            storeMessage.addContent(0, body);
+        }
         storeMessage.flushToStore();
         MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
 
-        ArrayList<AMQQueue> queues = exchange.route(message);
+        ArrayList<? extends BaseQueue> queues = exchange.route(message);
 
 
 
@@ -355,7 +361,7 @@
             else
             {
                 // TODO - check exchange has same properties
-                if(!exchange.getType().toString().equals(method.getType()))
+                if(!exchange.getTypeShortString().toString().equals(method.getType()))
                 {
                     exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
                 }
@@ -419,7 +425,7 @@
             }
             else
             {
-                if(!exchange.getType().toString().equals(method.getType()))
+                if(!exchange.getTypeShortString().toString().equals(method.getType()))
                 {
                     exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
                 }
@@ -525,7 +531,7 @@
         if(exchange != null)
         {
             result.setDurable(exchange.isDurable());
-            result.setType(exchange.getType().toString());
+            result.setType(exchange.getTypeShortString().toString());
             result.setNotFound(false);
         }
         else
@@ -582,30 +588,23 @@
                                                                            + "' to Queue: '" + method.getQueue()
                                                                            + "' not allowed");
             }
-            else if(exchange.getType().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
+            else if(exchange.getTypeShortString().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
             {
                 exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header");
             }
             else
             {
-                try
-                {
-                    AMQShortString routingKey = new AMQShortString(method.getBindingKey());
-                    FieldTable fieldTable = FieldTable.convertToFieldTable(method.getArguments());
+                AMQShortString routingKey = new AMQShortString(method.getBindingKey());
+                FieldTable fieldTable = FieldTable.convertToFieldTable(method.getArguments());
 
-                    if (!exchange.isBound(routingKey, fieldTable, queue))
-                    {
-                        queue.bind(exchange, routingKey, fieldTable);
+                if (!exchange.isBound(routingKey, fieldTable, queue))
+                {
+                    virtualHost.getBindingFactory().addBinding(method.getBindingKey(), queue, exchange, method.getArguments());
 
-                    }
-                    else
-                    {
-                        // todo
-                    }
                 }
-                catch (AMQException e)
+                else
                 {
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                    // todo
                 }
             }
 
@@ -649,14 +648,7 @@
             }
             else
             {
-                try
-                {
-                    queue.unBind(exchange, new AMQShortString(method.getBindingKey()), null);
-                }
-                catch (AMQException e)
-                {
-                    throw new RuntimeException(e);
-                }
+                virtualHost.getBindingFactory().removeBinding(method.getBindingKey(), queue, exchange, null);
             }
         }
 
@@ -827,7 +819,7 @@
                         {
                             queue.setDeleteOnNoConsumers(true);
                         }
-                        
+
                         final String alternateExchangeName = method.getAlternateExchange();
                         if(alternateExchangeName != null && alternateExchangeName.length() != 0)
                         {
@@ -870,11 +862,12 @@
 
                         if (autoRegister)
                         {
+
                             ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
 
                             Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
 
-                            queue.bind(defaultExchange, new AMQShortString(queueName), null);
+                            virtualHost.getBindingFactory().addBinding(queueName, queue, defaultExchange, null);
 
                         }
 
@@ -1114,7 +1107,7 @@
 
         if(queue != null)
         {
-            result.setQueue(queue.getName().toString());
+            result.setQueue(queue.getNameShortString().toString());
             result.setDurable(queue.isDurable());
             result.setExclusive(queue.isExclusive());
             result.setAutoDelete(queue.isAutoDelete());

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Sun Jan 31 00:31:49 2010
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.TransactionLog;
@@ -46,7 +47,7 @@
         postCommitAction.postCommit();
     }
 
-    public void dequeue(AMQQueue queue, EnqueableMessage message, Action postCommitAction)
+    public void dequeue(BaseQueue queue, EnqueableMessage message, Action postCommitAction)
     {
 
         try
@@ -105,7 +106,7 @@
     }
 
 
-    public void enqueue(AMQQueue queue, EnqueableMessage message, Action postCommitAction)
+    public void enqueue(BaseQueue queue, EnqueableMessage message, Action postCommitAction)
     {
         try
         {
@@ -128,7 +129,7 @@
 
     }
 
-    public void enqueue(List<AMQQueue> queues, EnqueableMessage message, Action postCommitAction)
+    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postCommitAction)
     {
         try
         {
@@ -137,7 +138,7 @@
             {
                 TransactionLog.Transaction txn = _transactionLog.newTransaction();
                 Long id = message.getMessageNumber();
-                for(AMQQueue q : queues)
+                for(BaseQueue q : queues)
                 {
                     if(q.isDurable())
                     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Sun Jan 31 00:31:49 2010
@@ -2,6 +2,7 @@
 
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.TransactionLog;
@@ -28,7 +29,7 @@
         _postCommitActions.add(postCommitAction);
     }
 
-    public void dequeue(AMQQueue queue, EnqueableMessage message, Action postCommitAction)
+    public void dequeue(BaseQueue queue, EnqueableMessage message, Action postCommitAction)
     {
         if(message.isPersistent() && queue.isDurable())
         {
@@ -113,7 +114,7 @@
         }
     }
 
-    public void enqueue(AMQQueue queue, EnqueableMessage message, Action postCommitAction)
+    public void enqueue(BaseQueue queue, EnqueableMessage message, Action postCommitAction)
     {
         if(message.isPersistent() && queue.isDurable())
         {
@@ -132,7 +133,7 @@
 
     }
 
-    public void enqueue(List<AMQQueue> queues, EnqueableMessage message, Action postCommitAction)
+    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postCommitAction)
     {
 
 
@@ -140,7 +141,7 @@
         {
             if(_transaction == null)
             {
-                for(AMQQueue queue : queues)
+                for(BaseQueue queue : queues)
                 {
                     if(queue.isDurable())
                     {
@@ -155,7 +156,7 @@
 
             try
             {
-                for(AMQQueue queue : queues)
+                for(BaseQueue queue : queues)
                 {
                     if(queue.isDurable())
                     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Sun Jan 31 00:31:49 2010
@@ -20,15 +20,12 @@
  */
 package org.apache.qpid.server.txn;
 
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
 
-import java.util.List;
-import java.util.SortedSet;
 import java.util.Collection;
+import java.util.List;
 
 public interface ServerTransaction
 {
@@ -45,13 +42,13 @@
         public void onRollback();
     }
 
-    void dequeue(AMQQueue queue, EnqueableMessage message, Action postCommitAction);
+    void dequeue(BaseQueue queue, EnqueableMessage message, Action postCommitAction);
 
     void dequeue(Collection<QueueEntry> ackedMessages, Action postCommitAction);
 
-    void enqueue(AMQQueue queue, EnqueableMessage message, Action postCommitAction);
+    void enqueue(BaseQueue queue, EnqueableMessage message, Action postCommitAction);
 
-    void enqueue(List<AMQQueue> queues, EnqueableMessage message, Action postCommitAction);
+    void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postCommitAction);
 
 
     void commit();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Sun Jan 31 00:31:49 2010
@@ -21,7 +21,10 @@
 package org.apache.qpid.server.virtualhost;
 
 import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.federation.BrokerLink;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfig;
+import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.exchange.ExchangeFactory;
@@ -31,8 +34,13 @@
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.access.ACLManager;
 import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.binding.BindingFactory;
 
-public interface VirtualHost
+import java.util.UUID;
+import java.util.TimerTask;
+
+public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig
 {
     IConnectionRegistry getConnectionRegistry();
 
@@ -59,4 +67,24 @@
     void close() throws Exception;
 
     ManagedObject getManagedObject();
+
+    UUID getBrokerId();
+
+    void scheduleTask(long period, TimerTask task);
+
+
+    IApplicationRegistry getApplicationRegistry();
+
+    BindingFactory getBindingFactory();
+
+    void createBrokerConnection(String transport,
+                                String host,
+                                int port,
+                                String vhost,
+                                boolean durable,
+                                String authMechanism, String username, String password);
+
+    ConfigStore getConfigStore();
+
+    void removeBrokerConnection(BrokerLink brokerLink);
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Sun Jan 31 00:31:49 2010
@@ -34,10 +34,10 @@
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
 import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.binding.BindingFactory;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.AMQException;
@@ -215,7 +215,7 @@
             if (queue == null)
             {
                 _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: "
-                    + exchange.getName());
+                    + exchange.getNameShortString());
             }
             else
             {
@@ -227,10 +227,18 @@
                     argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit());
                 }
 
-                _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName
-                    + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")");
+                BindingFactory bf = _virtualHost.getBindingFactory();
 
-                queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT);
+                Map<String, Object> argumentMap = FieldTable.convertToMap(argumentsFT);
+
+                if(bf.getBinding(bindingKey, queue, exchange, argumentMap) == null)
+                {
+
+                    _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queueName
+                        + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")");
+
+                    bf.restoreBinding(bindingKey, queue, exchange, argumentMap);
+                }
 
             }
         }
@@ -271,7 +279,7 @@
 
                     if (_logger.isDebugEnabled())
                     {
-                        _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getName());
+                        _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getNameShortString());
                     }
 
                     Integer count = _queueRecoveries.get(queueName);
@@ -286,7 +294,7 @@
                 }
                 else
                 {
-                    _logger.warn("Message id " + messageId + " referenced in log as enqueue in queue " + queue.getName() + " is unknwon, entry will be discarded");
+                    _logger.warn("Message id " + messageId + " referenced in log as enqueue in queue " + queue.getNameShortString() + " is unknwon, entry will be discarded");
                     TransactionLog.Transaction txn = _transactionLog.newTransaction();
                     txn.dequeueMessage(queue, messageId);
                     txn.commitTranAsync();
@@ -333,7 +341,7 @@
             CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1005(entry.getValue(), entry.getKey()));
 
             CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1006(entry.getKey(), true));
-        }        
+        }
 
         CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1006(null, false));
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Sun Jan 31 00:31:49 2010
@@ -20,19 +20,21 @@
  */
 package org.apache.qpid.server.virtualhost;
 
-import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQBrokerManagerMBean;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.VirtualHostMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.binding.BindingFactory;
+import org.apache.qpid.server.configuration.BrokerConfig;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
 import org.apache.qpid.server.configuration.ExchangeConfiguration;
 import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfigType;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.connection.ConnectionRegistry;
 import org.apache.qpid.server.connection.IConnectionRegistry;
@@ -41,6 +43,11 @@
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.federation.BrokerLink;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.VirtualHostMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -48,14 +55,15 @@
 import org.apache.qpid.server.queue.DefaultQueueRegistry;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.access.ACLManager;
 import org.apache.qpid.server.security.access.Accessable;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 
 import javax.management.NotCompliantMBeanException;
 import java.util.Collections;
@@ -63,6 +71,9 @@
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
 
 public class VirtualHostImpl implements Accessable, VirtualHost
 {
@@ -88,9 +99,17 @@
 
     private ACLManager _accessManager;
 
-    private final Timer _houseKeepingTimer;
+    private final Timer _timer;
+    private final IApplicationRegistry _appRegistry;
     private VirtualHostConfiguration _configuration;
     private DurableConfigurationStore _durableConfigurationStore;
+    private BindingFactory _bindingFactory;
+    private BrokerConfig _broker;
+    private UUID _id;
+
+
+    private final long _createTime = System.currentTimeMillis();
+    private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
 
     public void setAccessableName(String name)
     {
@@ -113,6 +132,26 @@
         return _configuration;
     }
 
+    public UUID getId()
+    {
+        return _id;  // assigned once in the constructor from appRegistry.getConfigStore().createId()
+    }
+
+    public VirtualHostConfigType getConfigType()
+    {
+        return VirtualHostConfigType.getInstance();
+    }
+
+    public ConfiguredObject getParent()
+    {
+        return getBroker();
+    }
+
+    public boolean isDurable()
+    {
+        return false;
+    }
+
     /**
      * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
      * implementaion of an Exchange MBean should extend this class.
@@ -141,23 +180,28 @@
 
     } // End of MBean class
 
-    /**
-     * Normal Constructor
-     *
-     * @param hostConfig
-     *
-     * @throws Exception
-     */
-    public VirtualHostImpl(VirtualHostConfiguration hostConfig) throws Exception
+
+
+    public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
     {
-        this(hostConfig, null);
+        this(appRegistry, hostConfig, null);
     }
 
+
     public VirtualHostImpl(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
     {
+        this(ApplicationRegistry.getInstance(),hostConfig,store);
+    }
+
+    private VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
+    {
+        _appRegistry = appRegistry;
+        _broker = appRegistry.getBroker();
         _configuration = hostConfig;
         _name = hostConfig.getName();
 
+        _id = appRegistry.getConfigStore().createId();
+
         CurrentActor.get().message(VirtualHostMessages.VHT_CREATED(_name));
 
         if (_name == null || _name.length() == 0)
@@ -169,7 +213,7 @@
 
         _connectionRegistry = new ConnectionRegistry(this);
 
-        _houseKeepingTimer = new Timer("Queue-housekeeping-" + _name, true);
+        _timer = new Timer("TimerThread-" + _name + ":", true);
 
         _queueRegistry = new DefaultQueueRegistry(this);
 
@@ -178,6 +222,7 @@
 
         _exchangeRegistry = new DefaultExchangeRegistry(this);
 
+
         //Create a temporary RT to store the durable entries from the config file
         // so we can replay them in to the real _RT after it has been loaded.
         /// This should be removed after the _RT has been fully split from the the TL
@@ -189,6 +234,8 @@
         // This needs to be after the RT has been defined as it creates the default durable exchanges.
         _exchangeRegistry.initialise();
 
+        _bindingFactory = new BindingFactory(this);
+
         initialiseModel(hostConfig);
 
         if (store != null)
@@ -205,9 +252,11 @@
             initialiseMessageStore(hostConfig);
         }
 
+
+
         //Now that the RT has been initialised loop through the persistent queues/exchanges created from the config
         // file and write them in to the new routing Table.
-        for (StartupRoutingTable.CreateQueueTuple cqt : configFileRT.queue)
+/*        for (StartupRoutingTable.CreateQueueTuple cqt : configFileRT.queue)
         {
             getDurableConfigurationStore().createQueue(cqt.queue, cqt.arguments);
         }
@@ -220,7 +269,7 @@
         for (StartupRoutingTable.CreateBindingTuple cbt : configFileRT.bindings)
         {
             getDurableConfigurationStore().bindQueue(cbt.exchange, cbt.routingKey, cbt.queue, cbt.arguments);
-        }
+        }*/
 
         _authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, hostConfig);
 
@@ -250,7 +299,7 @@
                         }
                         catch (Exception e)
                         {
-                            _logger.error("Exception in housekeeping for queue: " + q.getName().toString(), e);
+                            _logger.error("Exception in housekeeping for queue: " + q.getNameShortString().toString(), e);
                             //Don't throw exceptions as this will stop the
                             // house keeping task from running.
                         }
@@ -258,9 +307,8 @@
                 }
             }
 
-            _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
-                                                   period / 2,
-                                                   period);
+            final TimerTask expiredMessagesTask = new RemoveExpiredMessagesTask();
+            scheduleTask(period, expiredMessagesTask);
 
             class ForceChannelClosuresTask extends TimerTask
             {
@@ -272,6 +320,12 @@
         }
     }
 
+    public void scheduleTask(final long period, final TimerTask task)
+    {
+        _timer.scheduleAtFixedRate(task, period / 2, period);  // first run after period/2, then fixed-rate every period (java.util.Timer takes milliseconds)
+    }
+
+
     private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception
     {
         String messageStoreClass = hostConfig.getMessageStoreClass();
@@ -377,7 +431,7 @@
         List routingKeys = queueConfiguration.getRoutingKeys();
         if (routingKeys == null || routingKeys.isEmpty())
         {
-            routingKeys = Collections.singletonList(queue.getName());
+            routingKeys = Collections.singletonList(queue.getNameShortString());
         }
 
         for (Object routingKeyNameObj : routingKeys)
@@ -387,12 +441,12 @@
             {
                 _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this);
             }
-            queue.bind(exchange, routingKey, null);
+            _bindingFactory.addBinding(routingKey.toString(), queue, exchange, null);
         }
 
         if (exchange != _exchangeRegistry.getDefaultExchange())
         {
-            queue.bind(_exchangeRegistry.getDefaultExchange(), queue.getName(), null);
+            _bindingFactory.addBinding(queue.getNameShortString().toString(), queue, _exchangeRegistry.getDefaultExchange(), null); // bind by queue name on the default exchange, not on 'exchange'
         }
     }
 
@@ -401,6 +455,26 @@
         return _name;
     }
 
+    public BrokerConfig getBroker()
+    {
+        return _broker;
+    }
+
+    public String getFederationTag()
+    {
+        return _broker.getFederationTag();
+    }
+
+    public void setBroker(final BrokerConfig broker)
+    {
+        _broker = broker;
+    }
+
+    public long getCreateTime()
+    {
+        return _createTime;
+    }
+
     public QueueRegistry getQueueRegistry()
     {
         return _queueRegistry;
@@ -457,9 +531,9 @@
         }
 
         //Stop Housekeeping
-        if (_houseKeepingTimer != null)
+        if (_timer != null)
         {
-            _houseKeepingTimer.cancel();
+            _timer.cancel();
         }
 
         //Close MessageStore
@@ -481,6 +555,59 @@
         return _virtualHostMBean;
     }
 
+    public UUID getBrokerId()
+    {
+        return _appRegistry.getBrokerId();
+    }
+
+    public IApplicationRegistry getApplicationRegistry()
+    {
+        return _appRegistry;
+    }
+
+    public BindingFactory getBindingFactory()
+    {
+        return _bindingFactory;
+    }
+
+    /** Builds a BrokerLink for the given endpoint and tracks it in _links; an equal, already tracked link is left untouched. */
+    public void createBrokerConnection(final String transport, final String host, final int port,
+                                       final String vhost, final boolean durable,
+                                       final String authMechanism, final String username, final String password)
+    {
+        BrokerLink blink = new BrokerLink(this, transport, host, port, vhost, durable, authMechanism, username, password);
+        // Register with the ConfigStore only when this instance actually entered _links; otherwise the
+        // store would hold a duplicate that removeBrokerConnection() can never look up and remove.
+        if(_links.putIfAbsent(blink,blink) == null)
+        {
+            getConfigStore().addConfiguredObject(blink);
+        }
+    }
+
+    /** Removes the link for the given endpoint by delegating with a freshly built BrokerLink as the lookup key. */
+    public void removeBrokerConnection(final String transport, final String host, final int port, final String vhost)
+    {
+        // NOTE(review): presumably BrokerLink.equals() ignores durable/credentials (passed as false/null) - confirm
+        removeBrokerConnection(new BrokerLink(this, transport, host, port, vhost, false, null,null,null));
+    }
+
+    /** Closes and unregisters the tracked link equal to {@code blink}; does nothing when no such link is tracked. */
+    public void removeBrokerConnection(BrokerLink blink)
+    {
+        // remove() rather than get(): otherwise the closed link would remain in _links
+        blink = _links.remove(blink);
+        if(blink != null)
+        {
+            blink.close();
+            getConfigStore().removeConfiguredObject(blink);
+        }
+    }
+
+    public ConfigStore getConfigStore()
+    {
+        return getApplicationRegistry().getConfigStore();
+    }
+
     /**
      * Temporary Startup RT class to record the creation of persistent queues / exchanges.
      *

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java Sun Jan 31 00:31:49 2010
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.virtualhost;
 
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.configuration.ConfigStore;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -52,7 +53,7 @@
 
     public VirtualHost getVirtualHost(String name)
     {
-        if(name == null || name.trim().length() == 0 )
+        if(name == null || name.trim().length() == 0 || "/".equals(name.trim()))
         {
             name = getDefaultVirtualHostName();
         }
@@ -60,6 +61,11 @@
         return _registry.get(name);
     }
 
+    public VirtualHost getDefaultVirtualHost()
+    {
+        return getVirtualHost(getDefaultVirtualHostName());
+    }
+
     private String getDefaultVirtualHostName()
     {
         return _defaultVirtualHostName;
@@ -80,4 +86,9 @@
     {
         return _applicationRegistry;
     }
+
+    public ConfigStore getConfigStore()
+    {
+        return _applicationRegistry.getConfigStore();
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java Sun Jan 31 00:31:49 2010
@@ -219,7 +219,7 @@
 
         _console.println("");
 
-        _console.println(BOILER_PLATE);        
+        _console.println(BOILER_PLATE);
 
         runCLI();
     }
@@ -495,13 +495,13 @@
                 if (_exchange != null)
                 {
                     status.append("[");
-                    status.append(_exchange.getName());
+                    status.append(_exchange.getNameShortString());
                     status.append("]");
 
                     if (_queue != null)
                     {
                         status.append("->'");
-                        status.append(_queue.getName());
+                        status.append(_queue.getNameShortString());
                         status.append("'");
 
                         if (_msgids != null)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org