You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2012/04/17 17:04:38 UTC
svn commit: r1327128 [4/5] - in
/qpid/branches/java-config-and-management/qpid/java: ./
amqp-1-0-client-jms/ amqp-1-0-client/ amqp-1-0-common/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server...
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java?rev=1327128&r1=1327127&r2=1327128&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java Tue Apr 17 15:04:34 2012
@@ -29,6 +29,7 @@ import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.Driver;
@@ -51,12 +52,15 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.federation.Bridge;
import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.ConfiguredObjectHelper;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.EventManager;
@@ -89,12 +93,9 @@ public class DerbyMessageStore implement
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
- private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE";
- private static final String QUEUE_TABLE_NAME = "QPID_QUEUE";
- private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS";
- private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY";
+ private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES";
- private static final String META_DATA_TABLE_NAME = "QPID_META_DATA";
+ private static final String META_DATA_TABLE_NAME = "QPID_MESSAGE_METADATA";
private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
private static final String LINKS_TABLE_NAME = "QPID_LINKS";
@@ -103,7 +104,9 @@ public class DerbyMessageStore implement
private static final String XID_TABLE_NAME = "QPID_XIDS";
private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
- private static final int DB_VERSION = 3;
+ private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
+
+ private static final int DB_VERSION = 6;
@@ -119,38 +122,23 @@ public class DerbyMessageStore implement
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
- private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
- private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), exclusive SMALLINT not null, arguments blob, PRIMARY KEY ( name ))";
- private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
- private static final String SELECT_FROM_QUEUE = "SELECT name, owner, exclusive, arguments FROM " + QUEUE_TABLE_NAME;
- private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
- private static final String UPDATE_QUEUE_EXCLUSIVITY = "UPDATE " + QUEUE_TABLE_NAME + " SET exclusive = ? WHERE name = ?";
- private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
- private static final String SELECT_FROM_BINDINGS =
- "SELECT exchange_name, queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " ORDER BY exchange_name";
- private static final String FIND_BINDING =
- "SELECT * FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ? ";
- private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
- private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
- private static final String FIND_EXCHANGE = "SELECT name FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
- private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )";
- private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?";
- private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner, exclusive, arguments) VALUES (?, ?, ?, ?)";
- private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
-
- private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
- private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)";
- private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?";
- private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_name, message_id";
-
-
- private static final String CREATE_META_DATA_TABLE = "CREATE TABLE "+META_DATA_TABLE_NAME+" ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )";
- private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, offset int not null, last_byte int not null, content blob , PRIMARY KEY (message_id, offset) )";
-
- private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, offset, last_byte, content ) values (?, ?, ?, ?)";
- private static final String SELECT_FROM_MESSAGE_CONTENT =
- "SELECT offset, content FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? AND last_byte > ? AND offset < ? ORDER BY message_id, offset";
- private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
+ private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_id varchar(36) not null, message_id bigint not null, PRIMARY KEY (queue_id, message_id) )";
+ private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
+ private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
+ private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id";
+
+
+ private static final String CREATE_META_DATA_TABLE = "CREATE TABLE " + META_DATA_TABLE_NAME
+ + " ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )";
+ private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE " + MESSAGE_CONTENT_TABLE_NAME
+ + " ( message_id bigint not null, content blob , PRIMARY KEY (message_id) )";
+
+ private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME
+ + "( message_id, content ) values (?, ?)";
+ private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME
+ + " WHERE message_id = ?";
+ private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME
+ + " WHERE message_id = ?";
private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";;
private static final String SELECT_FROM_META_DATA =
@@ -214,18 +202,32 @@ public class DerbyMessageStore implement
private static final String CREATE_XID_ACTIONS_TABLE =
"CREATE TABLE "+XID_ACTIONS_TABLE_NAME+" ( format bigint not null,"
+ " global_id varchar(64) for bit data not null, branch_id varchar(64) for bit data not null, " +
- "action_type char not null, queue_name varchar(255) not null, message_id bigint not null" +
+ "action_type char not null, queue_id varchar(36) not null, message_id bigint not null" +
", PRIMARY KEY ( " +
- "format, global_id, branch_id, action_type, queue_name, message_id))";
+ "format, global_id, branch_id, action_type, queue_id, message_id))";
private static final String INSERT_INTO_XID_ACTIONS =
"INSERT INTO "+XID_ACTIONS_TABLE_NAME+" ( format, global_id, branch_id, action_type, " +
- "queue_name, message_id ) values (?,?,?,?,?,?) ";
+ "queue_id, message_id ) values (?,?,?,?,?,?) ";
private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME
+ " WHERE format = ? and global_id = ? and branch_id = ?";
private static final String SELECT_ALL_FROM_XID_ACTIONS =
- "SELECT action_type, queue_name, message_id FROM " + XID_ACTIONS_TABLE_NAME +
+ "SELECT action_type, queue_id, message_id FROM " + XID_ACTIONS_TABLE_NAME +
" WHERE format = ? and global_id = ? and branch_id = ?";
+ private static final String CREATE_CONFIGURED_OBJECTS_TABLE = "CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " ( id VARCHAR(36) not null, object_type varchar(255), attributes blob, PRIMARY KEY (id))";
+ private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " ( id, object_type, attributes) VALUES (?,?,?)";
+ private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " set object_type =?, attributes = ? where id = ?";
+ private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " where id = ?";
+ private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " where id = ?";
+ private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME;
+
+ private final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
private final StateManager _stateManager;
@@ -244,6 +246,8 @@ public class DerbyMessageStore implement
_stateManager = new StateManager(_eventManager);
}
+ private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper();
+
@Override
public void configureConfigStore(String name,
ConfigurationRecoveryHandler configRecoveryHandler,
@@ -323,9 +327,7 @@ public class DerbyMessageStore implement
Connection conn = newAutoCommitConnection();
createVersionTable(conn);
- createExchangeTable(conn);
- createQueueTable(conn);
- createBindingsTable(conn);
+ createConfiguredObjectsTable(conn);
createQueueEntryTable(conn);
createMetaDataTable(conn);
createMessageContentTable(conn);
@@ -366,31 +368,14 @@ public class DerbyMessageStore implement
}
-
- private void createExchangeTable(final Connection conn) throws SQLException
- {
- if(!tableExists(EXCHANGE_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute(CREATE_EXCHANGE_TABLE);
- }
- finally
- {
- stmt.close();
- }
- }
- }
-
- private void createQueueTable(final Connection conn) throws SQLException
+ private void createConfiguredObjectsTable(final Connection conn) throws SQLException
{
- if(!tableExists(QUEUE_TABLE_NAME, conn))
+ if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
try
{
- stmt.execute(CREATE_QUEUE_TABLE);
+ stmt.execute(CREATE_CONFIGURED_OBJECTS_TABLE);
}
finally
{
@@ -399,23 +384,6 @@ public class DerbyMessageStore implement
}
}
- private void createBindingsTable(final Connection conn) throws SQLException
- {
- if(!tableExists(BINDINGS_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute(CREATE_BINDINGS_TABLE);
- }
- finally
- {
- stmt.close();
- }
- }
-
- }
-
private void createQueueEntryTable(final Connection conn) throws SQLException
{
if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
@@ -433,7 +401,7 @@ public class DerbyMessageStore implement
}
- private void createMetaDataTable(final Connection conn) throws SQLException
+ private void createMetaDataTable(final Connection conn) throws SQLException
{
if(!tableExists(META_DATA_TABLE_NAME, conn))
{
@@ -558,26 +526,25 @@ public class DerbyMessageStore implement
private void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
{
-
try
{
+ List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this);
- recoverQueues(qrh);
+ _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
- List<String> exchanges = loadExchanges(erh);
+ _configuredObjectHelper.recoverExchanges(erh, configuredObjects);
+
ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery();
- recoverBindings(brh, exchanges);
+ _configuredObjectHelper.recoverBindings(brh, configuredObjects);
+
ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
recoverBrokerLinks(lrh);
}
catch (SQLException e)
{
-
throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
}
-
-
}
private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
@@ -718,176 +685,6 @@ public class DerbyMessageStore implement
}
- private void recoverQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
- try
- {
-
- while(rs.next())
- {
- String queueName = rs.getString(1);
- _logger.debug("Got queue " + queueName);
- String owner = rs.getString(2);
- boolean exclusive = rs.getBoolean(3);
- Blob argumentsAsBlob = rs.getBlob(4);
-
- byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
- FieldTable arguments;
- if(dataAsBytes.length > 0)
- {
-
- try
- {
- arguments = new FieldTable(new DataInputStream(new ByteArrayInputStream(dataAsBytes)),dataAsBytes.length);
- }
- catch (IOException e)
- {
- throw new RuntimeException("IO Exception should not be thrown",e);
- }
- }
- else
- {
- arguments = null;
- }
-
- qrh.queue(queueName, owner, exclusive, arguments);
-
- }
-
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
-
-
- private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws SQLException
- {
-
- List<String> exchanges = new ArrayList<String>();
- Connection conn = null;
- try
- {
- conn = newAutoCommitConnection();
-
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
- try
- {
- while(rs.next())
- {
- String exchangeName = rs.getString(1);
- String type = rs.getString(2);
- boolean autoDelete = rs.getShort(3) != 0;
-
- exchanges.add(exchangeName);
-
- erh.exchange(exchangeName, type, autoDelete);
-
- }
- return exchanges;
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- if(conn != null)
- {
- conn.close();
- }
- }
-
- }
-
- private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws SQLException
- {
- _logger.info("Recovering bindings...");
-
- Connection conn = null;
- try
- {
- conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
-
- try
- {
- ResultSet rs = stmt.executeQuery();
-
- try
- {
-
- while(rs.next())
- {
- String exchangeName = rs.getString(1);
- String queueName = rs.getString(2);
- String bindingKey = rs.getString(3);
- Blob arguments = rs.getBlob(4);
- java.nio.ByteBuffer buf;
-
- if(arguments != null && arguments.length() != 0)
- {
- byte[] argumentBytes = arguments.getBytes(1, (int) arguments.length());
- buf = java.nio.ByteBuffer.wrap(argumentBytes);
- }
- else
- {
- buf = null;
- }
-
- brh.binding(exchangeName, queueName, bindingKey, buf);
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- }
- finally
- {
- if(conn != null)
- {
- conn.close();
- }
- }
- }
-
-
-
@Override
public void close() throws Exception
{
@@ -999,60 +796,8 @@ public class DerbyMessageStore implement
{
if (_stateManager.isInState(State.ACTIVE))
{
- try
- {
- Connection conn = newAutoCommitConnection();
-
- try
- {
-
-
- PreparedStatement stmt = conn.prepareStatement(FIND_EXCHANGE);
- try
- {
- stmt.setString(1, exchange.getNameShortString().toString());
- ResultSet rs = stmt.executeQuery();
- try
- {
-
- // If we don't have any data in the result set then we can add this exchange
- if (!rs.next())
- {
-
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_EXCHANGE);
- try
- {
- insertStmt.setString(1, exchange.getName().toString());
- insertStmt.setString(2, exchange.getTypeShortString().asString());
- insertStmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0);
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error writing Exchange with name " + exchange.getNameShortString() + " to database: " + e.getMessage(), e);
- }
+ ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange);
+ insertConfiguredObject(configuredObject);
}
}
@@ -1060,293 +805,72 @@ public class DerbyMessageStore implement
@Override
public void removeExchange(Exchange exchange) throws AMQStoreException
{
-
- try
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
- try
- {
- stmt.setString(1, exchange.getNameShortString().toString());
- int results = stmt.executeUpdate();
- stmt.close();
- if(results == 0)
- {
- throw new AMQStoreException("Exchange " + exchange.getNameShortString() + " not found");
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error deleting Exchange with name " + exchange.getNameShortString() + " from database: " + e.getMessage(), e);
- }
- }
-
- @Override
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
- throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
+ int results = removeConfiguredObject(exchange.getId());
+ if (results == 0)
{
- try
- {
- Connection conn = newAutoCommitConnection();
-
- try
- {
-
- PreparedStatement stmt = conn.prepareStatement(FIND_BINDING);
- try
- {
- stmt.setString(1, exchange.getNameShortString().toString() );
- stmt.setString(2, queue.getNameShortString().toString());
- stmt.setString(3, routingKey == null ? null : routingKey.toString());
-
- ResultSet rs = stmt.executeQuery();
- try
- {
- // If this binding is not already in the store then create it.
- if (!rs.next())
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
- try
- {
- insertStmt.setString(1, exchange.getNameShortString().toString() );
- insertStmt.setString(2, queue.getNameShortString().toString());
- insertStmt.setString(3, routingKey == null ? null : routingKey.toString());
- if(args != null)
- {
- // TODO - In Java 6 we could use create/set Blob
- byte[] bytes = args.getDataAsBytes();
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
- insertStmt.setBinaryStream(4, bis, bytes.length);
- }
- else
- {
- insertStmt.setNull(4, Types.BLOB);
- }
-
- insertStmt.executeUpdate();
- }
- finally
- {
- insertStmt.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
- + exchange.getNameShortString() + " to database: " + e.getMessage(), e);
- }
-
+ throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + exchange.getId() + " not found");
}
-
-
}
@Override
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
+ public void bindQueue(Binding binding)
throws AMQStoreException
{
- Connection conn = null;
- PreparedStatement stmt = null;
-
- try
- {
- conn = newAutoCommitConnection();
- // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
- stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
- stmt.setString(1, exchange.getNameShortString().toString() );
- stmt.setString(2, queue.getNameShortString().toString());
- stmt.setString(3, routingKey == null ? null : routingKey.toString());
-
- int result = stmt.executeUpdate();
-
- if(result != 1)
- {
- throw new AMQStoreException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange "
- + exchange.getNameShortString() + " not found");
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error removing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
- + exchange.getNameShortString() + " in database: " + e.getMessage(), e);
- }
- finally
- {
- closePreparedStatement(stmt);
- closeConnection(conn);
- }
- }
-
- @Override
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- createQueue(queue, null);
- }
-
- @Override
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
- {
- _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
-
if (_stateManager.isInState(State.ACTIVE))
{
- try
- {
- Connection conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
- try
- {
- stmt.setString(1, queue.getNameShortString().toString());
- ResultSet rs = stmt.executeQuery();
- try
- {
-
- // If we don't have any data in the result set then we can add this queue
- if (!rs.next())
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_QUEUE);
-
- try
- {
- String owner = queue.getOwner() == null ? null : queue.getOwner().toString();
-
- insertStmt.setString(1, queue.getNameShortString().toString());
- insertStmt.setString(2, owner);
- insertStmt.setBoolean(3,queue.isExclusive());
-
- final byte[] underlying;
- if(arguments != null)
- {
- underlying = arguments.getDataAsBytes();
- }
- else
- {
- underlying = new byte[0];
- }
-
- ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
- insertStmt.setBinaryStream(4,bis,underlying.length);
-
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- conn.close();
-
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error writing AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e);
- }
+ ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding);
+ insertConfiguredObject(configuredObject);
}
}
- /**
- * Updates the specified queue in the persistent store, IF it is already present. If the queue
- * is not present in the store, it will not be added.
- *
- * NOTE: Currently only updates the exclusivity.
- *
- * @param queue The queue to update the entry for.
- * @throws AMQStoreException If the operation fails for any reason.
- */
@Override
- public void updateQueue(final AMQQueue queue) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- try
- {
- Connection conn = newAutoCommitConnection();
-
- try
- {
- PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
- try
- {
- stmt.setString(1, queue.getNameShortString().toString());
-
- ResultSet rs = stmt.executeQuery();
- try
- {
- if (rs.next())
- {
- PreparedStatement stmt2 = conn.prepareStatement(UPDATE_QUEUE_EXCLUSIVITY);
- try
- {
- stmt2.setBoolean(1,queue.isExclusive());
- stmt2.setString(2, queue.getNameShortString().toString());
-
- stmt2.execute();
- }
- finally
- {
- stmt2.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
+ public void unbindQueue(Binding binding)
+ throws AMQStoreException
+ {
+ int results = removeConfiguredObject(binding.getId());
+ if (results == 0)
+ {
+ throw new AMQStoreException("Binding " + binding + " not found");
+ }
+ }
+
+ @Override
+ public void createQueue(AMQQueue queue) throws AMQStoreException
+ {
+ createQueue(queue, null);
+ }
+
+ @Override
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
+ {
+ _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
+
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ ConfiguredObjectRecord queueConfiguredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments);
+ insertConfiguredObject(queueConfiguredObject);
+ }
+ }
+
+ /**
+ * Updates the specified queue in the persistent store, IF it is already present. If the queue
+ * is not present in the store, it will not be added.
+ *
+ * NOTE: Currently only updates the exclusivity.
+ *
+ * @param queue The queue to update the entry for.
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ @Override
+ public void updateQueue(final AMQQueue queue) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(queue.getId());
+ if (queueConfiguredObject != null)
{
- throw new AMQStoreException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e);
+ ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueConfiguredObject);
+ updateConfiguredObject(newQueueRecord);
}
}
@@ -1410,31 +934,11 @@ public class DerbyMessageStore implement
{
AMQShortString name = queue.getNameShortString();
_logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
- Connection conn = null;
- PreparedStatement stmt = null;
- try
- {
- conn = newAutoCommitConnection();
- stmt = conn.prepareStatement(DELETE_FROM_QUEUE);
- stmt.setString(1, name.toString());
- int results = stmt.executeUpdate();
-
- if (results == 0)
- {
- throw new AMQStoreException("Queue " + name + " not found");
- }
- }
- catch (SQLException e)
+ int results = removeConfiguredObject(queue.getId());
+ if (results == 0)
{
- throw new AMQStoreException("Error deleting AMQQueue with name " + name + " from database: " + e.getMessage(), e);
+ throw new AMQStoreException("Queue " + name + " with id " + queue.getId() + " not found");
}
- finally
- {
- closePreparedStatement(stmt);
- closeConnection(conn);
- }
-
-
}
@Override
@@ -1676,8 +1180,6 @@ public class DerbyMessageStore implement
public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
{
- String name = queue.getResourceName();
-
Connection conn = connWrapper.getConnection();
@@ -1685,13 +1187,13 @@ public class DerbyMessageStore implement
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
+ _logger.debug("Enqueuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + queue.getId()+ "[Connection" + conn + "]");
}
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
try
{
- stmt.setString(1,name);
+ stmt.setString(1, queue.getId().toString());
stmt.setLong(2,messageId);
stmt.executeUpdate();
}
@@ -1703,7 +1205,7 @@ public class DerbyMessageStore implement
catch (SQLException e)
{
_logger.error("Failed to enqueue: " + e.getMessage(), e);
- throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name
+ throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId()
+ " to database", e);
}
@@ -1711,8 +1213,6 @@ public class DerbyMessageStore implement
public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
{
- String name = queue.getResourceName();
-
Connection conn = connWrapper.getConnection();
@@ -1722,7 +1222,7 @@ public class DerbyMessageStore implement
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
try
{
- stmt.setString(1,name);
+ stmt.setString(1, queue.getId().toString());
stmt.setLong(2,messageId);
int results = stmt.executeUpdate();
@@ -1730,12 +1230,14 @@ public class DerbyMessageStore implement
if(results != 1)
{
- throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name);
+ throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ + " with id " + queue.getId());
}
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeuing message " + messageId + " on queue " + name );
+ _logger.debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ + " with id " + queue.getId());
}
}
finally
@@ -1746,8 +1248,8 @@ public class DerbyMessageStore implement
catch (SQLException e)
{
_logger.error("Failed to dequeue: " + e.getMessage(), e);
- throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + name
- + " from database", e);
+ throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ + " with id " + queue.getId() + " from database", e);
}
}
@@ -1840,7 +1342,7 @@ public class DerbyMessageStore implement
stmt.setString(4, "E");
for(Transaction.Record record : enqueues)
{
- stmt.setString(5, record.getQueue().getResourceName());
+ stmt.setString(5, record.getQueue().getId().toString());
stmt.setLong(6, record.getMessage().getMessageNumber());
stmt.executeUpdate();
}
@@ -1851,7 +1353,7 @@ public class DerbyMessageStore implement
stmt.setString(4, "D");
for(Transaction.Record record : dequeues)
{
- stmt.setString(5, record.getQueue().getResourceName());
+ stmt.setString(5, record.getQueue().getId().toString());
stmt.setLong(6, record.getMessage().getMessageNumber());
stmt.executeUpdate();
}
@@ -2081,9 +1583,9 @@ public class DerbyMessageStore implement
while(rs.next())
{
- String queueName = rs.getString(1);
+ String id = rs.getString(1);
long messageId = rs.getLong(2);
- queueEntryHandler.queueEntry(queueName,messageId);
+ queueEntryHandler.queueEntry(UUID.fromString(id), messageId);
}
}
finally
@@ -2137,13 +1639,13 @@ public class DerbyMessageStore implement
private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueableMessage
{
- private final String _queueName;
private long _messageNumber;
+ private UUID _queueId;
- public RecordImpl(String queueName, long messageNumber)
+ public RecordImpl(UUID queueId, long messageNumber)
{
- _queueName = queueName;
_messageNumber = messageNumber;
+ _queueId = queueId;
}
@Override
@@ -2177,9 +1679,9 @@ public class DerbyMessageStore implement
}
@Override
- public String getResourceName()
+ public UUID getId()
{
- return _queueName;
+ return _queueId;
}
}
@@ -2237,10 +1739,10 @@ public class DerbyMessageStore implement
{
String actionType = rs.getString(1);
- String queueName = rs.getString(2);
+ UUID queueId = UUID.fromString(rs.getString(2));
long messageId = rs.getLong(3);
- RecordImpl record = new RecordImpl(queueName, messageId);
+ RecordImpl record = new RecordImpl(queueId, messageId);
List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
records.add(record);
}
@@ -2319,11 +1821,11 @@ public class DerbyMessageStore implement
}
- private void addContent(Connection conn, long messageId, int offset, ByteBuffer src)
+ private void addContent(Connection conn, long messageId, ByteBuffer src)
{
if(_logger.isDebugEnabled())
{
- _logger.debug("Adding content chunk offset " + offset + " for message " +messageId);
+ _logger.debug("Adding content for message " +messageId);
}
PreparedStatement stmt = null;
@@ -2336,20 +1838,15 @@ public class DerbyMessageStore implement
stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
- stmt.setInt(2, offset);
- stmt.setInt(3, offset+chunkData.length);
-
-
- // TODO in Java 6 we could just use blobs
ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
- stmt.setBinaryStream(4, bis, chunkData.length);
+ stmt.setBinaryStream(2, bis, chunkData.length);
stmt.executeUpdate();
}
catch (SQLException e)
{
closeConnection(conn);
- throw new RuntimeException("Error adding content chunk offset " + offset + " for message " + messageId + ": " + e.getMessage(), e);
+ throw new RuntimeException("Error adding content for message " + messageId + ": " + e.getMessage(), e);
}
finally
{
@@ -2370,33 +1867,32 @@ public class DerbyMessageStore implement
stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
- stmt.setInt(2, offset);
- stmt.setInt(3, offset+dst.remaining());
ResultSet rs = stmt.executeQuery();
int written = 0;
- while(rs.next())
+ if (rs.next())
{
- int offsetInMessage = rs.getInt(1);
- Blob dataAsBlob = rs.getBlob(2);
+
+ Blob dataAsBlob = rs.getBlob(1);
final int size = (int) dataAsBlob.length();
byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
- int posInArray = offset + written - offsetInMessage;
- int count = size - posInArray;
- if(count > dst.remaining())
+ if (offset > size)
{
- count = dst.remaining();
+ throw new RuntimeException("Offset " + offset + " is greater than message size " + size
+ + " for message id " + messageId + "!");
+
}
- dst.put(dataAsBytes,posInArray,count);
- written+=count;
- if(dst.remaining() == 0)
+ written = size - offset;
+ if(written > dst.remaining())
{
- break;
+ written = dst.remaining();
}
+
+ dst.put(dataAsBytes, offset, written);
}
return written;
@@ -2635,7 +2131,7 @@ public class DerbyMessageStore implement
try
{
storeMetaData(conn, _messageId, _metaData);
- DerbyMessageStore.this.addContent(conn, _messageId, 0,
+ DerbyMessageStore.this.addContent(conn, _messageId,
_data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
}
finally
@@ -2699,4 +2195,255 @@ public class DerbyMessageStore implement
{
return _storeLocation;
}
+
+ private void insertConfiguredObject(ConfiguredObjectRecord configuredObject) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ // If we don't have any data in the result set then we can add this configured object
+ if (!rs.next())
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+ try
+ {
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
+ {
+ insertStmt.setNull(3, Types.BLOB);
+ }
+ else
+ {
+ byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET);
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+ }
+ insertStmt.execute();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private int removeConfiguredObject(UUID id) throws AMQStoreException
+ {
+ int results = 0;
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ stmt.setString(1, id.toString());
+ results = stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e);
+ }
+ return results;
+ }
+
+ private void updateConfiguredObject(final ConfiguredObjectRecord configuredObject) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ if (rs.next())
+ {
+ PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
+ try
+ {
+ stmt2.setString(1, configuredObject.getType());
+ if (configuredObject.getAttributes() != null)
+ {
+ byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET);
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
+ }
+ else
+ {
+ stmt2.setNull(2, Types.BLOB);
+ }
+ stmt2.setString(3, configuredObject.getId().toString());
+ stmt2.execute();
+ }
+ finally
+ {
+ stmt2.close();
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private ConfiguredObjectRecord loadConfiguredObject(final UUID id) throws AMQStoreException
+ {
+ ConfiguredObjectRecord result = null;
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, id.toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ if (rs.next())
+ {
+ String type = rs.getString(1);
+ Blob blob = rs.getBlob(2);
+ String attributes = null;
+ if (blob != null)
+ {
+ attributes = blobToString(blob);
+ }
+ result = new ConfiguredObjectRecord(id, type, attributes);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+ + e.getMessage(), e);
+ }
+ return result;
+ }
+
+ private String blobToString(Blob blob) throws SQLException
+ {
+ byte[] bytes = blob.getBytes(1, (int)blob.length());
+ return new String(bytes, UTF8_CHARSET);
+ }
+
+ private List<ConfiguredObjectRecord> loadConfiguredObjects() throws SQLException
+ {
+ ArrayList<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>();
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ String id = rs.getString(1);
+ String objectType = rs.getString(2);
+ String attributes = blobToString(rs.getBlob(3));
+ results.add(new ConfiguredObjectRecord(UUID.fromString(id), objectType, attributes));
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ return results;
+ }
}
\ No newline at end of file
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1327128&r1=1327127&r2=1327128&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Tue Apr 17 15:04:34 2012
@@ -40,6 +40,7 @@ import org.apache.qpid.server.flow.Windo
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.MessageMetaData_0_10;
import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.BaseQueue;
@@ -683,13 +684,12 @@ public class ServerSessionDelegate exten
{
String exchangeName = method.getExchange();
VirtualHost virtualHost = getVirtualHost(session);
- Exchange exchange = getExchange(session, exchangeName);
+ ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
//we must check for any unsupported arguments present and throw not-implemented
if(method.hasArguments())
{
Map<String,Object> args = method.getArguments();
-
//QPID-3392: currently we don't support any!
if(!args.isEmpty())
{
@@ -697,120 +697,113 @@ public class ServerSessionDelegate exten
return;
}
}
-
- if(method.getPassive())
+ synchronized(exchangeRegistry)
{
- if(exchange == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '"+exchangeName+"'");
-
- }
- else
- {
- if(!exchange.getTypeShortString().toString().equals(method.getType()) && (method.getType() != null && method.getType().length() > 0))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() +".");
- }
- }
+ Exchange exchange = getExchange(session, exchangeName);
- }
- else
- {
- if (exchange == null)
+ if(method.getPassive())
{
- if(exchangeName.startsWith("amq."))
+ if(exchange == null)
{
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to declare exchange: " + exchangeName +
- " which begins with reserved prefix 'amq.'.");
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'");
}
- else if(exchangeName.startsWith("qpid."))
+ else
{
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to declare exchange: " + exchangeName +
- " which begins with reserved prefix 'qpid.'.");
+ if (!exchange.getTypeShortString().toString().equals(method.getType())
+ && (method.getType() != null && method.getType().length() > 0))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() + ".");
+ }
}
- else
+ }
+ else
+ {
+ if (exchange == null)
{
- ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
- ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
-
-
-
- try
+ if (exchangeName.startsWith("amq."))
{
-
- exchange = exchangeFactory.createExchange(method.getExchange(),
- method.getType(),
- method.getDurable(),
- method.getAutoDelete());
-
- String alternateExchangeName = method.getAlternateExchange();
- boolean validAlternate;
- if(alternateExchangeName != null && alternateExchangeName.length() != 0)
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
+ + exchangeName + " which begins with reserved prefix 'amq.'.");
+ }
+ else if (exchangeName.startsWith("qpid."))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
+ + exchangeName + " which begins with reserved prefix 'qpid.'.");
+ }
+ else
+ {
+ ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+ try
{
- Exchange alternate = getExchange(session, alternateExchangeName);
- if(alternate == null)
+ exchange = exchangeFactory.createExchange(method.getExchange(),
+ method.getType(),
+ method.getDurable(),
+ method.getAutoDelete());
+ String alternateExchangeName = method.getAlternateExchange();
+ boolean validAlternate;
+ if(alternateExchangeName != null && alternateExchangeName.length() != 0)
{
- validAlternate = false;
+ Exchange alternate = getExchange(session, alternateExchangeName);
+ if(alternate == null)
+ {
+ validAlternate = false;
+ }
+ else
+ {
+ exchange.setAlternateExchange(alternate);
+ validAlternate = true;
+ }
}
else
{
- exchange.setAlternateExchange(alternate);
validAlternate = true;
}
- }
- else
- {
- validAlternate = true;
- }
-
- if(validAlternate)
- {
- if (exchange.isDurable())
+ if(validAlternate)
{
- DurableConfigurationStore store = virtualHost.getMessageStore();
- store.createExchange(exchange);
+ if (exchange.isDurable())
+ {
+ DurableConfigurationStore store = virtualHost.getMessageStore();
+ store.createExchange(exchange);
+ }
+ exchangeRegistry.registerExchange(exchange);
}
-
- exchangeRegistry.registerExchange(exchange);
+ else
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND,
+ "Unknown alternate exchange " + alternateExchangeName);
+ }
+ }
+ catch(AMQUnknownExchangeType e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
}
- else
+ catch (AMQException e)
{
- exception(session, method, ExecutionErrorCode.NOT_FOUND,
- "Unknown alternate exchange " + alternateExchangeName);
+ exception(session, method, e, "Cannot declare exchange '" + exchangeName);
}
}
- catch(AMQUnknownExchangeType e)
+ }
+ else
+ {
+ if(!exchange.getTypeShortString().toString().equals(method.getType()))
{
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to redeclare exchange: " + exchangeName
+ + " of type " + exchange.getTypeShortString()
+ + " to " + method.getType() +".");
}
- catch (AMQException e)
+ else if(method.hasAlternateExchange()
+ && (exchange.getAlternateExchange() == null ||
+ !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
{
- exception(session, method, e, "Cannot declare exchange '" + exchangeName);
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to change alternate exchange of: " + exchangeName
+ + " from " + exchange.getAlternateExchange()
+ + " to " + method.getAlternateExchange() +".");
}
}
}
- else
- {
- if(!exchange.getTypeShortString().toString().equals(method.getType()))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to redeclare exchange: " + exchangeName
- + " of type " + exchange.getTypeShortString()
- + " to " + method.getType() +".");
- }
- else if(method.hasAlternateExchange()
- && (exchange.getAlternateExchange() == null ||
- !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to change alternate exchange of: " + exchangeName
- + " from " + exchange.getAlternateExchange()
- + " to " + method.getAlternateExchange() +".");
- }
- }
-
}
}
@@ -1396,8 +1389,8 @@ public class ServerSessionDelegate exten
{
String owner = body.getExclusive() ? session.getClientID() : null;
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(),
- body.getExclusive(), virtualHost, body.getArguments());
+ final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateUUID(), queueName, body.getDurable(), owner,
+ body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments());
return queue;
}
Propchange: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:r1327001-1327003
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1327128&r1=1327127&r2=1327128&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Tue Apr 17 15:04:34 2012
@@ -100,7 +100,7 @@ public class VirtualHostConfigRecoveryHa
return this;
}
- public void queue(String queueName, String owner, boolean exclusive, FieldTable arguments)
+ public void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments)
{
try
{
@@ -108,7 +108,7 @@ public class VirtualHostConfigRecoveryHa
if (q == null)
{
- q = AMQQueueFactory.createAMQQueueImpl(queueName, true, owner, false, exclusive, _virtualHost,
+ q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost,
FieldTable.convertToMap(arguments));
_virtualHost.getQueueRegistry().registerQueue(q);
}
@@ -130,7 +130,7 @@ public class VirtualHostConfigRecoveryHa
return this;
}
- public void exchange(String exchangeName, String type, boolean autoDelete)
+ public void exchange(UUID id, String exchangeName, String type, boolean autoDelete)
{
try
{
@@ -139,7 +139,7 @@ public class VirtualHostConfigRecoveryHa
exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS);
if (exchange == null)
{
- exchange = _virtualHost.getExchangeFactory().createExchange(exchangeNameSS, new AMQShortString(type), true, autoDelete, 0);
+ exchange = _virtualHost.getExchangeFactory().createExchange(id, exchangeNameSS, new AMQShortString(type), true, autoDelete, 0);
_virtualHost.getExchangeRegistry().registerExchange(exchange);
}
}
@@ -212,7 +212,7 @@ public class VirtualHostConfigRecoveryHa
}
for(Transaction.Record record : enqueues)
{
- final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName());
+ final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId());
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
@@ -265,13 +265,13 @@ public class VirtualHostConfigRecoveryHa
StringBuilder xidString = xidAsString(id);
CurrentActor.get().message(_logSubject,
TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getQueue().getResourceName()));
+ record.getQueue().getId().toString()));
}
}
for(Transaction.Record record : dequeues)
{
- final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName());
+ final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId());
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
@@ -315,7 +315,7 @@ public class VirtualHostConfigRecoveryHa
StringBuilder xidString = xidAsString(id);
CurrentActor.get().message(_logSubject,
TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getQueue().getResourceName()));
+ record.getQueue().getId().toString()));
}
}
@@ -354,21 +354,22 @@ public class VirtualHostConfigRecoveryHa
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
}
- public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf)
+ @Override
+ public void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingKey, ByteBuffer buf)
{
try
{
- Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName);
+ Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeId);
if (exchange == null)
{
- _logger.error("Unknown exchange: " + exchangeName + ", cannot bind queue : " + queueName);
+ _logger.error("Unknown exchange id " + exchangeId + ", cannot bind queue with id " + queueId);
return;
}
-
- AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(new AMQShortString(queueName));
+
+ AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId);
if (queue == null)
{
- _logger.error("Unknown queue: " + queueName + ", cannot be bound to exchange: " + exchangeName);
+ _logger.error("Unknown queue id " + queueId + ", cannot be bound to exchange: " + exchange.getName());
}
else
{
@@ -392,10 +393,10 @@ public class VirtualHostConfigRecoveryHa
if(bf.getBinding(bindingKey, queue, exchange, argumentMap) == null)
{
- _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queueName
+ _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queue.getName()
+ ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")");
- bf.restoreBinding(bindingKey, queue, exchange, argumentMap);
+ bf.restoreBinding(bindingId, bindingKey, queue, exchange, argumentMap);
}
}
}
@@ -417,16 +418,14 @@ public class VirtualHostConfigRecoveryHa
}
- public void queueEntry(final String queueName, long messageId)
+ public void queueEntry(final UUID queueId, long messageId)
{
- AMQShortString queueNameShortString = new AMQShortString(queueName);
-
- AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
-
+ AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId);
try
{
if(queue != null)
{
+ String queueName = queue.getName();
ServerMessage message = _recoveredMessages.get(messageId);
_unusedMessages.remove(messageId);
@@ -436,7 +435,7 @@ public class VirtualHostConfigRecoveryHa
if (_logger.isDebugEnabled())
{
- _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getNameShortString());
+ _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
}
Integer count = _queueRecoveries.get(queueName);
@@ -451,7 +450,7 @@ public class VirtualHostConfigRecoveryHa
}
else
{
- _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded");
+ _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded");
Transaction txn = _store.newTransaction();
txn.dequeueMessage(queue, new DummyMessage(messageId));
txn.commitTranAsync();
@@ -459,15 +458,15 @@ public class VirtualHostConfigRecoveryHa
}
else
{
- _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded");
+ _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded");
Transaction txn = _store.newTransaction();
TransactionLogResource mockQueue =
new TransactionLogResource()
{
-
- public String getResourceName()
+ @Override
+ public UUID getId()
{
- return queueName;
+ return queueId;
}
};
txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
@@ -479,9 +478,6 @@ public class VirtualHostConfigRecoveryHa
{
throw new RuntimeException(e);
}
-
-
-
}
public DtxRecordRecoveryHandler completeQueueEntryRecovery()
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1327128&r1=1327127&r2=1327128&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Tue Apr 17 15:04:34 2012
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
@@ -45,6 +46,7 @@ import org.apache.qpid.server.message.AM
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -274,7 +276,7 @@ public class AbstractHeadersExchangeTest
public TestQueue(AMQShortString name) throws AMQException
{
- super(name, false, new AMQShortString("test"), true, false,ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), Collections.EMPTY_MAP);
+ super(UUIDGenerator.generateUUID(), name, false, new AMQShortString("test"), true, false,ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), Collections.EMPTY_MAP);
ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test").getQueueRegistry().registerQueue(this);
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java?rev=1327128&r1=1327127&r2=1327128&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java Tue Apr 17 15:04:34 2012
@@ -26,6 +26,7 @@ import org.apache.qpid.exchange.Exchange
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.ManagedExchange;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -52,7 +53,7 @@ public class ExchangeMBeanTest extends
public void testGeneralProperties() throws Exception
{
DirectExchange exchange = new DirectExchange();
- exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
+ exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
@@ -66,7 +67,7 @@ public class ExchangeMBeanTest extends
public void testDirectExchangeMBean() throws Exception
{
DirectExchange exchange = new DirectExchange();
- exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
+ exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
@@ -81,7 +82,7 @@ public class ExchangeMBeanTest extends
public void testTopicExchangeMBean() throws Exception
{
TopicExchange exchange = new TopicExchange();
- exchange.initialise(_virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true);
+ exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
@@ -96,7 +97,7 @@ public class ExchangeMBeanTest extends
public void testHeadersExchangeMBean() throws Exception
{
HeadersExchange exchange = new HeadersExchange();
- exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
+ exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
@@ -118,7 +119,7 @@ public class ExchangeMBeanTest extends
public void testHeadersExchangeMBeanMatchPropertyNoValue() throws Exception
{
HeadersExchange exchange = new HeadersExchange();
- exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
+ exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
@@ -136,7 +137,7 @@ public class ExchangeMBeanTest extends
public void testInvalidHeaderBindingMalformed() throws Exception
{
HeadersExchange exchange = new HeadersExchange();
- exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
+ exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java?rev=1327128&r1=1327127&r2=1327128&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java Tue Apr 17 15:04:34 2012
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.queue;
+import java.util.UUID;
+
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.AMQException;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org