You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC
svn commit: r1525101 [16/21] - in /qpid/branches/linearstore/qpid: ./ bin/
cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/
cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2...
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Fri Sep 20 18:59:30 2013
@@ -35,6 +35,8 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -45,6 +47,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.AMQQueue;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonParseException;
@@ -54,6 +57,7 @@ import org.codehaus.jackson.map.ObjectMa
abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
{
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
+ private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION";
private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES";
@@ -67,17 +71,27 @@ abstract public class AbstractJDBCMessag
private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
+ private static final int DEFAULT_CONFIG_VERSION = 0;
public static String[] ALL_TABLES = new String[] { DB_VERSION_TABLE_NAME, LINKS_TABLE_NAME, BRIDGES_TABLE_NAME, XID_ACTIONS_TABLE_NAME,
- XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME };
+ XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME };
- private static final int DB_VERSION = 6;
+ private static final int DB_VERSION = 7;
private final AtomicLong _messageId = new AtomicLong(0);
private AtomicBoolean _closed = new AtomicBoolean(false);
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
+ private static final String SELECT_FROM_DB_VERSION = "SELECT version FROM " + DB_VERSION_TABLE_NAME;
+ private static final String UPDATE_DB_VERSION = "UPDATE " + DB_VERSION_TABLE_NAME + " SET version = ?";
+
+
+ private static final String CREATE_CONFIG_VERSION_TABLE = "CREATE TABLE "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version int not null )";
+ private static final String INSERT_INTO_CONFIG_VERSION = "INSERT INTO "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
+ private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME;
+ private static final String UPDATE_CONFIG_VERSION = "UPDATE " + CONFIGURATION_VERSION_TABLE_NAME + " SET version = ?";
+
private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
@@ -155,6 +169,7 @@ abstract public class AbstractJDBCMessag
private MessageStoreRecoveryHandler _messageRecoveryHandler;
private TransactionLogRecoveryHandler _tlogRecoveryHandler;
private ConfigurationRecoveryHandler _configRecoveryHandler;
+ private VirtualHost _virtualHost;
public AbstractJDBCMessageStore()
{
@@ -162,48 +177,143 @@ abstract public class AbstractJDBCMessag
}
@Override
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler configRecoveryHandler,
- VirtualHost virtualHost) throws Exception
+ public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler configRecoveryHandler) throws Exception
{
_stateManager.attainState(State.INITIALISING);
_configRecoveryHandler = configRecoveryHandler;
-
- commonConfiguration(name, virtualHost);
+ _virtualHost = virtualHost;
}
@Override
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
+ public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
+ if(_stateManager.isInState(State.INITIAL))
+ {
+ _stateManager.attainState(State.INITIALISING);
+ }
+
+ _virtualHost = virtualHost;
_tlogRecoveryHandler = tlogRecoveryHandler;
_messageRecoveryHandler = recoveryHandler;
+ completeInitialisation();
+ }
+
+ private void completeInitialisation() throws ClassNotFoundException, SQLException, AMQStoreException
+ {
+ commonConfiguration();
+
_stateManager.attainState(State.INITIALISED);
}
@Override
public void activate() throws Exception
{
+ if(_stateManager.isInState(State.INITIALISING))
+ {
+ completeInitialisation();
+ }
_stateManager.attainState(State.ACTIVATING);
// this recovers durable exchanges, queues, and bindings
- recoverConfiguration(_configRecoveryHandler);
- recoverMessages(_messageRecoveryHandler);
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler);
- recoverXids(dtxrh);
+ if(_configRecoveryHandler != null)
+ {
+ recoverConfiguration(_configRecoveryHandler);
+ }
+ if(_messageRecoveryHandler != null)
+ {
+ recoverMessages(_messageRecoveryHandler);
+ }
+ if(_tlogRecoveryHandler != null)
+ {
+ TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler);
+ recoverXids(dtxrh);
+
+ }
_stateManager.attainState(State.ACTIVE);
}
- private void commonConfiguration(String name, VirtualHost virtualHost)
- throws ClassNotFoundException, SQLException
+ private void commonConfiguration()
+ throws ClassNotFoundException, SQLException, AMQStoreException
{
- implementationSpecificConfiguration(name, virtualHost);
+ implementationSpecificConfiguration(_virtualHost.getName(), _virtualHost);
createOrOpenDatabase();
+ upgradeIfNecessary();
+ }
+ protected void upgradeIfNecessary() throws SQLException, AMQStoreException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+
+ PreparedStatement statement = conn.prepareStatement(SELECT_FROM_DB_VERSION);
+ try
+ {
+ ResultSet rs = statement.executeQuery();
+ try
+ {
+ if(!rs.next())
+ {
+ throw new AMQStoreException(DB_VERSION_TABLE_NAME + " does not contain the database version");
+ }
+ int version = rs.getInt(1);
+ switch (version)
+ {
+ case 6:
+ upgradeFromV6();
+ case DB_VERSION:
+ return;
+ default:
+ throw new AMQStoreException("Unknown database version: " + version);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ statement.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ }
+
+ private void upgradeFromV6() throws SQLException
+ {
+ updateDbVersion(7);
+ }
+
+ private void updateDbVersion(int newVersion) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+
+ PreparedStatement statement = conn.prepareStatement(UPDATE_DB_VERSION);
+ try
+ {
+ statement.setInt(1,newVersion);
+ statement.execute();
+ }
+ finally
+ {
+ statement.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
}
protected abstract void implementationSpecificConfiguration(String name,
@@ -222,6 +332,7 @@ abstract public class AbstractJDBCMessag
Connection conn = newAutoCommitConnection();
createVersionTable(conn);
+ createConfigVersionTable(conn);
createConfiguredObjectsTable(conn);
createQueueEntryTable(conn);
createMetaDataTable(conn);
@@ -258,7 +369,33 @@ abstract public class AbstractJDBCMessag
pstmt.close();
}
}
+ }
+ private void createConfigVersionTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_CONFIG_VERSION_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_CONFIG_VERSION);
+ try
+ {
+ pstmt.setInt(1, DEFAULT_CONFIG_VERSION);
+ pstmt.execute();
+ }
+ finally
+ {
+ pstmt.close();
+ }
+ }
}
private void createConfiguredObjectsTable(final Connection conn) throws SQLException
@@ -278,6 +415,8 @@ abstract public class AbstractJDBCMessag
}
}
+
+
private void createQueueEntryTable(final Connection conn) throws SQLException
{
if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
@@ -456,10 +595,10 @@ abstract public class AbstractJDBCMessag
{
try
{
- recoveryHandler.beginConfigurationRecovery(this);
+ recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
loadConfiguredObjects(recoveryHandler);
- recoveryHandler.completeConfigurationRecovery();
+ setConfigVersion(recoveryHandler.completeConfigurationRecovery());
}
catch (SQLException e)
{
@@ -467,6 +606,67 @@ abstract public class AbstractJDBCMessag
}
}
+ private void setConfigVersion(int version) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+
+ PreparedStatement stmt = conn.prepareStatement(UPDATE_CONFIG_VERSION);
+ try
+ {
+ stmt.setInt(1, version);
+ stmt.execute();
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ private int getConfigVersion() throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION);
+ try
+ {
+
+ if(rs.next())
+ {
+ return rs.getInt(1);
+ }
+ return DEFAULT_CONFIG_VERSION;
+ }
+ finally
+ {
+ rs.close();
+ }
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ }
+
@Override
public void close() throws Exception
{
@@ -895,6 +1095,11 @@ abstract public class AbstractJDBCMessag
}
+ protected boolean isConfigStoreOnly()
+ {
+ return _messageRecoveryHandler == null;
+ }
+
private static final class ConnectionWrapper
{
private final Connection _connection;
@@ -1055,8 +1260,8 @@ abstract public class AbstractJDBCMessag
ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
buf.position(1);
buf = buf.slice();
- MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
- StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+ MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+ StorableMessageMetaData metaData = type.createMetaData(buf);
StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
messageHandler.message(message);
}
@@ -1307,8 +1512,8 @@ abstract public class AbstractJDBCMessag
ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
buf.position(1);
buf = buf.slice();
- MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
- StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+ MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+ StorableMessageMetaData metaData = type.createMetaData(buf);
return metaData;
}
@@ -1804,15 +2009,35 @@ abstract public class AbstractJDBCMessag
Connection conn = newAutoCommitConnection();
try
{
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
- try
- {
- stmt.setString(1, id.toString());
- results = stmt.executeUpdate();
- }
- finally
+ results = removeConfiguredObject(id, conn);
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e);
+ }
+ return results;
+ }
+
+ public UUID[] removeConfiguredObjects(UUID... objects) throws AMQStoreException
+ {
+ Collection<UUID> removed = new ArrayList<UUID>(objects.length);
+ try
+ {
+
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ for(UUID id : objects)
{
- stmt.close();
+ if(removeConfiguredObject(id, conn) != 0)
+ {
+ removed.add(id);
+ }
}
}
finally
@@ -1822,7 +2047,22 @@ abstract public class AbstractJDBCMessag
}
catch (SQLException e)
{
- throw new AMQStoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e);
+ throw new AMQStoreException("Error deleting of configured objects " + Arrays.asList(objects) + " from database: " + e.getMessage(), e);
+ }
+ return removed.toArray(new UUID[removed.size()]);
+ }
+
+ private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException
+ {
+ final int results;PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ stmt.setString(1, id.toString());
+ results = stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
}
return results;
}
@@ -1836,52 +2076,121 @@ abstract public class AbstractJDBCMessag
Connection conn = newAutoCommitConnection();
try
{
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
- try
+ updateConfiguredObject(configuredObject, false, conn);
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ update(false, records);
+ }
+
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.ACTIVATING))
+ {
+ try
+ {
+ Connection conn = newConnection();
+ try
+ {
+ for(ConfiguredObjectRecord record : records)
{
- stmt.setString(1, configuredObject.getId().toString());
- ResultSet rs = stmt.executeQuery();
+ updateConfiguredObject(record, createIfNecessary, conn);
+ }
+ conn.commit();
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error updating configured objects in database: " + e.getMessage(), e);
+ }
+
+ }
+
+ }
+
+ private void updateConfiguredObject(ConfiguredObjectRecord configuredObject,
+ boolean createIfNecessary,
+ Connection conn)
+ throws SQLException, AMQStoreException
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ if (rs.next())
+ {
+ PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
try
{
- if (rs.next())
+ stmt2.setString(1, configuredObject.getType());
+ if (configuredObject.getAttributes() != null)
{
- PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
- try
- {
- stmt2.setString(1, configuredObject.getType());
- if (configuredObject.getAttributes() != null)
- {
- byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes(
- configuredObject.getAttributes());
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
- }
- else
- {
- stmt2.setNull(2, Types.BLOB);
- }
- stmt2.setString(3, configuredObject.getId().toString());
- stmt2.execute();
- }
- finally
- {
- stmt2.close();
- }
+ byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes(
+ configuredObject.getAttributes());
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
+ }
+ else
+ {
+ stmt2.setNull(2, Types.BLOB);
}
+ stmt2.setString(3, configuredObject.getId().toString());
+ stmt2.execute();
}
finally
{
- rs.close();
+ stmt2.close();
}
}
- finally
+ else if(createIfNecessary)
{
- stmt.close();
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+ try
+ {
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
+ {
+ insertStmt.setNull(3, Types.BLOB);
+ }
+ else
+ {
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ byte[] attributesAsBytes = new ObjectMapper().writeValueAsBytes(attributes);
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+ }
+ insertStmt.execute();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
}
}
finally
{
- conn.close();
+ rs.close();
}
}
catch (JsonMappingException e)
@@ -1896,11 +2205,11 @@ abstract public class AbstractJDBCMessag
{
throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
}
- catch (SQLException e)
+ finally
{
- throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ stmt.close();
}
- }
+
}
private ConfiguredObjectRecord loadConfiguredObject(final UUID id) throws AMQStoreException
Propchange: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java:r1501885-1525056
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Fri Sep 20 18:59:30 2013
@@ -28,10 +28,14 @@ import java.util.UUID;
public interface ConfigurationRecoveryHandler
{
- void beginConfigurationRecovery(DurableConfigurationStore store);
+ void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion);
void configuredObject(UUID id, String type, Map<String, Object> attributes);
- void completeConfigurationRecovery();
+ /**
+ *
+ * @return the model version of the configuration
+ */
+ int completeConfigurationRecovery();
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java Fri Sep 20 18:59:30 2013
@@ -60,4 +60,29 @@ public class ConfiguredObjectRecord
return "ConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", attributes=" + _attributes + "]";
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if(this == o)
+ {
+ return true;
+ }
+ if(o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ ConfiguredObjectRecord that = (ConfiguredObjectRecord) o;
+
+ return _type.equals(that._type) && _id.equals(that._id) && _attributes.equals(that._attributes);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = _id.hashCode();
+ result = 31 * result + _type.hashCode();
+ result = 31 * result + _attributes.hashCode();
+ return result;
+ }
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Fri Sep 20 18:59:30 2013
@@ -45,15 +45,13 @@ public interface DurableConfigurationSto
*
*
*
- * @param name The name to be used by this store
- * @param recoveryHandler Handler to be called as the store recovers on start up
+ *
+ *
* @param virtualHost
+ * @param recoveryHandler Handler to be called as the store recovers on start up
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
- void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- VirtualHost virtualHost) throws Exception;
-
+ void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception;
/**
@@ -77,6 +75,8 @@ public interface DurableConfigurationSto
*/
void remove(UUID id, String type) throws AMQStoreException;
+ public UUID[] removeConfiguredObjects(UUID... objects) throws AMQStoreException;
+
/**
* Updates the specified object in the persistent store, IF it is already present. If the object
@@ -91,4 +91,9 @@ public interface DurableConfigurationSto
void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException;
+ public void update(ConfiguredObjectRecord... records) throws AMQStoreException;
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException;
+
+
+ void close() throws Exception;
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java Fri Sep 20 18:59:30 2013
@@ -20,10 +20,14 @@
*/
package org.apache.qpid.server.store;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -32,60 +36,69 @@ import org.apache.qpid.server.model.Exch
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
public class DurableConfigurationStoreHelper
{
+ private static final String BINDING = Binding.class.getSimpleName();
+ private static final String EXCHANGE = Exchange.class.getSimpleName();
+ private static final String QUEUE = Queue.class.getSimpleName();
+ private static final Set<String> QUEUE_ARGUMENTS_EXCLUDES = new HashSet<String>(Arrays.asList(Queue.NAME,
+ Queue.OWNER,
+ Queue.EXCLUSIVE,
+ Queue.ALTERNATE_EXCHANGE));
+
public static void updateQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException
{
Map<String, Object> attributesMap = new LinkedHashMap<String, Object>();
attributesMap.put(Queue.NAME, queue.getName());
- attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner()));
+ attributesMap.put(Queue.OWNER, queue.getOwner());
attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
+
if (queue.getAlternateExchange() != null)
{
attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
}
- else
- {
- attributesMap.remove(Queue.ALTERNATE_EXCHANGE);
- }
- if (attributesMap.containsKey(Queue.ARGUMENTS))
- {
- // We wouldn't need this if createQueueConfiguredObject took only AMQQueue
- Map<String, Object> currentArgs = (Map<String, Object>) attributesMap.get(Queue.ARGUMENTS);
- currentArgs.putAll(queue.getArguments());
- }
- else
+
+ Collection<String> availableAttrs = queue.getAvailableAttributes();
+
+ for(String attrName : availableAttrs)
{
- attributesMap.put(Queue.ARGUMENTS, queue.getArguments());
+ if(!QUEUE_ARGUMENTS_EXCLUDES.contains(attrName))
+ {
+ attributesMap.put(attrName, queue.getAttribute(attrName));
+ }
}
- store.update(queue.getId(), Queue.class.getName(), attributesMap);
+
+ store.update(queue.getId(), QUEUE, attributesMap);
}
- public static void createQueue(DurableConfigurationStore store, AMQQueue queue, FieldTable arguments)
+ public static void createQueue(DurableConfigurationStore store, AMQQueue queue)
throws AMQStoreException
{
Map<String, Object> attributesMap = new HashMap<String, Object>();
attributesMap.put(Queue.NAME, queue.getName());
- attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner()));
+ attributesMap.put(Queue.OWNER, queue.getOwner());
attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
if (queue.getAlternateExchange() != null)
{
attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
}
- // TODO KW i think the arguments could come from the queue itself removing the need for the parameter arguments.
- // It would also do away with the need for the if/then/else within updateQueueConfiguredObject
- if (arguments != null)
+
+ for(String attrName : queue.getAvailableAttributes())
{
- attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(arguments));
+ if(!QUEUE_ARGUMENTS_EXCLUDES.contains(attrName))
+ {
+ attributesMap.put(attrName, queue.getAttribute(attrName));
+ }
}
- store.create(queue.getId(),Queue.class.getName(),attributesMap);
+ store.create(queue.getId(), QUEUE,attributesMap);
}
public static void removeQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException
{
- store.remove(queue.getId(), Queue.class.getName());
+ store.remove(queue.getId(), QUEUE);
}
public static void createExchange(DurableConfigurationStore store, org.apache.qpid.server.exchange.Exchange exchange)
@@ -93,10 +106,10 @@ public class DurableConfigurationStoreHe
{
Map<String, Object> attributesMap = new HashMap<String, Object>();
attributesMap.put(Exchange.NAME, exchange.getName());
- attributesMap.put(Exchange.TYPE, AMQShortString.toString(exchange.getTypeShortString()));
+ attributesMap.put(Exchange.TYPE, exchange.getTypeName());
attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name()
: LifetimePolicy.PERMANENT.name());
- store.create(exchange.getId(), Exchange.class.getName(), attributesMap);
+ store.create(exchange.getId(), EXCHANGE, attributesMap);
}
@@ -104,7 +117,7 @@ public class DurableConfigurationStoreHe
public static void removeExchange(DurableConfigurationStore store, org.apache.qpid.server.exchange.Exchange exchange)
throws AMQStoreException
{
- store.remove(exchange.getId(),Exchange.class.getName());
+ store.remove(exchange.getId(), EXCHANGE);
}
public static void createBinding(DurableConfigurationStore store, org.apache.qpid.server.binding.Binding binding)
@@ -119,14 +132,14 @@ public class DurableConfigurationStoreHe
{
attributesMap.put(Binding.ARGUMENTS, arguments);
}
- store.create(binding.getId(), Binding.class.getName(), attributesMap);
+ store.create(binding.getId(), BINDING, attributesMap);
}
public static void removeBinding(DurableConfigurationStore store, org.apache.qpid.server.binding.Binding binding)
throws AMQStoreException
{
- store.remove(binding.getId(), Binding.class.getName());
+ store.remove(binding.getId(), BINDING);
}
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Fri Sep 20 18:59:30 2013
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.store;
-import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.model.VirtualHost;
/**
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
@@ -33,13 +33,14 @@ public interface MessageStore
* whatever parameters it wants.
*
*
- * @param name The name to be used by this store
+ *
+ *
+ * @param virtualHost
* @param messageRecoveryHandler Handler to be called as the store recovers on start up
* @param tlogRecoveryHandler
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
- void configureMessageStore(String name,
- MessageStoreRecoveryHandler messageRecoveryHandler,
+ void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception;
void activate() throws Exception;
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java Fri Sep 20 18:59:30 2013
@@ -61,7 +61,8 @@ public class MessageStoreCreator
MessageStoreFactory factory = _factories.get(storeType.toLowerCase());
if (factory == null)
{
- throw new IllegalConfigurationException("Unknown store type: " + storeType);
+ throw new IllegalConfigurationException("Unknown store type: " + storeType
+ + ". Supported types: " + _factories.keySet());
}
return factory.createMessageStore();
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java Fri Sep 20 18:59:30 2013
@@ -21,14 +21,13 @@ package org.apache.qpid.server.store;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.model.VirtualHost;
public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
{
@Override
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- VirtualHost virtualHost) throws Exception
+ public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception
{
}
@@ -38,18 +37,34 @@ public abstract class NullMessageStore i
}
@Override
+ public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ }
+
+
+ @Override
public void remove(UUID id, String type)
{
}
@Override
+ public UUID[] removeConfiguredObjects(final UUID... objects) throws AMQStoreException
+ {
+ return objects;
+ }
+
+ @Override
public void create(UUID id, String type, Map<String, Object> attributes)
{
}
@Override
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
+ public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java Fri Sep 20 18:59:30 2013
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store;
import java.nio.ByteBuffer;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
public interface StorableMessageMetaData
{
@@ -34,3 +35,4 @@ public interface StorableMessageMetaData
boolean isPersistent();
}
+
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Fri Sep 20 18:59:30 2013
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.subscription;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.logging.LogActor;
@@ -29,6 +30,8 @@ import org.apache.qpid.server.queue.Queu
public interface Subscription
{
+ AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
+
LogActor getLogActor();
boolean isTransient();
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Fri Sep 20 18:59:30 2013
@@ -38,7 +38,7 @@ import java.util.List;
/**
* An implementation of ServerTransaction where each enqueue/dequeue
* operation takes place within it own transaction.
- *
+ *
* Since there is no long-lived transaction, the commit and rollback methods of
* this implementation are empty.
*/
@@ -98,13 +98,13 @@ public class AsyncAutoCommitTransaction
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+ _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
}
txn = _messageStore.newTransaction();
txn.dequeueMessage(queue, message);
future = txn.commitTranAsync();
-
+
txn = null;
}
else
@@ -172,7 +172,7 @@ public class AsyncAutoCommitTransaction
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+ _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
}
if(txn == null)
@@ -220,7 +220,7 @@ public class AsyncAutoCommitTransaction
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
}
txn = _messageStore.newTransaction();
@@ -262,19 +262,19 @@ public class AsyncAutoCommitTransaction
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
}
if (txn == null)
{
txn = _messageStore.newTransaction();
}
-
+
txn.enqueueMessage(queue, message);
}
}
-
+
}
StoreFuture future;
if (txn != null)
@@ -320,8 +320,8 @@ public class AsyncAutoCommitTransaction
}
});
}
- }
-
+ }
+
public void commit()
{
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Fri Sep 20 18:59:30 2013
@@ -37,7 +37,7 @@ import java.util.List;
/**
* An implementation of ServerTransaction where each enqueue/dequeue
* operation takes place within it own transaction.
- *
+ *
* Since there is no long-lived transaction, the commit and rollback methods of
* this implementation are empty.
*/
@@ -82,7 +82,7 @@ public class AutoCommitTransaction imple
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+ _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
}
txn = _messageStore.newTransaction();
@@ -119,7 +119,7 @@ public class AutoCommitTransaction imple
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+ _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
}
if(txn == null)
@@ -161,7 +161,7 @@ public class AutoCommitTransaction imple
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
}
txn = _messageStore.newTransaction();
@@ -199,19 +199,19 @@ public class AutoCommitTransaction imple
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
}
if (txn == null)
{
txn = _messageStore.newTransaction();
}
-
+
txn.enqueueMessage(queue, message);
}
}
-
+
}
if (txn != null)
{
@@ -240,8 +240,8 @@ public class AutoCommitTransaction imple
public void commit(final Runnable immediatePostTransactionAction)
{
immediatePostTransactionAction.run();
- }
-
+ }
+
public void commit()
{
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Fri Sep 20 18:59:30 2013
@@ -1,5 +1,5 @@
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,16 +7,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
package org.apache.qpid.server.txn;
@@ -39,7 +39,7 @@ import java.util.List;
/**
* A concrete implementation of ServerTransaction where enqueue/dequeue
* operations share a single long-lived transaction.
- *
+ *
* The caller is responsible for invoking commit() (or rollback()) as necessary.
*/
public class LocalTransaction implements ServerTransaction
@@ -103,7 +103,7 @@ public class LocalTransaction implements
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+ _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
}
beginTranIfNecessary();
@@ -135,7 +135,7 @@ public class LocalTransaction implements
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+ _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
}
beginTranIfNecessary();
@@ -207,7 +207,7 @@ public class LocalTransaction implements
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
}
beginTranIfNecessary();
@@ -238,7 +238,7 @@ public class LocalTransaction implements
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString() );
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() );
}
beginTranIfNecessary();
Propchange: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:r1501885-1525056
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Fri Sep 20 18:59:30 2013
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -43,7 +44,6 @@ import org.apache.qpid.server.exchange.D
import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
@@ -51,7 +51,7 @@ import org.apache.qpid.server.model.UUID
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
+import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
@@ -61,9 +61,11 @@ import org.apache.qpid.server.stats.Stat
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
+import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
{
@@ -96,6 +98,7 @@ public abstract class AbstractVirtualHos
private final ConnectionRegistry _connectionRegistry;
private final DtxRegistry _dtxRegistry;
+ private final AMQQueueFactory _queueFactory;
private volatile State _state = State.INITIALISING;
@@ -137,11 +140,14 @@ public abstract class AbstractVirtualHos
_houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount());
+
_queueRegistry = new DefaultQueueRegistry(this);
+ _queueFactory = new AMQQueueFactory(this, _queueRegistry);
+
_exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeRegistry = new DefaultExchangeRegistry(this);
+ _exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry);
initialiseStatistics();
@@ -299,12 +305,12 @@ public abstract class AbstractVirtualHos
private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(queueConfiguration);
String queueName = queue.getName();
if (queue.isDurable())
{
- DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue, null);
+ DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue);
}
//get the exchange name (returns default exchange name if none was specified)
@@ -429,12 +435,108 @@ public abstract class AbstractVirtualHos
}
@Override
+ public AMQQueue getQueue(String name)
+ {
+ return _queueRegistry.getQueue(name);
+ }
+
+ @Override
+ public AMQQueue getQueue(UUID id)
+ {
+ return _queueRegistry.getQueue(id);
+ }
+
+ @Override
+ public Collection<AMQQueue> getQueues()
+ {
+ return _queueRegistry.getQueues();
+ }
+
+ @Override
+ public int removeQueue(AMQQueue queue) throws AMQException
+ {
+ synchronized (getQueueRegistry())
+ {
+ int purged = queue.delete();
+
+ getQueueRegistry().unregisterQueue(queue.getName());
+ if (queue.isDurable() && !queue.isAutoDelete())
+ {
+ DurableConfigurationStore store = getDurableConfigurationStore();
+ DurableConfigurationStoreHelper.removeQueue(store, queue);
+ }
+ return purged;
+ }
+ }
+
+ @Override
+ public AMQQueue createQueue(UUID id,
+ String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments) throws AMQException
+ {
+
+ if (queueName == null)
+ {
+ throw new IllegalArgumentException("Queue name must not be null");
+ }
+
+ // Access check
+ if (!getSecurityManager().authoriseCreateQueue(autoDelete,
+ durable,
+ exclusive,
+ null,
+ null,
+ queueName,
+ owner))
+ {
+ String description = "Permission denied: queue-name '" + queueName + "'";
+ throw new AMQSecurityException(description);
+ }
+
+ synchronized (_queueRegistry)
+ {
+ if(_queueRegistry.getQueue(queueName) != null)
+ {
+ throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName));
+ }
+ if(id == null)
+ {
+
+ id = UUIDGenerator.generateExchangeUUID(queueName, getName());
+ while(_queueRegistry.getQueue(id) != null)
+ {
+ id = UUID.randomUUID();
+ }
+
+ }
+ else if(_queueRegistry.getQueue(id) != null)
+ {
+ throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(queueName));
+ }
+ return _queueFactory.createQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer,
+ arguments);
+ }
+
+ }
+
+ @Override
public Exchange getExchange(String name)
{
return _exchangeRegistry.getExchange(name);
}
@Override
+ public Exchange getExchange(UUID id)
+ {
+ return _exchangeRegistry.getExchange(id);
+ }
+
+ @Override
public Exchange getDefaultExchange()
{
return _exchangeRegistry.getDefaultExchange();
@@ -514,7 +616,7 @@ public abstract class AbstractVirtualHos
for(ExchangeType type : getExchangeTypes())
{
- if(type.getDefaultExchangeName().toString().equals( exchange.getName() ))
+ if(type.getDefaultExchangeName().equals( exchange.getName() ))
{
throw new RequiredExchangeException(exchange.getName());
}
@@ -564,6 +666,18 @@ public abstract class AbstractVirtualHos
_logger.error("Failed to close message store", e);
}
}
+ if (getDurableConfigurationStore() != null)
+ {
+ //Remove MessageStore Interface should not throw Exception
+ try
+ {
+ getDurableConfigurationStore().close();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Failed to close message store", e);
+ }
+ }
}
@@ -745,6 +859,22 @@ public abstract class AbstractVirtualHos
}
}
+ protected Map<String, DurableConfiguredObjectRecoverer> getDurableConfigurationRecoverers()
+ {
+ DurableConfiguredObjectRecoverer[] recoverers = {
+ new QueueRecoverer(this, getExchangeRegistry(), _queueFactory),
+ new ExchangeRecoverer(getExchangeRegistry(), getExchangeFactory()),
+ new BindingRecoverer(this, getExchangeRegistry())
+ };
+
+ final Map<String, DurableConfiguredObjectRecoverer> recovererMap= new HashMap<String, DurableConfiguredObjectRecoverer>();
+ for(DurableConfiguredObjectRecoverer recoverer : recoverers)
+ {
+ recovererMap.put(recoverer.getType(), recoverer);
+ }
+ return recovererMap;
+ }
+
private class VirtualHostHouseKeepingTask extends HouseKeepingTask
{
public VirtualHostHouseKeepingTask()
@@ -766,8 +896,7 @@ public abstract class AbstractVirtualHos
q.checkMessageStatus();
} catch (Exception e)
{
- _logger.error("Exception in housekeeping for queue: "
- + q.getNameShortString().toString(), e);
+ _logger.error("Exception in housekeeping for queue: " + q.getName(), e);
//Don't throw exceptions as this will stop the
// house keeping task from running.
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java Fri Sep 20 18:59:30 2013
@@ -23,7 +23,9 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.DurableConfigurationStoreCreator;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreCreator;
import org.apache.qpid.server.store.OperationalLoggingListener;
@@ -68,7 +70,7 @@ public class StandardVirtualHost extends
final
MessageStoreLogSubject
- storeLogSubject = new MessageStoreLogSubject(this, messageStore.getClass().getSimpleName());
+ storeLogSubject = new MessageStoreLogSubject(getName(), messageStore.getClass().getSimpleName());
OperationalLoggingListener.listen(messageStore, storeLogSubject);
return messageStore;
@@ -77,7 +79,14 @@ public class StandardVirtualHost extends
private DurableConfigurationStore initialiseConfigurationStore(VirtualHost virtualHost) throws Exception
{
DurableConfigurationStore configurationStore;
- if(getMessageStore() instanceof DurableConfigurationStore)
+ final Object storeTypeAttr = virtualHost.getAttribute(VirtualHost.CONFIG_STORE_TYPE);
+ String storeType = storeTypeAttr == null ? null : String.valueOf(storeTypeAttr);
+
+ if(storeType != null)
+ {
+ configurationStore = new DurableConfigurationStoreCreator().createMessageStore(storeType);
+ }
+ else if(getMessageStore() instanceof DurableConfigurationStore)
{
configurationStore = (DurableConfigurationStore) getMessageStore();
}
@@ -96,11 +105,13 @@ public class StandardVirtualHost extends
_durableConfigurationStore = initialiseConfigurationStore(virtualHost);
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory());
+ DurableConfigurationRecoverer configRecoverer =
+ new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
+ new DefaultUpgraderProvider(this, getExchangeRegistry()));
+ _durableConfigurationStore.configureConfigStore(virtualHost, configRecoverer);
- _durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, virtualHost);
-
- _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler);
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory());
+ _messageStore.configureMessageStore(virtualHost, recoveryHandler, recoveryHandler);
initialiseModel(hostConfig);
@@ -109,25 +120,6 @@ public class StandardVirtualHost extends
attainActivation();
}
-
- protected void closeStorage()
- {
- //Close MessageStore
- if (_messageStore != null)
- {
- //Remove MessageStore Interface should not throw Exception
- try
- {
- getMessageStore().close();
- }
- catch (Exception e)
- {
- getLogger().error("Failed to close message store", e);
- }
- }
- }
-
-
@Override
public MessageStore getMessageStore()
{
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java Fri Sep 20 18:59:30 2013
@@ -19,20 +19,14 @@ package org.apache.qpid.server.virtualho
*
*/
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
import org.apache.qpid.server.plugin.MessageStoreFactory;
import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.MessageStoreConstants;
import org.apache.qpid.server.store.MessageStoreCreator;
@@ -89,17 +83,6 @@ public class StandardVirtualHostFactory
factory.validateAttributes(attributes);
}
}
- // TODO - each store type should validate its own attributes
- if(!((String) storeType).equalsIgnoreCase(MemoryMessageStore.TYPE))
- {
- /* Object storePath = attributes.get(STORE_PATH_ATTRIBUTE);
- if(!(storePath instanceof String))
- {
- throw new IllegalArgumentException("Attribute '"+ STORE_PATH_ATTRIBUTE
- +"' is required and must be of type String.");
-
- }*/
- }
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Sep 20 18:59:30 2013
@@ -21,18 +21,18 @@
package org.apache.qpid.server.virtualhost;
import java.util.Collection;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeInUseException;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
+import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsGatherer;
@@ -48,7 +48,23 @@ public interface VirtualHost extends Dur
String getName();
- QueueRegistry getQueueRegistry();
+ AMQQueue getQueue(String name);
+
+ AMQQueue getQueue(UUID id);
+
+ Collection<AMQQueue> getQueues();
+
+ int removeQueue(AMQQueue queue) throws AMQException;
+
+ AMQQueue createQueue(UUID id,
+ String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments) throws AMQException;
+
Exchange createExchange(UUID id,
String exchange,
@@ -61,6 +77,8 @@ public interface VirtualHost extends Dur
void removeExchange(Exchange exchange, boolean force) throws AMQException;
Exchange getExchange(String name);
+ Exchange getExchange(UUID id);
+
Exchange getDefaultExchange();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org