You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC

svn commit: r1525101 [16/21] - in /qpid/branches/linearstore/qpid: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2...

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Fri Sep 20 18:59:30 2013
@@ -35,6 +35,8 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -45,6 +47,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.JsonParseException;
@@ -54,6 +57,7 @@ import org.codehaus.jackson.map.ObjectMa
 abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
 {
     private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
+    private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION";
 
     private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES";
 
@@ -67,17 +71,27 @@ abstract public class AbstractJDBCMessag
     private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
 
     private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
+    private static final int DEFAULT_CONFIG_VERSION = 0;
 
     public static String[] ALL_TABLES = new String[] { DB_VERSION_TABLE_NAME, LINKS_TABLE_NAME, BRIDGES_TABLE_NAME, XID_ACTIONS_TABLE_NAME,
-        XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME };
+        XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME };
 
-    private static final int DB_VERSION = 6;
+    private static final int DB_VERSION = 7;
 
     private final AtomicLong _messageId = new AtomicLong(0);
     private AtomicBoolean _closed = new AtomicBoolean(false);
 
     private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
     private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
+    private static final String SELECT_FROM_DB_VERSION = "SELECT version FROM " + DB_VERSION_TABLE_NAME;
+    private static final String UPDATE_DB_VERSION = "UPDATE " + DB_VERSION_TABLE_NAME + " SET version = ?";
+
+
+    private static final String CREATE_CONFIG_VERSION_TABLE = "CREATE TABLE "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version int not null )";
+    private static final String INSERT_INTO_CONFIG_VERSION = "INSERT INTO "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
+    private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME;
+    private static final String UPDATE_CONFIG_VERSION = "UPDATE " + CONFIGURATION_VERSION_TABLE_NAME + " SET version = ?";
+
 
     private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
     private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
@@ -155,6 +169,7 @@ abstract public class AbstractJDBCMessag
     private MessageStoreRecoveryHandler _messageRecoveryHandler;
     private TransactionLogRecoveryHandler _tlogRecoveryHandler;
     private ConfigurationRecoveryHandler _configRecoveryHandler;
+    private VirtualHost _virtualHost;
 
     public AbstractJDBCMessageStore()
     {
@@ -162,48 +177,143 @@ abstract public class AbstractJDBCMessag
     }
 
     @Override
-    public void configureConfigStore(String name,
-                                     ConfigurationRecoveryHandler configRecoveryHandler,
-                                     VirtualHost virtualHost) throws Exception
+    public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler configRecoveryHandler) throws Exception
     {
         _stateManager.attainState(State.INITIALISING);
         _configRecoveryHandler = configRecoveryHandler;
-
-        commonConfiguration(name, virtualHost);
+        _virtualHost = virtualHost;
 
     }
 
     @Override
-    public void configureMessageStore(String name,
-                                      MessageStoreRecoveryHandler recoveryHandler,
+    public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
                                       TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
     {
+        if(_stateManager.isInState(State.INITIAL))
+        {
+            _stateManager.attainState(State.INITIALISING);
+        }
+
+        _virtualHost = virtualHost;
         _tlogRecoveryHandler = tlogRecoveryHandler;
         _messageRecoveryHandler = recoveryHandler;
 
+        completeInitialisation();
+    }
+
+    private void completeInitialisation() throws ClassNotFoundException, SQLException, AMQStoreException
+    {
+        commonConfiguration();
+
         _stateManager.attainState(State.INITIALISED);
     }
 
     @Override
     public void activate() throws Exception
     {
+        if(_stateManager.isInState(State.INITIALISING))
+        {
+            completeInitialisation();
+        }
         _stateManager.attainState(State.ACTIVATING);
 
         // this recovers durable exchanges, queues, and bindings
-        recoverConfiguration(_configRecoveryHandler);
-        recoverMessages(_messageRecoveryHandler);
-        TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler);
-        recoverXids(dtxrh);
+        if(_configRecoveryHandler != null)
+        {
+            recoverConfiguration(_configRecoveryHandler);
+        }
+        if(_messageRecoveryHandler != null)
+        {
+            recoverMessages(_messageRecoveryHandler);
+        }
+        if(_tlogRecoveryHandler != null)
+        {
+            TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler);
+            recoverXids(dtxrh);
+
+        }
 
         _stateManager.attainState(State.ACTIVE);
     }
 
-    private void commonConfiguration(String name, VirtualHost virtualHost)
-            throws ClassNotFoundException, SQLException
+    private void commonConfiguration()
+            throws ClassNotFoundException, SQLException, AMQStoreException
     {
-        implementationSpecificConfiguration(name, virtualHost);
+        implementationSpecificConfiguration(_virtualHost.getName(), _virtualHost);
         createOrOpenDatabase();
+        upgradeIfNecessary();
+    }
 
+    protected void upgradeIfNecessary() throws SQLException, AMQStoreException
+    {
+        Connection conn = newAutoCommitConnection();
+        try
+        {
+
+            PreparedStatement statement = conn.prepareStatement(SELECT_FROM_DB_VERSION);
+            try
+            {
+                ResultSet rs = statement.executeQuery();
+                try
+                {
+                    if(!rs.next())
+                    {
+                        throw new AMQStoreException(DB_VERSION_TABLE_NAME + " does not contain the database version");
+                    }
+                    int version = rs.getInt(1);
+                    switch (version)
+                    {
+                        case 6:
+                            upgradeFromV6();
+                        case DB_VERSION:
+                            return;
+                        default:
+                            throw new AMQStoreException("Unknown database version: " + version);
+                    }
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                statement.close();
+            }
+        }
+        finally
+        {
+            conn.close();
+        }
+
+    }
+
+    private void upgradeFromV6() throws SQLException
+    {
+        updateDbVersion(7);
+    }
+
+    private void updateDbVersion(int newVersion) throws SQLException
+    {
+        Connection conn = newAutoCommitConnection();
+        try
+        {
+
+            PreparedStatement statement = conn.prepareStatement(UPDATE_DB_VERSION);
+            try
+            {
+                statement.setInt(1,newVersion);
+                statement.execute();
+            }
+            finally
+            {
+                statement.close();
+            }
+        }
+        finally
+        {
+            conn.close();
+        }
     }
 
     protected abstract void implementationSpecificConfiguration(String name,
@@ -222,6 +332,7 @@ abstract public class AbstractJDBCMessag
         Connection conn = newAutoCommitConnection();
 
         createVersionTable(conn);
+        createConfigVersionTable(conn);
         createConfiguredObjectsTable(conn);
         createQueueEntryTable(conn);
         createMetaDataTable(conn);
@@ -258,7 +369,33 @@ abstract public class AbstractJDBCMessag
                 pstmt.close();
             }
         }
+    }
 
+    private void createConfigVersionTable(final Connection conn) throws SQLException
+    {
+        if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn))
+        {
+            Statement stmt = conn.createStatement();
+            try
+            {
+                stmt.execute(CREATE_CONFIG_VERSION_TABLE);
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+            PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_CONFIG_VERSION);
+            try
+            {
+                pstmt.setInt(1, DEFAULT_CONFIG_VERSION);
+                pstmt.execute();
+            }
+            finally
+            {
+                pstmt.close();
+            }
+        }
     }
 
     private void createConfiguredObjectsTable(final Connection conn) throws SQLException
@@ -278,6 +415,8 @@ abstract public class AbstractJDBCMessag
         }
     }
 
+
+
     private void createQueueEntryTable(final Connection conn) throws SQLException
     {
         if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
@@ -456,10 +595,10 @@ abstract public class AbstractJDBCMessag
     {
         try
         {
-            recoveryHandler.beginConfigurationRecovery(this);
+            recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
             loadConfiguredObjects(recoveryHandler);
 
-            recoveryHandler.completeConfigurationRecovery();
+            setConfigVersion(recoveryHandler.completeConfigurationRecovery());
         }
         catch (SQLException e)
         {
@@ -467,6 +606,67 @@ abstract public class AbstractJDBCMessag
         }
     }
 
+    private void setConfigVersion(int version) throws SQLException
+    {
+        Connection conn = newAutoCommitConnection();
+        try
+        {
+
+            PreparedStatement stmt = conn.prepareStatement(UPDATE_CONFIG_VERSION);
+            try
+            {
+                stmt.setInt(1, version);
+                stmt.execute();
+
+            }
+            finally
+            {
+                stmt.close();
+            }
+        }
+        finally
+        {
+            conn.close();
+        }
+    }
+
+    private int getConfigVersion() throws SQLException
+    {
+        Connection conn = newAutoCommitConnection();
+        try
+        {
+
+            Statement stmt = conn.createStatement();
+            try
+            {
+                ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION);
+                try
+                {
+
+                    if(rs.next())
+                    {
+                        return rs.getInt(1);
+                    }
+                    return DEFAULT_CONFIG_VERSION;
+                }
+                finally
+                {
+                    rs.close();
+                }
+
+            }
+            finally
+            {
+                stmt.close();
+            }
+        }
+        finally
+        {
+            conn.close();
+        }
+
+    }
+
     @Override
     public void close() throws Exception
     {
@@ -895,6 +1095,11 @@ abstract public class AbstractJDBCMessag
 
     }
 
+    protected boolean isConfigStoreOnly()
+    {
+        return _messageRecoveryHandler == null;
+    }
+
     private static final class ConnectionWrapper
     {
         private final Connection _connection;
@@ -1055,8 +1260,8 @@ abstract public class AbstractJDBCMessag
                         ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
                         buf.position(1);
                         buf = buf.slice();
-                        MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
-                        StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+                        MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+                        StorableMessageMetaData metaData = type.createMetaData(buf);
                         StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
                         messageHandler.message(message);
                     }
@@ -1307,8 +1512,8 @@ abstract public class AbstractJDBCMessag
                         ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
                         buf.position(1);
                         buf = buf.slice();
-                        MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
-                        StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+                        MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+                        StorableMessageMetaData metaData = type.createMetaData(buf);
 
                         return metaData;
                     }
@@ -1804,15 +2009,35 @@ abstract public class AbstractJDBCMessag
             Connection conn = newAutoCommitConnection();
             try
             {
-                PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
-                try
-                {
-                    stmt.setString(1, id.toString());
-                    results = stmt.executeUpdate();
-                }
-                finally
+                results = removeConfiguredObject(id, conn);
+            }
+            finally
+            {
+                conn.close();
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new AMQStoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e);
+        }
+        return results;
+    }
+
+    public UUID[] removeConfiguredObjects(UUID... objects) throws AMQStoreException
+    {
+        Collection<UUID> removed = new ArrayList<UUID>(objects.length);
+        try
+        {
+
+            Connection conn = newAutoCommitConnection();
+            try
+            {
+                for(UUID id : objects)
                 {
-                    stmt.close();
+                    if(removeConfiguredObject(id, conn) != 0)
+                    {
+                        removed.add(id);
+                    }
                 }
             }
             finally
@@ -1822,7 +2047,22 @@ abstract public class AbstractJDBCMessag
         }
         catch (SQLException e)
         {
-            throw new AMQStoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e);
+            throw new AMQStoreException("Error deleting of configured objects " + Arrays.asList(objects) + " from database: " + e.getMessage(), e);
+        }
+        return removed.toArray(new UUID[removed.size()]);
+    }
+
+    private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException
+    {
+        final int results;PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
+        try
+        {
+            stmt.setString(1, id.toString());
+            results = stmt.executeUpdate();
+        }
+        finally
+        {
+            stmt.close();
         }
         return results;
     }
@@ -1836,52 +2076,121 @@ abstract public class AbstractJDBCMessag
                 Connection conn = newAutoCommitConnection();
                 try
                 {
-                    PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
-                    try
+                    updateConfiguredObject(configuredObject, false, conn);
+                }
+                finally
+                {
+                    conn.close();
+                }
+            }
+            catch (SQLException e)
+            {
+                throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    @Override
+    public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+    {
+        update(false, records);
+    }
+
+    public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException
+    {
+        if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.ACTIVATING))
+        {
+            try
+            {
+                Connection conn = newConnection();
+                try
+                {
+                    for(ConfiguredObjectRecord record : records)
                     {
-                        stmt.setString(1, configuredObject.getId().toString());
-                        ResultSet rs = stmt.executeQuery();
+                        updateConfiguredObject(record, createIfNecessary, conn);
+                    }
+                    conn.commit();
+                }
+                finally
+                {
+                    conn.close();
+                }
+            }
+            catch (SQLException e)
+            {
+                throw new AMQStoreException("Error updating configured objects in database: " + e.getMessage(), e);
+            }
+
+        }
+
+    }
+
+    private void updateConfiguredObject(ConfiguredObjectRecord configuredObject,
+                                        boolean createIfNecessary,
+                                        Connection conn)
+            throws SQLException, AMQStoreException
+    {
+            PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+            try
+            {
+                stmt.setString(1, configuredObject.getId().toString());
+                ResultSet rs = stmt.executeQuery();
+                try
+                {
+                    if (rs.next())
+                    {
+                        PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
                         try
                         {
-                            if (rs.next())
+                            stmt2.setString(1, configuredObject.getType());
+                            if (configuredObject.getAttributes() != null)
                             {
-                                PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
-                                try
-                                {
-                                    stmt2.setString(1, configuredObject.getType());
-                                    if (configuredObject.getAttributes() != null)
-                                    {
-                                        byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes(
-                                                configuredObject.getAttributes());
-                                        ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
-                                        stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
-                                    }
-                                    else
-                                    {
-                                        stmt2.setNull(2, Types.BLOB);
-                                    }
-                                    stmt2.setString(3, configuredObject.getId().toString());
-                                    stmt2.execute();
-                                }
-                                finally
-                                {
-                                    stmt2.close();
-                                }
+                                byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes(
+                                        configuredObject.getAttributes());
+                                ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+                                stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
+                            }
+                            else
+                            {
+                                stmt2.setNull(2, Types.BLOB);
                             }
+                            stmt2.setString(3, configuredObject.getId().toString());
+                            stmt2.execute();
                         }
                         finally
                         {
-                            rs.close();
+                            stmt2.close();
                         }
                     }
-                    finally
+                    else if(createIfNecessary)
                     {
-                        stmt.close();
+                        PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+                        try
+                        {
+                            insertStmt.setString(1, configuredObject.getId().toString());
+                            insertStmt.setString(2, configuredObject.getType());
+                            if(configuredObject.getAttributes() == null)
+                            {
+                                insertStmt.setNull(3, Types.BLOB);
+                            }
+                            else
+                            {
+                                final Map<String, Object> attributes = configuredObject.getAttributes();
+                                byte[] attributesAsBytes = new ObjectMapper().writeValueAsBytes(attributes);
+                                ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+                                insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+                            }
+                            insertStmt.execute();
+                        }
+                        finally
+                        {
+                            insertStmt.close();
+                        }
                     }
                 }
                 finally
                 {
-                    conn.close();
+                    rs.close();
                 }
             }
             catch (JsonMappingException e)
@@ -1896,11 +2205,11 @@ abstract public class AbstractJDBCMessag
             {
                 throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
             }
-            catch (SQLException e)
+            finally
             {
-                throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+                stmt.close();
             }
-        }
+
     }
 
     private ConfiguredObjectRecord loadConfiguredObject(final UUID id) throws AMQStoreException

Propchange: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java:r1501885-1525056

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Fri Sep 20 18:59:30 2013
@@ -28,10 +28,14 @@ import java.util.UUID;
 
 public interface ConfigurationRecoveryHandler
 {
-    void beginConfigurationRecovery(DurableConfigurationStore store);
+    void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion);
 
     void configuredObject(UUID id, String type, Map<String, Object> attributes);
 
-    void completeConfigurationRecovery();
+    /**
+     * Invoked after the store has finished loading configured objects, ending configuration recovery.
+     * @return the model version of the configuration
+     */
+    int completeConfigurationRecovery();
 
 }

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java Fri Sep 20 18:59:30 2013
@@ -60,4 +60,29 @@ public class ConfiguredObjectRecord
         return "ConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", attributes=" + _attributes + "]";
     }
 
+    @Override
+    public boolean equals(Object o)
+    {
+        if(this == o)
+        {
+            return true;
+        }
+        if(o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        ConfiguredObjectRecord that = (ConfiguredObjectRecord) o;
+
+        return _type.equals(that._type) && _id.equals(that._id) && _attributes.equals(that._attributes);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = _id.hashCode();
+        result = 31 * result + _type.hashCode();
+        result = 31 * result + _attributes.hashCode();
+        return result;
+    }
 }

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Fri Sep 20 18:59:30 2013
@@ -45,15 +45,13 @@ public interface DurableConfigurationSto
      *
      *
      *
-     * @param name             The name to be used by this store
-     * @param recoveryHandler  Handler to be called as the store recovers on start up
+     *
+     *
      * @param virtualHost
+     * @param recoveryHandler  Handler to be called as the store recovers on start up
      * @throws Exception If any error occurs that means the store is unable to configure itself.
      */
-    void configureConfigStore(String name,
-                              ConfigurationRecoveryHandler recoveryHandler,
-                              VirtualHost virtualHost) throws Exception;
-
+    void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception;
 
 
     /**
@@ -77,6 +75,8 @@ public interface DurableConfigurationSto
      */
     void remove(UUID id, String type) throws AMQStoreException;
 
+    public UUID[] removeConfiguredObjects(UUID... objects) throws AMQStoreException;
+
 
     /**
      * Updates the specified object in the persistent store, IF it is already present. If the object
@@ -91,4 +91,9 @@ public interface DurableConfigurationSto
     void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException;
 
 
+    public void update(ConfiguredObjectRecord... records) throws AMQStoreException;
+    public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException;
+
+
+    void close() throws Exception;
 }

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java Fri Sep 20 18:59:30 2013
@@ -20,10 +20,14 @@
  */
 package org.apache.qpid.server.store;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import java.util.Set;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -32,60 +36,69 @@ import org.apache.qpid.server.model.Exch
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
 
 public class DurableConfigurationStoreHelper
 {
 
+    private static final String BINDING = Binding.class.getSimpleName();
+    private static final String EXCHANGE = Exchange.class.getSimpleName();
+    private static final String QUEUE = Queue.class.getSimpleName();
+    private static final Set<String> QUEUE_ARGUMENTS_EXCLUDES = new HashSet<String>(Arrays.asList(Queue.NAME,
+                                                                                                  Queue.OWNER,
+                                                                                                  Queue.EXCLUSIVE,
+                                                                                                  Queue.ALTERNATE_EXCHANGE));
+
     public static void updateQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException
     {
         Map<String, Object> attributesMap = new LinkedHashMap<String, Object>();
         attributesMap.put(Queue.NAME, queue.getName());
-        attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner()));
+        attributesMap.put(Queue.OWNER, queue.getOwner());
         attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
+
         if (queue.getAlternateExchange() != null)
         {
             attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
         }
-        else
-        {
-            attributesMap.remove(Queue.ALTERNATE_EXCHANGE);
-        }
-        if (attributesMap.containsKey(Queue.ARGUMENTS))
-        {
-            // We wouldn't need this if createQueueConfiguredObject took only AMQQueue
-            Map<String, Object> currentArgs = (Map<String, Object>) attributesMap.get(Queue.ARGUMENTS);
-            currentArgs.putAll(queue.getArguments());
-        }
-        else
+
+        Collection<String> availableAttrs = queue.getAvailableAttributes();
+
+        for(String attrName : availableAttrs)
         {
-            attributesMap.put(Queue.ARGUMENTS, queue.getArguments());
+            if(!QUEUE_ARGUMENTS_EXCLUDES.contains(attrName))
+            {
+                attributesMap.put(attrName, queue.getAttribute(attrName));
+            }
         }
-        store.update(queue.getId(), Queue.class.getName(), attributesMap);
+
+        store.update(queue.getId(), QUEUE, attributesMap);
     }
 
-    public static void createQueue(DurableConfigurationStore store, AMQQueue queue, FieldTable arguments)
+    public static void createQueue(DurableConfigurationStore store, AMQQueue queue)
             throws AMQStoreException
     {
         Map<String, Object> attributesMap = new HashMap<String, Object>();
         attributesMap.put(Queue.NAME, queue.getName());
-        attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner()));
+        attributesMap.put(Queue.OWNER, queue.getOwner());
         attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
         if (queue.getAlternateExchange() != null)
         {
             attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
         }
-        // TODO KW i think the arguments could come from the queue itself removing the need for the parameter arguments.
-        // It would also do away with the need for the if/then/else within updateQueueConfiguredObject
-        if (arguments != null)
+
+        for(String attrName : queue.getAvailableAttributes())
         {
-            attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(arguments));
+            if(!QUEUE_ARGUMENTS_EXCLUDES.contains(attrName))
+            {
+                attributesMap.put(attrName, queue.getAttribute(attrName));
+            }
         }
-        store.create(queue.getId(),Queue.class.getName(),attributesMap);
+        store.create(queue.getId(), QUEUE,attributesMap);
     }
 
     public static void removeQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException
     {
-        store.remove(queue.getId(), Queue.class.getName());
+        store.remove(queue.getId(), QUEUE);
     }
 
     public static void createExchange(DurableConfigurationStore store, org.apache.qpid.server.exchange.Exchange exchange)
@@ -93,10 +106,10 @@ public class DurableConfigurationStoreHe
     {
         Map<String, Object> attributesMap = new HashMap<String, Object>();
         attributesMap.put(Exchange.NAME, exchange.getName());
-        attributesMap.put(Exchange.TYPE, AMQShortString.toString(exchange.getTypeShortString()));
+        attributesMap.put(Exchange.TYPE, exchange.getTypeName());
         attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name()
                 : LifetimePolicy.PERMANENT.name());
-        store.create(exchange.getId(), Exchange.class.getName(), attributesMap);
+        store.create(exchange.getId(), EXCHANGE, attributesMap);
 
     }
 
@@ -104,7 +117,7 @@ public class DurableConfigurationStoreHe
     public static void removeExchange(DurableConfigurationStore store, org.apache.qpid.server.exchange.Exchange exchange)
             throws AMQStoreException
     {
-        store.remove(exchange.getId(),Exchange.class.getName());
+        store.remove(exchange.getId(), EXCHANGE);
     }
 
     public static void createBinding(DurableConfigurationStore store, org.apache.qpid.server.binding.Binding binding)
@@ -119,14 +132,14 @@ public class DurableConfigurationStoreHe
         {
             attributesMap.put(Binding.ARGUMENTS, arguments);
         }
-        store.create(binding.getId(), Binding.class.getName(), attributesMap);
+        store.create(binding.getId(), BINDING, attributesMap);
     }
 
 
     public static void removeBinding(DurableConfigurationStore store, org.apache.qpid.server.binding.Binding binding)
                 throws AMQStoreException
     {
-        store.remove(binding.getId(), Binding.class.getName());
+        store.remove(binding.getId(), BINDING);
     }
 
 }

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Fri Sep 20 18:59:30 2013
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.server.store;
 
-import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.model.VirtualHost;
 
 /**
  * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
@@ -33,13 +33,14 @@ public interface MessageStore
      * whatever parameters it wants.
      *
      *
-     * @param name             The name to be used by this store
+     *
+     *
+     * @param virtualHost
      * @param messageRecoveryHandler  Handler to be called as the store recovers on start up
      * @param tlogRecoveryHandler
      * @throws Exception If any error occurs that means the store is unable to configure itself.
      */
-    void configureMessageStore(String name,
-                               MessageStoreRecoveryHandler messageRecoveryHandler,
+    void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
                                TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception;
 
     void activate() throws Exception;

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java Fri Sep 20 18:59:30 2013
@@ -61,7 +61,8 @@ public class MessageStoreCreator
         MessageStoreFactory factory = _factories.get(storeType.toLowerCase());
         if (factory == null)
         {
-            throw new IllegalConfigurationException("Unknown store type: " + storeType);
+            throw new IllegalConfigurationException("Unknown store type: " + storeType
+                                                    + ". Supported types: " + _factories.keySet());
         }
         return factory.createMessageStore();
     }

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java Fri Sep 20 18:59:30 2013
@@ -21,14 +21,13 @@ package org.apache.qpid.server.store;
 
 import java.util.Map;
 import java.util.UUID;
+import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.model.VirtualHost;
 
 public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
 {
     @Override
-    public void configureConfigStore(String name,
-                                     ConfigurationRecoveryHandler recoveryHandler,
-                                     VirtualHost virtualHost) throws Exception
+    public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception
     {
     }
 
@@ -38,18 +37,34 @@ public abstract class NullMessageStore i
     }
 
     @Override
+    public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+    {
+    }
+
+    @Override
+    public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException
+    {
+    }
+
+
+    @Override
     public void remove(UUID id, String type)
     {
     }
 
     @Override
+    public UUID[] removeConfiguredObjects(final UUID... objects) throws AMQStoreException
+    {
+        return objects;
+    }
+
+    @Override
     public void create(UUID id, String type, Map<String, Object> attributes)
     {
     }
 
     @Override
-    public void configureMessageStore(String name,
-                                      MessageStoreRecoveryHandler recoveryHandler,
+    public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
                                       TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
     {
     }

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java Fri Sep 20 18:59:30 2013
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.store;
 
 import java.nio.ByteBuffer;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
 
 public interface StorableMessageMetaData
 {
@@ -34,3 +35,4 @@ public interface StorableMessageMetaData
 
     boolean isPersistent();
 }
+

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Fri Sep 20 18:59:30 2013
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.subscription;
 
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.logging.LogActor;
@@ -29,6 +30,8 @@ import org.apache.qpid.server.queue.Queu
 
 public interface Subscription
 {
+    AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
+
     LogActor getLogActor();
 
     boolean isTransient();

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Fri Sep 20 18:59:30 2013
@@ -38,7 +38,7 @@ import java.util.List;
 /**
  * An implementation of ServerTransaction where each enqueue/dequeue
  * operation takes place within it own transaction.
- * 
+ *
  * Since there is no long-lived transaction, the commit and rollback methods of
  * this implementation are empty.
  */
@@ -98,13 +98,13 @@ public class AsyncAutoCommitTransaction 
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+                    _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
                 }
 
                 txn = _messageStore.newTransaction();
                 txn.dequeueMessage(queue, message);
                 future = txn.commitTranAsync();
-                
+
                 txn = null;
             }
             else
@@ -172,7 +172,7 @@ public class AsyncAutoCommitTransaction 
                 {
                     if (_logger.isDebugEnabled())
                     {
-                        _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+                        _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
                     }
 
                     if(txn == null)
@@ -220,7 +220,7 @@ public class AsyncAutoCommitTransaction 
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+                    _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
                 }
 
                 txn = _messageStore.newTransaction();
@@ -262,19 +262,19 @@ public class AsyncAutoCommitTransaction 
                     {
                         if (_logger.isDebugEnabled())
                         {
-                            _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+                            _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
                         }
                         if (txn == null)
                         {
                             txn = _messageStore.newTransaction();
                         }
-                        
+
                         txn.enqueueMessage(queue, message);
 
 
                     }
                 }
-                
+
             }
             StoreFuture future;
             if (txn != null)
@@ -320,8 +320,8 @@ public class AsyncAutoCommitTransaction 
                 }
             });
         }
-    }    
-    
+    }
+
     public void commit()
     {
     }

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Fri Sep 20 18:59:30 2013
@@ -37,7 +37,7 @@ import java.util.List;
 /**
  * An implementation of ServerTransaction where each enqueue/dequeue
  * operation takes place within it own transaction.
- * 
+ *
  * Since there is no long-lived transaction, the commit and rollback methods of
  * this implementation are empty.
  */
@@ -82,7 +82,7 @@ public class AutoCommitTransaction imple
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+                    _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
                 }
 
                 txn = _messageStore.newTransaction();
@@ -119,7 +119,7 @@ public class AutoCommitTransaction imple
                 {
                     if (_logger.isDebugEnabled())
                     {
-                        _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+                        _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
                     }
 
                     if(txn == null)
@@ -161,7 +161,7 @@ public class AutoCommitTransaction imple
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+                    _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
                 }
 
                 txn = _messageStore.newTransaction();
@@ -199,19 +199,19 @@ public class AutoCommitTransaction imple
                     {
                         if (_logger.isDebugEnabled())
                         {
-                            _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+                            _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
                         }
                         if (txn == null)
                         {
                             txn = _messageStore.newTransaction();
                         }
-                        
+
                         txn.enqueueMessage(queue, message);
 
 
                     }
                 }
-                
+
             }
             if (txn != null)
             {
@@ -240,8 +240,8 @@ public class AutoCommitTransaction imple
     public void commit(final Runnable immediatePostTransactionAction)
     {
         immediatePostTransactionAction.run();
-    }    
-    
+    }
+
     public void commit()
     {
     }

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Fri Sep 20 18:59:30 2013
@@ -1,5 +1,5 @@
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,16 +7,16 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 package org.apache.qpid.server.txn;
 
@@ -39,7 +39,7 @@ import java.util.List;
 /**
  * A concrete implementation of ServerTransaction where enqueue/dequeue
  * operations share a single long-lived transaction.
- * 
+ *
  * The caller is responsible for invoking commit() (or rollback()) as necessary.
  */
 public class LocalTransaction implements ServerTransaction
@@ -103,7 +103,7 @@ public class LocalTransaction implements
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+                    _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
                 }
 
                 beginTranIfNecessary();
@@ -135,7 +135,7 @@ public class LocalTransaction implements
                 {
                     if (_logger.isDebugEnabled())
                     {
-                        _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+                        _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
                     }
 
                     beginTranIfNecessary();
@@ -207,7 +207,7 @@ public class LocalTransaction implements
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+                    _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
                 }
 
                 beginTranIfNecessary();
@@ -238,7 +238,7 @@ public class LocalTransaction implements
                     {
                         if (_logger.isDebugEnabled())
                         {
-                            _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString() );
+                            _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() );
                         }
 
                         beginTranIfNecessary();

Propchange: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:r1501885-1525056

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Fri Sep 20 18:59:30 2013
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.server.configuration.ExchangeConfiguration;
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -43,7 +44,6 @@ import org.apache.qpid.server.exchange.D
 import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeInUseException;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.VirtualHostMessages;
@@ -51,7 +51,7 @@ import org.apache.qpid.server.model.UUID
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
+import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.DefaultQueueRegistry;
@@ -61,9 +61,11 @@ import org.apache.qpid.server.stats.Stat
 import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
+import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
 import org.apache.qpid.server.store.Event;
 import org.apache.qpid.server.store.EventListener;
 import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
 
 public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
 {
@@ -96,6 +98,7 @@ public abstract class AbstractVirtualHos
     private final ConnectionRegistry _connectionRegistry;
 
     private final DtxRegistry _dtxRegistry;
+    private final AMQQueueFactory _queueFactory;
 
     private volatile State _state = State.INITIALISING;
 
@@ -137,11 +140,14 @@ public abstract class AbstractVirtualHos
 
         _houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount());
 
+
         _queueRegistry = new DefaultQueueRegistry(this);
 
+        _queueFactory = new AMQQueueFactory(this, _queueRegistry);
+
         _exchangeFactory = new DefaultExchangeFactory(this);
 
-        _exchangeRegistry = new DefaultExchangeRegistry(this);
+        _exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry);
 
         initialiseStatistics();
 
@@ -299,12 +305,12 @@ public abstract class AbstractVirtualHos
 
     private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException
     {
-        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
+        AMQQueue queue = _queueFactory.createAMQQueueImpl(queueConfiguration);
         String queueName = queue.getName();
 
         if (queue.isDurable())
         {
-            DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue, null);
+            DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue);
         }
 
         //get the exchange name (returns default exchange name if none was specified)
@@ -429,12 +435,108 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
+    public AMQQueue getQueue(String name)
+    {
+        return _queueRegistry.getQueue(name);
+    }
+
+    @Override
+    public AMQQueue getQueue(UUID id)
+    {
+        return _queueRegistry.getQueue(id);
+    }
+
+    @Override
+    public Collection<AMQQueue> getQueues()
+    {
+        return _queueRegistry.getQueues();
+    }
+
+    /** Deletes the queue, unregisters it and, when durable and not auto-delete, removes it from the config store. */
+    @Override
+    public int removeQueue(AMQQueue queue) throws AMQException
+    {
+        synchronized (getQueueRegistry())
+        {
+            final int purged = queue.delete();
+            getQueueRegistry().unregisterQueue(queue.getName());
+            final boolean persisted = queue.isDurable() && !queue.isAutoDelete();
+            if (persisted)
+            {
+                DurableConfigurationStoreHelper.removeQueue(getDurableConfigurationStore(), queue);
+            }
+            return purged;
+        }
+    }
+
+    /**
+     * Creates a queue through the queue factory once the access check and the name/id uniqueness checks pass.
+     * A null id is replaced by one derived from the queue name and this host's name (random on collision).
+     *
+     * @throws AMQSecurityException if the security manager refuses the creation
+     * @throws QueueExistsException if the name or the supplied id is already present in the queue registry
+     */
+    @Override
+    public AMQQueue createQueue(UUID id,
+                                String queueName,
+                                boolean durable,
+                                String owner,
+                                boolean autoDelete,
+                                boolean exclusive,
+                                boolean deleteOnNoConsumer,
+                                Map<String, Object> arguments) throws AMQException
+    {
+        if (queueName == null)
+        {
+            throw new IllegalArgumentException("Queue name must not be null");
+        }
+
+        // Access check
+        if (!getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null,
+                                                       queueName, owner))
+        {
+            throw new AMQSecurityException("Permission denied: queue-name '" + queueName + "'");
+        }
+
+        synchronized (_queueRegistry)
+        {
+            final AMQQueue sameName = _queueRegistry.getQueue(queueName);
+            if (sameName != null)
+            {
+                throw new QueueExistsException("Queue with name " + queueName + " already exists", sameName);
+            }
+            if (id == null)
+            {
+                // NOTE(review): was generateExchangeUUID, presumably a copy/paste slip; confirm the queue generator.
+                id = UUIDGenerator.generateQueueUUID(queueName, getName());
+                while (_queueRegistry.getQueue(id) != null)
+                {
+                    id = UUID.randomUUID();
+                }
+            }
+            else if (_queueRegistry.getQueue(id) != null)
+            {
+                // Report the queue that owns the clashing id; the lookup by name is known to be null here.
+                throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(id));
+            }
+            return _queueFactory.createQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer,
+                    arguments);
+        }
+    }
+
+    @Override
     public Exchange getExchange(String name)
     {
         return _exchangeRegistry.getExchange(name);
     }
 
     @Override
+    public Exchange getExchange(UUID id)
+    {
+        return _exchangeRegistry.getExchange(id);
+    }
+
+    @Override
     public Exchange getDefaultExchange()
     {
         return _exchangeRegistry.getDefaultExchange();
@@ -514,7 +616,7 @@ public abstract class AbstractVirtualHos
 
         for(ExchangeType type : getExchangeTypes())
         {
-            if(type.getDefaultExchangeName().toString().equals( exchange.getName() ))
+            if(type.getDefaultExchangeName().equals( exchange.getName() ))
             {
                 throw new RequiredExchangeException(exchange.getName());
             }
@@ -564,6 +666,18 @@ public abstract class AbstractVirtualHos
                 _logger.error("Failed to close message store", e);
             }
         }
+        if (getDurableConfigurationStore() != null)
+        {
+            // DurableConfigurationStore should not throw on close; if it does, log it rather than propagate.
+            try
+            {
+                getDurableConfigurationStore().close();
+            }
+            catch (Exception e)
+            {
+                _logger.error("Failed to close durable configuration store", e);
+            }
+        }
     }
 
 
@@ -745,6 +859,22 @@ public abstract class AbstractVirtualHos
         }
     }
 
+    /** Returns this host's queue, exchange and binding recoverers, keyed by each recoverer's getType() value. */
+    protected Map<String, DurableConfiguredObjectRecoverer> getDurableConfigurationRecoverers()
+    {
+        final DurableConfiguredObjectRecoverer[] all = new DurableConfiguredObjectRecoverer[] {
+                new QueueRecoverer(this, getExchangeRegistry(), _queueFactory),
+                new ExchangeRecoverer(getExchangeRegistry(), getExchangeFactory()),
+                new BindingRecoverer(this, getExchangeRegistry()) };
+        final Map<String, DurableConfiguredObjectRecoverer> byType =
+                new HashMap<String, DurableConfiguredObjectRecoverer>();
+        for (int i = 0; i < all.length; i++)
+        {
+            byType.put(all[i].getType(), all[i]);
+        }
+        return byType;
+    }
+
     private class VirtualHostHouseKeepingTask extends HouseKeepingTask
     {
         public VirtualHostHouseKeepingTask()
@@ -766,8 +896,7 @@ public abstract class AbstractVirtualHos
                     q.checkMessageStatus();
                 } catch (Exception e)
                 {
-                    _logger.error("Exception in housekeeping for queue: "
-                            + q.getNameShortString().toString(), e);
+                    _logger.error("Exception in housekeeping for queue: " + q.getName(), e);
                     //Don't throw exceptions as this will stop the
                     // house keeping task from running.
                 }

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java Fri Sep 20 18:59:30 2013
@@ -23,7 +23,9 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationRecoverer;
 import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.DurableConfigurationStoreCreator;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreCreator;
 import org.apache.qpid.server.store.OperationalLoggingListener;
@@ -68,7 +70,7 @@ public class StandardVirtualHost extends
 
         final
         MessageStoreLogSubject
-                storeLogSubject = new MessageStoreLogSubject(this, messageStore.getClass().getSimpleName());
+                storeLogSubject = new MessageStoreLogSubject(getName(), messageStore.getClass().getSimpleName());
         OperationalLoggingListener.listen(messageStore, storeLogSubject);
 
         return messageStore;
@@ -77,7 +79,14 @@ public class StandardVirtualHost extends
     private DurableConfigurationStore initialiseConfigurationStore(VirtualHost virtualHost) throws Exception
     {
         DurableConfigurationStore configurationStore;
-        if(getMessageStore() instanceof DurableConfigurationStore)
+        final Object storeTypeAttr = virtualHost.getAttribute(VirtualHost.CONFIG_STORE_TYPE);
+        String storeType = storeTypeAttr == null ? null : String.valueOf(storeTypeAttr);
+
+        if(storeType != null)
+        {
+            configurationStore = new DurableConfigurationStoreCreator().createMessageStore(storeType);
+        }
+        else if(getMessageStore() instanceof DurableConfigurationStore)
         {
             configurationStore = (DurableConfigurationStore) getMessageStore();
         }
@@ -96,11 +105,13 @@ public class StandardVirtualHost extends
 
         _durableConfigurationStore = initialiseConfigurationStore(virtualHost);
 
-        VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory());
+        DurableConfigurationRecoverer configRecoverer =
+                new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
+                                                  new DefaultUpgraderProvider(this, getExchangeRegistry()));
+        _durableConfigurationStore.configureConfigStore(virtualHost, configRecoverer);
 
-        _durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, virtualHost);
-
-        _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler);
+        VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory());
+        _messageStore.configureMessageStore(virtualHost, recoveryHandler, recoveryHandler);
 
         initialiseModel(hostConfig);
 
@@ -109,25 +120,6 @@ public class StandardVirtualHost extends
         attainActivation();
     }
 
-
-    protected void closeStorage()
-    {
-        //Close MessageStore
-        if (_messageStore != null)
-        {
-            //Remove MessageStore Interface should not throw Exception
-            try
-            {
-                getMessageStore().close();
-            }
-            catch (Exception e)
-            {
-                getLogger().error("Failed to close message store", e);
-            }
-        }
-    }
-
-
     @Override
     public MessageStore getMessageStore()
     {

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java Fri Sep 20 18:59:30 2013
@@ -19,20 +19,14 @@ package org.apache.qpid.server.virtualho
  *
  */
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
 import org.apache.qpid.server.plugin.MessageStoreFactory;
 import org.apache.qpid.server.plugin.VirtualHostFactory;
 import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.store.MessageStoreConstants;
 import org.apache.qpid.server.store.MessageStoreCreator;
 
@@ -89,17 +83,6 @@ public class StandardVirtualHostFactory 
                 factory.validateAttributes(attributes);
             }
         }
-        // TODO - each store type should validate its own attributes
-        if(!((String) storeType).equalsIgnoreCase(MemoryMessageStore.TYPE))
-        {
-        /*    Object storePath = attributes.get(STORE_PATH_ATTRIBUTE);
-            if(!(storePath instanceof String))
-            {
-                throw new IllegalArgumentException("Attribute '"+ STORE_PATH_ATTRIBUTE
-                                                               +"' is required and must be of type String.");
-
-            }*/
-        }
 
     }
 

Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Sep 20 18:59:30 2013
@@ -21,18 +21,18 @@
 package org.apache.qpid.server.virtualhost;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ScheduledFuture;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.common.Closeable;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeInUseException;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
+import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.stats.StatisticsGatherer;
@@ -48,7 +48,23 @@ public interface VirtualHost extends Dur
 
     String getName();
 
-    QueueRegistry getQueueRegistry();
+    AMQQueue getQueue(String name);
+
+    AMQQueue getQueue(UUID id);
+
+    Collection<AMQQueue> getQueues();
+
+    int removeQueue(AMQQueue queue) throws AMQException;
+
+    AMQQueue createQueue(UUID id,
+                         String queueName,
+                         boolean durable,
+                         String owner,
+                         boolean autoDelete,
+                         boolean exclusive,
+                         boolean deleteOnNoConsumer,
+                         Map<String, Object> arguments) throws AMQException;
+
 
     Exchange createExchange(UUID id,
                             String exchange,
@@ -61,6 +77,8 @@ public interface VirtualHost extends Dur
     void removeExchange(Exchange exchange, boolean force) throws AMQException;
 
     Exchange getExchange(String name);
+    Exchange getExchange(UUID id);
+
 
     Exchange getDefaultExchange();
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org