You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/25 23:59:05 UTC
svn commit: r829675 [7/11] - in /qpid/trunk/qpid/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/ broker/bin/ broker/src/main/java/org/apac...
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java Sun Oct 25 22:58:57 2009
@@ -31,10 +31,11 @@
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.ACLPluginFactory;
import org.apache.qpid.server.security.access.plugins.AbstractACLPlugin;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.util.NetMatcher;
@@ -57,7 +58,7 @@
return plugin;
}
};
-
+
public class FirewallRule
{
@@ -69,13 +70,13 @@
public FirewallRule(String access, List networks, List hostnames)
{
_access = (access.equals("allow")) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
-
+
if (networks != null && networks.size() > 0)
{
String[] networkStrings = objListToStringArray(networks);
_network = new NetMatcher(networkStrings);
}
-
+
if (hostnames != null && hostnames.size() > 0)
{
int i = 0;
@@ -85,7 +86,7 @@
_hostnamePatterns[i++] = Pattern.compile(hostname);
}
}
-
+
}
private String[] objListToStringArray(List objList)
@@ -147,7 +148,7 @@
thread.run();
long endTime = System.currentTimeMillis() + DNS_TIMEOUT;
-
+
while (System.currentTimeMillis() < endTime && !done.get())
{
try
@@ -176,8 +177,15 @@
private FirewallRule[] _rules;
@Override
- public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost)
+ public AuthzResult authoriseConnect(PrincipalHolder principalHolder, VirtualHost virtualHost)
{
+ if(!(principalHolder instanceof ProtocolEngine))
+ {
+ return AuthzResult.ABSTAIN; // We only deal with tcp sessions
+ }
+
+ ProtocolEngine session = (ProtocolEngine) principalHolder;
+
SocketAddress sockAddr = session.getRemoteAddress();
if (!(sockAddr instanceof InetSocketAddress))
{
@@ -228,7 +236,7 @@
_default = AuthzResult.DENIED;
}
CompositeConfiguration finalConfig = new CompositeConfiguration(config);
-
+
List subFiles = config.getList("xml[@fileName]");
for (Object subFile : subFiles)
{
@@ -236,7 +244,7 @@
}
// all rules must have an access attribute
- int numRules = finalConfig.getList("rule[@access]").size();
+ int numRules = finalConfig.getList("rule[@access]").size();
_rules = new FirewallRule[numRules];
for (int i = 0; i < numRules; i++)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java Sun Oct 25 22:58:57 2009
@@ -215,14 +215,7 @@
_logger.warn("Unable to load access file:" + jmxaccesssFile);
}
- try
- {
- _mbean.register();
- }
- catch (AMQException e)
- {
- _logger.warn("Unable to register user management MBean");
- }
+ _mbean.register();
}
catch (JMException e)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java Sun Oct 25 22:58:57 2009
@@ -31,12 +31,12 @@
{
protected LogSubject _logSubject;
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration hostConfig) throws Exception
+ public void configure(VirtualHost virtualHost) throws Exception
{
_logSubject = new MessageStoreLogSubject(virtualHost, this);
CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1001(this.getClass().getName()));
}
-
+
public void close() throws Exception
{
CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1003());
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Sun Oct 25 22:58:57 2009
@@ -21,28 +21,18 @@
package org.apache.qpid.server.store;
import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.commons.configuration.Configuration;
+
import java.io.ByteArrayInputStream;
import java.io.File;
@@ -56,15 +46,14 @@
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
-public class DerbyMessageStore extends AbstractMessageStore
+public class DerbyMessageStore implements MessageStore
{
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
@@ -80,57 +69,66 @@
private static final String QUEUE_TABLE_NAME = "QPID_QUEUE";
private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS";
private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY";
- private static final String MESSAGE_META_DATA_TABLE_NAME = "QPID_MESSAGE_META_DATA";
+
+ private static final String META_DATA_TABLE_NAME = "QPID_META_DATA";
private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
private static final int DB_VERSION = 1;
- private VirtualHost _virtualHost;
private static Class<Driver> DRIVER_CLASS;
- private final AtomicLong _messageId = new AtomicLong(1);
+ private final AtomicLong _messageId = new AtomicLong(0);
private AtomicBoolean _closed = new AtomicBoolean(false);
private String _connectionURL;
- Map<AMQShortString, Integer> _queueRecoveries = new TreeMap<AMQShortString, Integer>();
-
+ private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
+
private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )";
private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
- private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
- private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )";
- private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )";
private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME;
private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
private static final String SELECT_FROM_BINDINGS =
- "SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?";
+ "SELECT exchange_name, queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " ORDER BY exchange_name";
private static final String FIND_BINDING =
"SELECT * FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ? ";
- private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
- private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )";
private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?";
private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)";
private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
+
+ private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)";
private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?";
- private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)";
- private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)";
- private static final String SELECT_FROM_MESSAGE_META_DATA =
- "SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
+ private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_name, message_id";
+
+
+ private static final String CREATE_META_DATA_TABLE = "CREATE TABLE "+META_DATA_TABLE_NAME+" ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )";
+ private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, offset int not null, last_byte int not null, content blob , PRIMARY KEY (message_id, offset) )";
+
+ private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, offset, last_byte, content ) values (?, ?, ?, ?)";
private static final String SELECT_FROM_MESSAGE_CONTENT =
- "SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?";
- private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME;
- private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
+ "SELECT offset, content FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? AND last_byte > ? AND offset < ? ORDER BY message_id, offset";
+ private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
+
+ private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";;
+ private static final String SELECT_FROM_META_DATA =
+ "SELECT meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
+ private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
+ private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
+
+
+ private LogSubject _logSubject;
+ private boolean _configured;
private enum State
@@ -146,21 +144,82 @@
private State _state = State.INITIAL;
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
{
- super.configure(virtualHost,base,config);
-
stateTransition(State.INITIAL, State.CONFIGURING);
+ _logSubject = logSubject;
+ CurrentActor.get().message(_logSubject, ConfigStoreMessages.CFG_1001(this.getClass().getName()));
- initialiseDriver();
+ if(!_configured)
+ {
+ commonConfiguration(name, storeConfiguration, logSubject);
+ _configured = true;
+ }
+
+ // this recovers durable exchanges, queues, and bindings
+ recover(recoveryHandler);
- _virtualHost = virtualHost;
- _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName());
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ stateTransition(State.RECOVERING, State.STARTED);
+
+ }
+
+
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
+ {
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1001(this.getClass().getName()));
+
+ if(!_configured)
+ {
+
+ _logSubject = logSubject;
+
+ commonConfiguration(name, storeConfiguration, logSubject);
+ _configured = true;
+ }
+
+ recoverMessages(recoveryHandler);
+
+ }
+
+
+
+ public void configureTransactionLog(String name,
+ TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
+ {
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1001(this.getClass().getName()));
+
+ if(!_configured)
+ {
+
+ _logSubject = logSubject;
+
+ commonConfiguration(name, storeConfiguration, logSubject);
+ _configured = true;
+ }
+
+ recoverQueueEntries(recoveryHandler);
+
+ }
+
+
+
+ private void commonConfiguration(String name, Configuration storeConfiguration, LogSubject logSubject)
+ throws ClassNotFoundException, SQLException
+ {
+ initialiseDriver();
//Update to pick up QPID_WORK and use that as the default location not just derbyDB
- final String databasePath = config.getStoreConfiguration().getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")+"/derbyDB");
+
+ final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")+"/derbyDB");
File environmentPath = new File(databasePath);
if (!environmentPath.exists())
@@ -172,17 +231,9 @@
}
}
- CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1002(environmentPath.getAbsolutePath()));
-
- createOrOpenDatabase(databasePath);
-
- // this recovers durable queues and persistent messages
-
- recover();
-
-
- stateTransition(State.RECOVERING, State.STARTED);
+ CurrentActor.get().message(logSubject, MessageStoreMessages.MST_1002(environmentPath.getAbsolutePath()));
+ createOrOpenDatabase(name, databasePath);
}
private static synchronized void initialiseDriver() throws ClassNotFoundException
@@ -193,10 +244,10 @@
}
}
- private void createOrOpenDatabase(final String environmentPath) throws SQLException
+ private void createOrOpenDatabase(String name, final String environmentPath) throws SQLException
{
//fixme this the _vhost name should not be added here.
- _connectionURL = "jdbc:derby:" + environmentPath + "/" + _virtualHost.getName() + ";create=true";
+ _connectionURL = "jdbc:derby:" + environmentPath + "/" + name + ";create=true";
Connection conn = newConnection();
@@ -205,7 +256,7 @@
createQueueTable(conn);
createBindingsTable(conn);
createQueueEntryTable(conn);
- createMessageMetaDataTable(conn);
+ createMetaDataTable(conn);
createMessageContentTable(conn);
conn.close();
@@ -276,12 +327,12 @@
}
- private void createMessageMetaDataTable(final Connection conn) throws SQLException
+ private void createMetaDataTable(final Connection conn) throws SQLException
{
- if(!tableExists(MESSAGE_META_DATA_TABLE_NAME, conn))
+ if(!tableExists(META_DATA_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
- stmt.execute(CREATE_MESSAGE_META_DATA_TABLE);
+ stmt.execute(CREATE_META_DATA_TABLE);
stmt.close();
}
@@ -314,38 +365,22 @@
return exists;
}
- public void recover() throws AMQException
+ public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
{
stateTransition(State.CONFIGURING, State.RECOVERING);
- CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1004(null, false));
-
- StoreContext context = new StoreContext();
try
{
- Map<AMQShortString, AMQQueue> queues = loadQueues();
-
- recoverExchanges();
-
- try
- {
-
- beginTran(context);
+ ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this);
+ List<String> queues = loadQueues(qrh);
- deliverMessages(context, queues);
- commitTran(context);
+ ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
+ List<String> exchanges = loadExchanges(erh);
+ ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery();
+ recoverBindings(brh, exchanges);
+ brh.completeBindingRecovery();
- //Recovery Complete
- CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1006(null, false));
- }
- finally
- {
- if(inTran(context))
- {
- abortTran(context);
- }
- }
}
catch (SQLException e)
@@ -357,53 +392,34 @@
}
- private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException
+ private List<String> loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException, AMQException
{
Connection conn = newConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
- Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>();
+ List<String> queues = new ArrayList<String>();
+
while(rs.next())
{
String queueName = rs.getString(1);
String owner = rs.getString(2);
- AMQShortString queueNameShortString = new AMQShortString(queueName);
+ qrh.queue(queueName, owner, null);
- AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+ queues.add(queueName);
- if (q == null)
- {
- q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost,
- null);
- _virtualHost.getQueueRegistry().registerQueue(q);
- }
-
- queueMap.put(queueNameShortString,q);
-
- CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1004(String.valueOf(q.getName()), true));
- //Record that we have a queue for recovery
- _queueRecoveries.put(new AMQShortString(queueName), 0);
-
- }
- return queueMap;
- }
- private void recoverExchanges() throws AMQException, SQLException
- {
- for (Exchange exchange : loadExchanges())
- {
- recoverExchange(exchange);
}
+ return queues;
}
- private List<Exchange> loadExchanges() throws AMQException, SQLException
+ private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws AMQException, SQLException
{
- List<Exchange> exchanges = new ArrayList<Exchange>();
+ List<String> exchanges = new ArrayList<String>();
Connection conn = null;
try
{
@@ -413,21 +429,15 @@
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
- Exchange exchange;
while(rs.next())
{
String exchangeName = rs.getString(1);
String type = rs.getString(2);
boolean autoDelete = rs.getShort(3) != 0;
- AMQShortString exchangeNameSS = new AMQShortString(exchangeName);
- exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS);
- if (exchange == null)
- {
- exchange = _virtualHost.getExchangeFactory().createExchange(exchangeNameSS, new AMQShortString(type), true, autoDelete, 0);
- _virtualHost.getExchangeRegistry().registerExchange(exchange);
- }
- exchanges.add(exchange);
+ exchanges.add(exchangeName);
+
+ erh.exchange(exchangeName, type, autoDelete);
}
return exchanges;
@@ -443,11 +453,13 @@
}
- private void recoverExchange(Exchange exchange) throws AMQException, SQLException
+ private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws AMQException, SQLException
{
- _logger.info("Recovering durable exchange " + exchange.getName() + " of type " + exchange.getType() + "...");
- QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+
+ _logger.info("Recovering bindings...");
+
+
Connection conn = null;
try
@@ -455,41 +467,29 @@
conn = newConnection();
PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
- stmt.setString(1, exchange.getName().toString());
ResultSet rs = stmt.executeQuery();
while(rs.next())
{
- String queueName = rs.getString(1);
- String bindingKey = rs.getString(2);
- Blob arguments = rs.getBlob(3);
-
+ String exchangeName = rs.getString(1);
+ String queueName = rs.getString(2);
+ String bindingKey = rs.getString(3);
+ Blob arguments = rs.getBlob(4);
+ java.nio.ByteBuffer buf;
- AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName));
- if (queue == null)
+ if(arguments != null && arguments.length() != 0)
{
- _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: "
- + exchange.getName());
+ byte[] argumentBytes = arguments.getBytes(1, (int) arguments.length());
+ buf = java.nio.ByteBuffer.wrap(argumentBytes);
}
else
{
- _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName
- + ", Routing Key: " + bindingKey + ", Arguments: " + arguments
- + ")");
-
- FieldTable argumentsFT = null;
- if(arguments != null)
- {
- byte[] argumentBytes = arguments.getBytes(0, (int) arguments.length());
- ByteBuffer buf = ByteBuffer.wrap(argumentBytes);
- argumentsFT = new FieldTable(buf,arguments.length());
- }
-
- queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT);
-
+ buf = null;
}
+
+ brh.binding(exchangeName, queueName, bindingKey, buf);
}
}
finally
@@ -501,44 +501,48 @@
}
}
+
+
public void close() throws Exception
{
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1003());
_closed.getAndSet(true);
-
- super.close();
}
- public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
+ public StoredMessage addMessage(StorableMessageMetaData metaData)
{
-
- boolean localTx = getOrCreateTransaction(storeContext);
-
- Connection conn = getConnection(storeContext);
- ConnectionWrapper wrapper = (ConnectionWrapper) storeContext.getPayload();
-
-
- if (_logger.isDebugEnabled())
+ if(metaData.isPersistent())
{
- _logger.debug("Message Id: " + messageId + " Removing");
+ return new StoredDerbyMessage(_messageId.incrementAndGet(), metaData);
}
+ else
+ {
+ return new StoredMemoryMessage(_messageId.incrementAndGet(), metaData);
+ }
+ }
- // first we need to look up the header to get the chunk count
- MessageMetaData mmd = getMessageMetaData(storeContext, messageId);
+ public StoredMessage getMessage(long messageNumber)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void removeMessage(long messageId)
+ {
+ Connection conn = null;
try
{
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
+
+
+ conn = newConnection();
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_META_DATA);
stmt.setLong(1,messageId);
- wrapper.setRequiresCommit();
int results = stmt.executeUpdate();
if (results == 0)
{
- if (localTx)
- {
- abortTran(storeContext);
- }
- throw new AMQException("Message metadata not found for message id " + messageId);
+
+ throw new RuntimeException("Message metadata not found for message id " + messageId);
}
stmt.close();
@@ -551,29 +555,27 @@
stmt.setLong(1,messageId);
results = stmt.executeUpdate();
- if(results != mmd.getContentChunkCount())
- {
- if (localTx)
- {
- abortTran(storeContext);
- }
- throw new AMQException("Unexpected number of content chunks when deleting message. Expected " + mmd.getContentChunkCount() + " but found " + results);
- }
- if (localTx)
- {
- commitTran(storeContext);
- }
+ conn.commit();
+ conn.close();
}
catch (SQLException e)
{
- if ((conn != null) && localTx)
+ if ((conn != null))
{
- abortTran(storeContext);
+ try
+ {
+ conn.rollback();
+ conn.close();
+ }
+ catch (SQLException e1)
+ {
+
+ }
}
- throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+ throw new RuntimeException("Error removing Message with id " + messageId + " to database: " + e, e);
}
}
@@ -802,8 +804,14 @@
{
stmt = conn.prepareStatement(INSERT_INTO_QUEUE);
+ String owner = queue.getPrincipalHolder() == null
+ ? null
+ : queue.getPrincipalHolder().getPrincipal() == null
+ ? null
+ : queue.getPrincipalHolder().getPrincipal().getName();
+
stmt.setString(1, queue.getName().toString());
- stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString());
+ stmt.setString(2, owner);
stmt.execute();
@@ -873,29 +881,26 @@
}
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public Transaction newTransaction()
{
- AMQShortString name = queue.getName();
+ return new DerbyTransaction();
+ }
+
+ public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQException
+ {
+ String name = queue.getResourceName();
+
+ Connection conn = connWrapper.getConnection();
- boolean localTx = getOrCreateTransaction(context);
- Connection conn = getConnection(context);
- ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
try
{
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
- stmt.setString(1,name.toString());
+ stmt.setString(1,name);
stmt.setLong(2,messageId);
stmt.executeUpdate();
connWrapper.requiresCommit();
- if(localTx)
- {
- commitTran(context);
- }
-
-
-
if (_logger.isDebugEnabled())
{
_logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
@@ -903,10 +908,6 @@
}
catch (SQLException e)
{
- if(localTx)
- {
- abortTran(context);
- }
_logger.error("Failed to enqueue: " + e, e);
throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name
+ " to database", e);
@@ -914,18 +915,18 @@
}
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQException
{
- AMQShortString name = queue.getName();
+ String name = queue.getResourceName();
+
+
+ Connection conn = connWrapper.getConnection();
- boolean localTx = getOrCreateTransaction(context);
- Connection conn = getConnection(context);
- ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
try
{
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
- stmt.setString(1,name.toString());
+ stmt.setString(1,name);
stmt.setLong(2,messageId);
int results = stmt.executeUpdate();
@@ -936,13 +937,6 @@
throw new AMQException("Unable to find message with id " + messageId + " on queue " + name);
}
- if(localTx)
- {
- commitTran(context);
- }
-
-
-
if (_logger.isDebugEnabled())
{
_logger.debug("Dequeuing message " + messageId + " on queue " + name );//+ "[Connection" + conn + "]");
@@ -950,10 +944,6 @@
}
catch (SQLException e)
{
- if(localTx)
- {
- abortTran(context);
- }
_logger.error("Failed to dequeue: " + e, e);
throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name
+ " from database", e);
@@ -987,51 +977,20 @@
}
}
- public void beginTran(StoreContext context) throws AMQException
- {
- if (context.getPayload() != null)
- {
- throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: "
- + context.getPayload());
- }
- else
- {
- try
- {
- Connection conn = newConnection();
-
- context.setPayload(new ConnectionWrapper(conn));
- }
- catch (SQLException e)
- {
- throw new AMQException("Error starting transaction: " + e, e);
- }
- }
- }
-
- public void commitTran(StoreContext context) throws AMQException
+ public void commitTran(ConnectionWrapper connWrapper) throws AMQException
{
- ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
-
- if (connWrapper == null)
- {
- throw new AMQException("Fatal internal error: transactional context is empty at commitTran");
- }
try
{
Connection conn = connWrapper.getConnection();
- if(connWrapper.requiresCommit())
- {
- conn.commit();
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("commit tran completed");
- }
+ conn.commit();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("commit tran completed");
}
+
conn.close();
}
catch (SQLException e)
@@ -1040,14 +999,30 @@
}
finally
{
- context.setPayload(null);
+
}
}
- public void abortTran(StoreContext context) throws AMQException
+ public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQException
{
- ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+ commitTran(connWrapper);
+ return new StoreFuture()
+ {
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public void waitForCompletion()
+ {
+ }
+ };
+
+ }
+
+ public void abortTran(ConnectionWrapper connWrapper) throws AMQException
+ {
if (connWrapper == null)
{
throw new AMQException("Fatal internal error: transactional context is empty at abortTran");
@@ -1072,272 +1047,261 @@
{
throw new AMQException("Error aborting transaction: " + e, e);
}
- finally
- {
- context.setPayload(null);
- }
+
}
- public boolean inTran(StoreContext context)
+ public Long getNewMessageId()
{
- return context.getPayload() != null;
+ return _messageId.incrementAndGet();
}
- public Long getNewMessageId()
+
+ private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
+ throws SQLException
{
- return _messageId.getAndIncrement();
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA);
+ stmt.setLong(1,messageId);
+
+ final int bodySize = 1 + metaData.getStorableSize();
+ byte[] underlying = new byte[bodySize];
+ underlying[0] = (byte) metaData.getType().ordinal();
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
+ buf.position(1);
+ buf = buf.slice();
+
+ metaData.writeToBuffer(0, buf);
+ ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
+ stmt.setBinaryStream(2,bis,underlying.length);
+ stmt.executeUpdate();
+
}
- public void storeContentBodyChunk(StoreContext context,
- Long messageId,
- int index,
- ContentChunk contentBody,
- boolean lastContentBody) throws AMQException
- {
- boolean localTx = getOrCreateTransaction(context);
- Connection conn = getConnection(context);
- ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
- try
- {
- PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
- stmt.setLong(1,messageId);
- stmt.setInt(2, index);
- byte[] chunkData = new byte[contentBody.getSize()];
- contentBody.getData().duplicate().get(chunkData);
- /* this would be the Java 6 way of doing things
- Blob dataAsBlob = conn.createBlob();
- dataAsBlob.setBytes(1L, chunkData);
- stmt.setBlob(3, dataAsBlob);
- */
- ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
- stmt.setBinaryStream(3, bis, chunkData.length);
- stmt.executeUpdate();
- connWrapper.requiresCommit();
- if(localTx)
- {
- commitTran(context);
- }
- }
- catch (SQLException e)
+
+ private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
+ {
+ Connection conn = newConnection();
+
+ MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin();
+
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
+
+ long maxId = 0;
+
+ while(rs.next())
{
- if(localTx)
+
+ long messageId = rs.getLong(1);
+ Blob dataAsBlob = rs.getBlob(2);
+
+ if(messageId > maxId)
{
- abortTran(context);
+ maxId = messageId;
}
- throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+ byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
+ StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+ StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false);
+ messageHandler.message(message);
+
+
}
+ _messageId.set(maxId);
+
+ messageHandler.completeMessageRecovery();
}
- public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData mmd)
- throws AMQException
- {
- boolean localTx = getOrCreateTransaction(context);
- Connection conn = getConnection(context);
- ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
- try
- {
+ private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
+ {
+ Connection conn = newConnection();
- PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
- stmt.setLong(1,messageId);
- stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString());
- stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString());
- stmt.setShort(4, mmd.getMessagePublishInfo().isMandatory() ? (short) 1 : (short) 0);
- stmt.setShort(5, mmd.getMessagePublishInfo().isImmediate() ? (short) 1 : (short) 0);
-
- ContentHeaderBody headerBody = mmd.getContentHeaderBody();
- final int bodySize = headerBody.getSize();
- byte[] underlying = new byte[bodySize];
- ByteBuffer buf = ByteBuffer.wrap(underlying);
- headerBody.writePayload(buf);
-/*
- Blob dataAsBlob = conn.createBlob();
- dataAsBlob.setBytes(1L, underlying);
- stmt.setBlob(6, dataAsBlob);
-*/
- ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
- stmt.setBinaryStream(6,bis,underlying.length);
+ TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this);
- stmt.setInt(7, mmd.getContentChunkCount());
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
- stmt.executeUpdate();
- connWrapper.requiresCommit();
- if(localTx)
- {
- commitTran(context);
- }
- }
- catch (SQLException e)
+ while(rs.next())
{
- if(localTx)
- {
- abortTran(context);
- }
- throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+ String queueName = rs.getString(1);
+ long messageId = rs.getLong(2);
+ queueEntryHandler.queueEntry(queueName,messageId);
}
+
+ queueEntryHandler.completeQueueEntryRecovery();
+
}
- public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+ StorableMessageMetaData getMetaData(long messageId) throws SQLException
{
- boolean localTx = getOrCreateTransaction(context);
- Connection conn = getConnection(context);
-
+ Connection conn = newConnection();
try
{
-
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_META_DATA);
stmt.setLong(1,messageId);
ResultSet rs = stmt.executeQuery();
if(rs.next())
{
- final AMQShortString exchange = new AMQShortString(rs.getString(1));
- final AMQShortString routingKey = rs.getString(2) == null ? null : new AMQShortString(rs.getString(2));
- final boolean mandatory = (rs.getShort(3) != (short)0);
- final boolean immediate = (rs.getShort(4) != (short)0);
- MessagePublishInfo info = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return exchange;
- }
-
- public void setExchange(AMQShortString exchange)
- {
-
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- } ;
- Blob dataAsBlob = rs.getBlob(5);
+ Blob dataAsBlob = rs.getBlob(1);
byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
- ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
+ StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
- ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, dataAsBytes.length);
+ return metaData;
+ }
+ else
+ {
+ throw new RuntimeException("Meta data not found for message with id " + messageId);
+ }
+
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+
+ private void addContent(Connection conn, long messageId, int offset, ByteBuffer src)
+ {
- if(localTx)
- {
- commitTran(context);
- }
- return new MessageMetaData(info, chb, rs.getInt(6));
+ try
+ {
+ final boolean newConnection = conn == null;
+
+ if(newConnection)
+ {
+ conn = newConnection();
}
- else
+
+ src = src.slice();
+
+ byte[] chunkData = new byte[src.limit()];
+ src.duplicate().get(chunkData);
+
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+ stmt.setInt(2, offset);
+ stmt.setInt(3, offset+chunkData.length);
+
+
+ /* this would be the Java 6 way of doing things
+ Blob dataAsBlob = conn.createBlob();
+ dataAsBlob.setBytes(1L, chunkData);
+ stmt.setBlob(3, dataAsBlob);
+ */
+ ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
+ stmt.setBinaryStream(4, bis, chunkData.length);
+ stmt.executeUpdate();
+
+ if(newConnection)
{
- if(localTx)
- {
- abortTran(context);
- }
- throw new AMQException("Metadata not found for message with id " + messageId);
+ conn.commit();
+ conn.close();
}
}
catch (SQLException e)
{
- if(localTx)
+ if(conn != null)
{
- abortTran(context);
+ try
+ {
+ conn.close();
+ }
+ catch (SQLException e1)
+ {
+
+ }
}
- throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
+ throw new RuntimeException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
}
}
- public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
- {
- boolean localTx = getOrCreateTransaction(context);
- Connection conn = getConnection(context);
-
- try
- {
+ public int getContent(long messageId, int offset, ByteBuffer dst)
+ {
+ Connection conn = null;
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
- stmt.setLong(1,messageId);
- stmt.setInt(2, index);
- ResultSet rs = stmt.executeQuery();
- if(rs.next())
- {
- Blob dataAsBlob = rs.getBlob(1);
+ try
+ {
+ conn = newConnection();
- final int size = (int) dataAsBlob.length();
- byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
- final ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+ stmt.setInt(2, offset);
+ stmt.setInt(3, offset+dst.remaining());
+ ResultSet rs = stmt.executeQuery();
- ContentChunk cb = new ContentChunk()
- {
+ int written = 0;
- public int getSize()
- {
- return size;
- }
-
- public ByteBuffer getData()
- {
- return buf;
- }
+ while(rs.next())
+ {
+ int offsetInMessage = rs.getInt(1);
+ Blob dataAsBlob = rs.getBlob(2);
- public void reduceToFit()
- {
+ final int size = (int) dataAsBlob.length();
+ byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
- }
- };
+ int posInArray = offset + written - offsetInMessage;
+ int count = size - posInArray;
+ if(count > dst.remaining())
+ {
+ count = dst.remaining();
+ }
+ dst.put(dataAsBytes,posInArray,count);
+ written+=count;
- if(localTx)
- {
- commitTran(context);
- }
+ if(dst.remaining() == 0)
+ {
+ break;
+ }
+ }
- return cb;
+ conn.close();
+ return written;
- }
- else
- {
- if(localTx)
- {
- abortTran(context);
- }
- throw new AMQException("Message not found for message with id " + messageId);
- }
+ }
+ catch (SQLException e)
+ {
+ if(conn != null)
+ {
+ try
+ {
+ conn.close();
}
- catch (SQLException e)
+ catch (SQLException e1)
{
- if(localTx)
- {
- abortTran(context);
- }
- throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
}
+ }
+
+ throw new RuntimeException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
+ }
@@ -1348,173 +1312,158 @@
return true;
}
- private void checkNotClosed() throws MessageStoreClosedException
- {
- if (_closed.get())
- {
- throw new MessageStoreClosedException();
- }
- }
-
- private static final class ProcessAction
+ private synchronized void stateTransition(State requiredState, State newState) throws AMQException
{
- private final AMQQueue _queue;
- private final StoreContext _context;
- private final AMQMessage _message;
-
- public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message)
- {
- _queue = queue;
- _context = context;
- _message = message;
- }
-
- public void process() throws AMQException
+ if (_state != requiredState)
{
- _queue.enqueue(_context, _message);
+ throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
+ + "; currently in state: " + _state);
}
+ _state = newState;
}
- private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
- throws SQLException, AMQException
+ private class DerbyTransaction implements Transaction
{
- Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>();
- List<ProcessAction> actions = new ArrayList<ProcessAction>();
+ private final ConnectionWrapper _connWrapper;
- final boolean inLocaltran = inTran(context);
- Connection conn = null;
- try
+ private DerbyTransaction()
{
- if(inLocaltran)
+ try
{
- conn = getConnection(context);
+ _connWrapper = new ConnectionWrapper(newConnection());
}
- else
+ catch (SQLException e)
{
- conn = newConnection();
+ throw new RuntimeException(e);
}
+ }
- MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
- long maxId = 1;
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ {
+ DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, messageId);
+ }
- TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ {
+ DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, messageId);
- Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+ }
- while (rs.next())
- {
- AMQShortString queueName = new AMQShortString(rs.getString(1));
+ public void commitTran() throws AMQException
+ {
+ DerbyMessageStore.this.commitTran(_connWrapper);
+ }
- AMQQueue queue = queues.get(queueName);
- if (queue == null)
- {
- queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
+ public StoreFuture commitTranAsync() throws AMQException
+ {
+ return DerbyMessageStore.this.commitTranAsync(_connWrapper);
+ }
- _virtualHost.getQueueRegistry().registerQueue(queue);
- queues.put(queueName, queue);
+ public void abortTran() throws AMQException
+ {
+ DerbyMessageStore.this.abortTran(_connWrapper);
+ }
+ }
- //Log Recovery Start
- CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1004(String.valueOf(queue.getName()), true));
- }
+ private class StoredDerbyMessage implements StoredMessage
+ {
- long messageId = rs.getLong(2);
- maxId = Math.max(maxId, messageId);
- AMQMessage message = msgMap.get(messageId);
+ private final long _messageId;
+ private volatile WeakReference<StorableMessageMetaData> _metaDataRef;
+ private Connection _conn;
- if(message != null)
- {
- message.incrementReference();
- }
- else
- {
- message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
- msgMap.put(messageId,message);
- }
+ StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
+ {
+ this(messageId, metaData, true);
+ }
- if (_logger.isDebugEnabled())
- {
- _logger.debug("On recovery, delivering " + message.getMessageId() + " to " + queue.getName());
- }
- Integer count = _queueRecoveries.get(queueName);
- if (count == null)
+ StoredDerbyMessage(long messageId,
+ StorableMessageMetaData metaData, boolean persist)
+ {
+ try
+ {
+ _messageId = messageId;
+
+ _metaDataRef = new WeakReference(metaData);
+ if(persist)
{
- count = 0;
+ _conn = newConnection();
+ storeMetaData(_conn, messageId, metaData);
}
-
- _queueRecoveries.put(queueName, ++count);
-
- actions.add(new ProcessAction(queue, context, message));
}
-
- for(ProcessAction action : actions)
+ catch (SQLException e)
{
- action.process();
+ throw new RuntimeException(e);
}
- _messageId.set(maxId + 1);
- }
- catch (SQLException e)
- {
- _logger.error("Error: " + e, e);
- throw e;
}
- finally
+
+ public StorableMessageMetaData getMetaData()
{
- if (inLocaltran && conn != null)
+ StorableMessageMetaData metaData = _metaDataRef.get();
+ if(metaData == null)
{
- conn.close();
+ try
+ {
+ metaData = DerbyMessageStore.this.getMetaData(_messageId);
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException(e);
+ }
+ _metaDataRef = new WeakReference(metaData);
}
+
+ return metaData;
}
- if (_logger.isInfoEnabled())
+ public long getMessageNumber()
{
- _logger.info("Recovered message counts: " + _queueRecoveries);
+ return _messageId;
}
- for(Map.Entry<AMQShortString,Integer> entry : _queueRecoveries.entrySet())
+ public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
{
- CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1005(entry.getValue(), String.valueOf(entry.getKey())));
-
- CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1006(String.valueOf(entry.getKey()), true));
+ DerbyMessageStore.this.addContent(_conn, _messageId, offsetInMessage, src);
}
- // Free the memory
- _queueRecoveries = null;
-
- }
-
- private Connection getConnection(final StoreContext context)
- {
- return ((ConnectionWrapper)context.getPayload()).getConnection();
- }
-
- private boolean getOrCreateTransaction(StoreContext context) throws AMQException
- {
-
- ConnectionWrapper tx = (ConnectionWrapper) context.getPayload();
- if (tx == null)
+ public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
{
- beginTran(context);
- return true;
+ return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
}
- return false;
- }
-
- private synchronized void stateTransition(State requiredState, State newState) throws AMQException
- {
- if (_state != requiredState)
+ public StoreFuture flushToStore()
{
- throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
- + "; currently in state: " + _state);
+ try
+ {
+ if(_conn != null)
+ {
+ _conn.commit();
+ _conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ _conn = null;
+ }
+ return IMMEDIATE_FUTURE;
}
- _state = newState;
+ public void remove()
+ {
+ flushToStore();
+ DerbyMessageStore.this.removeMessage(_messageId);
+ }
}
+
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Sun Oct 25 22:58:57 2009
@@ -20,21 +20,20 @@
*/
package org.apache.qpid.server.store;
-import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.commons.configuration.Configuration;
import java.util.ArrayList;
import java.util.Collections;
@@ -43,9 +42,10 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.nio.ByteBuffer;
/** A simple message store that stores the messages in a threadsafe structure in memory. */
-public class MemoryMessageStore extends AbstractMessageStore
+public class MemoryMessageStore implements MessageStore
{
private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
@@ -53,52 +53,74 @@
private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity";
- protected ConcurrentMap<Long, MessageMetaData> _metaDataMap;
-
- protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap;
private final AtomicLong _messageId = new AtomicLong(1);
private AtomicBoolean _closed = new AtomicBoolean(false);
private LogSubject _logSubject;
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
{
- super.configure(virtualHost,base,config);
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ {
+ }
- int hashtableCapacity = config.getStoreConfiguration().getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
- _log.info("Using capacity " + hashtableCapacity + " for hash tables");
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
- }
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ {
+ }
- public void close() throws Exception
- {
- _closed.getAndSet(true);
- if (_metaDataMap != null)
+ public void commitTran() throws AMQException
{
- _metaDataMap.clear();
- _metaDataMap = null;
}
- if (_contentBodyMap != null)
+
+ public StoreFuture commitTranAsync() throws AMQException
+ {
+ return IMMEDIATE_FUTURE;
+ }
+
+ public void abortTran() throws AMQException
{
- _contentBodyMap.clear();
- _contentBodyMap = null;
}
- super.close();
+ };
+
+ public void configureConfigStore(String name, ConfigurationRecoveryHandler handler, Configuration configuration, LogSubject logSubject) throws Exception
+ {
+ _logSubject = logSubject;
+ CurrentActor.get().message(_logSubject, ConfigStoreMessages.CFG_1001(this.getClass().getName()));
+
+
}
- public void removeMessage(StoreContext context, Long messageId) throws AMQException
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception
{
- checkNotClosed();
- if (_log.isDebugEnabled())
+ if(_logSubject == null)
{
- _log.debug("Removing message with id " + messageId);
+ _logSubject = logSubject;
}
- _metaDataMap.remove(messageId);
- _contentBodyMap.remove(messageId);
+ int hashtableCapacity = config.getInt(name + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
+ _log.info("Using capacity " + hashtableCapacity + " for hash tables");
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1001(this.getClass().getName()));
+ }
+
+ public void close() throws Exception
+ {
+ _closed.getAndSet(true);
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1003());
+
}
+ public StoredMessage addMessage(StorableMessageMetaData metaData)
+ {
+ final long id = _messageId.getAndIncrement();
+ StoredMemoryMessage message = new StoredMemoryMessage(id, metaData);
+
+ return message;
+ }
+
+
public void createExchange(Exchange exchange) throws AMQException
{
@@ -135,35 +157,19 @@
// Not required to do anything
}
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public void configureTransactionLog(String name,
+ TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
{
- // Not required to do anything
+ //To change body of implemented methods use File | Settings | File Templates.
}
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public Transaction newTransaction()
{
- // Not required to do anything
+ return IN_MEMORY_TRANSACTION;
}
- public void beginTran(StoreContext context) throws AMQException
- {
- // Not required to do anything
- }
-
- public void commitTran(StoreContext context) throws AMQException
- {
- // Not required to do anything
- }
-
- public void abortTran(StoreContext context) throws AMQException
- {
- // Not required to do anything
- }
-
- public boolean inTran(StoreContext context)
- {
- return false;
- }
public List<AMQQueue> createQueues() throws AMQException
{
@@ -175,48 +181,6 @@
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody)
- throws AMQException
- {
- checkNotClosed();
- List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
-
- if (bodyList == null && lastContentBody)
- {
- _contentBodyMap.put(messageId, Collections.singletonList(contentBody));
- }
- else
- {
- if (bodyList == null)
- {
- bodyList = new ArrayList<ContentChunk>();
- _contentBodyMap.put(messageId, bodyList);
- }
-
- bodyList.add(index, contentBody);
- }
- }
-
- public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
- throws AMQException
- {
- checkNotClosed();
- _metaDataMap.put(messageId, messageMetaData);
- }
-
- public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
- {
- checkNotClosed();
- return _metaDataMap.get(messageId);
- }
-
- public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
- {
- checkNotClosed();
- List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
- return bodyList.get(index);
- }
-
public boolean isPersistent()
{
return false;
@@ -229,4 +193,6 @@
throw new MessageStoreClosedException();
}
}
+
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Sun Oct 25 22:58:57 2009
@@ -20,53 +20,43 @@
*/
package org.apache.qpid.server.store;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
/**
- * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages, queues
- * and exchanges in a transactional manner.
- *
- * <p/>All message store, remove, enqueue and dequeue operations are carried out against a {@link StoreContext} which
- * encapsulates the transactional context they are performed in. Many such operations can be carried out in a single
- * transaction.
+ * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
*
- * <p/>The storage and removal of queues and exchanges, are not carried out in a transactional context.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities
- * <tr><td> Accept transaction boundary demarcations: Begin, Commit, Abort.
- * <tr><td> Store and remove queues.
- * <tr><td> Store and remove exchanges.
- * <tr><td> Store and remove messages.
- * <tr><td> Bind and unbind queues to exchanges.
- * <tr><td> Enqueue and dequeue messages to queues.
- * <tr><td> Generate message identifiers.
- * </table>
*/
-public interface MessageStore
+public interface MessageStore extends DurableConfigurationStore, TransactionLog
{
+ StoreFuture IMMEDIATE_FUTURE = new StoreFuture()
+ {
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public void waitForCompletion()
+ {
+
+ }
+ };
+
+
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
*
- * @param virtualHost The virtual host using by this store
- * @param base The base element identifier from which all configuration items are relative. For example, if
- * the base element is "store", the all elements used by concrete classes will be "store.foo" etc.
- * @param hostConfig The apache commons configuration object.
+ * @param name The name to be used by this store
+ * @param recoveryHandler Handler to be called as the store recovers on start up
+ * @param config The apache commons configuration object.
*
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
- void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration hostConfig) throws Exception;
+ void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception;
/**
* Called to close and cleanup any resources used by the message store.
@@ -75,203 +65,16 @@
*/
void close() throws Exception;
- /**
- * Removes the specified message from the store in the given transactional store context.
- *
- * @param storeContext The transactional context to remove the message in.
- * @param messageId Identifies the message to remove.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void removeMessage(StoreContext storeContext, Long messageId) throws AMQException;
-
- /**
- * Makes the specified exchange persistent.
- *
- * @param exchange The exchange to persist.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void createExchange(Exchange exchange) throws AMQException;
-
- /**
- * Removes the specified persistent exchange.
- *
- * @param exchange The exchange to remove.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void removeExchange(Exchange exchange) throws AMQException;
-
- /**
- * Binds the specified queue to an exchange with a routing key.
- *
- * @param exchange The exchange to bind to.
- * @param routingKey The routing key to bind by.
- * @param queue The queue to bind.
- * @param args Additional parameters.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
-
- /**
- * Unbinds the specified from an exchange under a particular routing key.
- *
- * @param exchange The exchange to unbind from.
- * @param routingKey The routing key to unbind.
- * @param queue The queue to unbind.
- * @param args Additonal parameters.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
- /**
- * Makes the specified queue persistent.
- *
- * @param queue The queue to store.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void createQueue(AMQQueue queue) throws AMQException;
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
- /**
- * Makes the specified queue persistent.
- *
- * @param queue The queue to store.
- *
- * @param arguments The additional arguments to the binding
- * @throws AMQException If the operation fails for any reason.
- */
- void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException;
-
- /**
- * Removes the specified queue from the persistent store.
- *
- * @param queue The queue to remove.
- * @throws AMQException If the operation fails for any reason.
- */
- void removeQueue(final AMQQueue queue) throws AMQException;
-
- /**
- * Places a message onto a specified queue, in a given transactional context.
- *
- * @param context The transactional context for the operation.
- * @param queue The queue to place the message on.
- * @param messageId The message to enqueue.
- * @throws AMQException If the operation fails for any reason.
- */
- void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
-
- /**
- * Extracts a message from a specified queue, in a given transactional context.
- *
- * @param context The transactional context for the operation.
- * @param queue The queue to place the message on.
- * @param messageId The message to dequeue.
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
- */
- void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
-
- /**
- * Begins a transactional context.
- *
- * @param context The transactional context to begin.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void beginTran(StoreContext context) throws AMQException;
-
- /**
- * Commits all operations performed within a given transactional context.
- *
- * @param context The transactional context to commit all operations for.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void commitTran(StoreContext context) throws AMQException;
-
- /**
- * Abandons all operations performed within a given transactional context.
- *
- * @param context The transactional context to abandon.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void abortTran(StoreContext context) throws AMQException;
-
- /**
- * Tests a transactional context to see if it has been begun but not yet committed or aborted.
- *
- * @param context The transactional context to test.
- *
- * @return <tt>true</tt> if the transactional context is live, <tt>false</tt> otherwise.
- */
- boolean inTran(StoreContext context);
-
- /**
- * Return a valid, currently unused message id.
- *
- * @return A fresh message id.
- */
- Long getNewMessageId();
-
- /**
- * Stores a chunk of message data.
- *
- * @param context The transactional context for the operation.
- * @param messageId The message to store the data for.
- * @param index The index of the data chunk.
- * @param contentBody The content of the data chunk.
- * @param lastContentBody Flag to indicate that this is the last such chunk for the message.
- *
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
- */
- void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody,
- boolean lastContentBody) throws AMQException;
-
- /**
- * Stores message meta-data.
- *
- * @param context The transactional context for the operation.
- * @param messageId The message to store the data for.
- * @param messageMetaData The message meta data to store.
- *
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
- */
- void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;
-
- /**
- * Retrieves message meta-data.
- *
- * @param context The transactional context for the operation.
- * @param messageId The message to get the meta-data for.
- *
- * @return The message meta data.
- *
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
- */
- MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
-
- /**
- * Retrieves a chunk of message data.
- *
- * @param context The transactional context for the operation.
- * @param messageId The message to get the data chunk for.
- * @param index The offset index of the data chunk within the message.
- *
- * @return A chunk of message data.
- *
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
- */
- ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
/**
* Is this store capable of persisting the data
- *
+ *
* @return true if this store is capable of persisting data
*/
boolean isPersistent();
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Sun Oct 25 22:58:57 2009
@@ -35,6 +35,7 @@
private String _name;
private Object _payload;
+
public StoreContext()
{
_name = "StoreContext";
@@ -68,4 +69,5 @@
{
return "<_name = " + _name + ", _payload = " + _payload + ">";
}
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Sun Oct 25 22:58:57 2009
@@ -46,10 +46,12 @@
AMQQueue getQueue();
QueueEntry.SubscriptionAcquiredState getOwningState();
+ QueueEntry.SubscriptionAssignedState getAssignedState();
+
void setQueue(AMQQueue queue, boolean exclusive);
- AMQChannel getChannel();
+ void setNoLocal(boolean noLocal);
AMQShortString getConsumerTag();
@@ -63,23 +65,24 @@
boolean isClosed();
- boolean isBrowser();
+ boolean acquires();
- void close();
+ boolean seesRequeues();
- boolean filtersMessages();
+ void close();
void send(QueueEntry msg) throws AMQException;
- void queueDeleted(AMQQueue queue);
+ void queueDeleted(AMQQueue queue);
boolean wouldSuspend(QueueEntry msg);
void getSendLock();
+
void releaseSendLock();
- void resend(final QueueEntry entry) throws AMQException;
+ void onDequeue(final QueueEntry queueEntry);
void restoreCredit(final QueueEntry queueEntry);
@@ -87,13 +90,17 @@
public State getState();
- QueueEntry getLastSeenEntry();
+ AMQQueue.Context getQueueContext();
- boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue);
+ void setQueueContext(AMQQueue.Context queueContext);
boolean isActive();
+ void confirmAutoClose();
+
+ public void set(String key, Object value);
+ public Object get(String key);
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org