You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/12/28 14:02:48 UTC
svn commit: r1225178 [4/8] - in /qpid/trunk/qpid/java: ./ bdbstore/src/main/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/
bdbstore/src/test/ bdbstore/src/test/jav...
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java Wed Dec 28 13:02:41 2011
@@ -18,10 +18,7 @@
*/
package org.apache.qpid.server.security.access;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import org.apache.commons.lang.StringUtils;
import org.apache.qpid.framing.AMQShortString;
@@ -35,7 +32,7 @@ import org.apache.qpid.server.queue.AMQQ
* {@link #equals(Object)} and {@link #hashCode()} are intended for use in maps. This is due to the wildcard matching
* described above.
*/
-public class ObjectProperties extends HashMap<ObjectProperties.Property, String>
+public class ObjectProperties
{
/** serialVersionUID */
private static final long serialVersionUID = -1356019341374170495L;
@@ -93,7 +90,9 @@ public class ObjectProperties extends Ha
return properties;
}
}
-
+
+ private final EnumMap<Property, String> _properties = new EnumMap<Property, String>(Property.class);
+
public static List<String> getAllPropertyNames()
{
List<String> properties = new ArrayList<String>();
@@ -113,7 +112,7 @@ public class ObjectProperties extends Ha
{
super();
- putAll(copy);
+ _properties.putAll(copy._properties);
}
public ObjectProperties(String name)
@@ -231,7 +230,7 @@ public class ObjectProperties extends Ha
public List<String> getPropertyNames()
{
List<String> properties = new ArrayList<String>();
- for (Property property : keySet())
+ for (Property property : _properties.keySet())
{
properties.add(property.getName());
}
@@ -240,17 +239,22 @@ public class ObjectProperties extends Ha
public Boolean isSet(Property key)
{
- return containsKey(key) && Boolean.valueOf(get(key));
+ return _properties.containsKey(key) && Boolean.valueOf(_properties.get(key));
}
-
+
+ public String get(Property key)
+ {
+ return _properties.get(key);
+ }
+
public String getName()
{
- return get(Property.NAME);
+ return _properties.get(Property.NAME);
}
public void setName(String name)
{
- put(Property.NAME, name);
+ _properties.put(Property.NAME, name);
}
public void setName(AMQShortString name)
@@ -262,39 +266,38 @@ public class ObjectProperties extends Ha
{
return put(key, value == null ? "" : value.asString());
}
-
- @Override
+
public String put(Property key, String value)
{
- return super.put(key, value == null ? "" : value.trim());
+ return _properties.put(key, value == null ? "" : value.trim());
}
public void put(Property key, Boolean value)
{
if (value != null)
{
- super.put(key, Boolean.toString(value));
+ _properties.put(key, Boolean.toString(value));
}
}
public boolean matches(ObjectProperties properties)
{
- if (properties.keySet().isEmpty())
+ if (properties._properties.keySet().isEmpty())
{
return true;
}
- if (!keySet().containsAll(properties.keySet()))
+ if (!_properties.keySet().containsAll(properties._properties.keySet()))
{
return false;
}
- for (Map.Entry<Property,String> entry : properties.entrySet())
+ for (Map.Entry<Property,String> entry : properties._properties.entrySet())
{
Property key = entry.getKey();
String ruleValue = entry.getValue();
- String thisValue = get(key);
+ String thisValue = _properties.get(key);
if (!valueMatches(thisValue, ruleValue))
{
@@ -315,4 +318,29 @@ public class ObjectProperties extends Ha
&& thisValue.length() > ruleValue.length()
&& thisValue.startsWith(ruleValue.substring(0, ruleValue.length() - 2)));
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ObjectProperties that = (ObjectProperties) o;
+
+ if (_properties != null ? !_properties.equals(that._properties) : that._properties != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _properties != null ? _properties.hashCode() : 0;
+ }
+
+ @Override
+ public String toString()
+ {
+ return _properties.toString();
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Wed Dec 28 13:02:41 2011
@@ -52,6 +52,8 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
/**
@@ -60,7 +62,7 @@ import org.apache.qpid.server.queue.AMQQ
*
* TODO extract the SQL statements into a generic JDBC store
*/
-public class DerbyMessageStore implements MessageStore
+public class DerbyMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
@@ -197,12 +199,16 @@ public class DerbyMessageStore implement
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
- CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
-
if(!_configured)
{
_logSubject = logSubject;
+ }
+
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+
+ if(!_configured)
+ {
commonConfiguration(name, storeConfiguration, logSubject);
_configured = true;
@@ -219,6 +225,11 @@ public class DerbyMessageStore implement
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
+
+ if(!_configured)
+ {
+ _logSubject = logSubject;
+ }
CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
if(!_configured)
@@ -697,7 +708,7 @@ public class DerbyMessageStore implement
if (results == 0)
{
- throw new RuntimeException("Message metadata not found for message id " + messageId);
+ _logger.warn("Message metadata not found for message id " + messageId);
}
if (_logger.isDebugEnabled())
@@ -1678,14 +1689,26 @@ public class DerbyMessageStore implement
}
}
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, messageId);
+ if(message.getStoredMessage() instanceof StoredDerbyMessage)
+ {
+ try
+ {
+ ((StoredDerbyMessage)message.getStoredMessage()).store(_connWrapper.getConnection());
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Exception on enqueuing message " + _messageId, e);
+ }
+ }
+
+ DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, messageId);
+ DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
}
@@ -1709,8 +1732,11 @@ public class DerbyMessageStore implement
{
private final long _messageId;
+ private StorableMessageMetaData _metaData;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
- private Connection _conn;
+ private byte[] _data;
+ private volatile SoftReference<byte[]> _dataRef;
+
StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
{
@@ -1721,27 +1747,19 @@ public class DerbyMessageStore implement
StoredDerbyMessage(long messageId,
StorableMessageMetaData metaData, boolean persist)
{
- try
- {
- _messageId = messageId;
+ _messageId = messageId;
+
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- if(persist)
- {
- _conn = newConnection();
- storeMetaData(_conn, messageId, metaData);
- }
- }
- catch (SQLException e)
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ if(persist)
{
- throw new RuntimeException(e);
+ _metaData = metaData;
}
-
}
public StorableMessageMetaData getMetaData()
{
- StorableMessageMetaData metaData = _metaDataRef.get();
+ StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
if(metaData == null)
{
try
@@ -1765,27 +1783,62 @@ public class DerbyMessageStore implement
public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
{
- DerbyMessageStore.this.addContent(_conn, _messageId, offsetInMessage, src);
+ src = src.slice();
+
+ if(_data == null)
+ {
+ _data = new byte[src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+ src.duplicate().get(_data);
+ }
+ else
+ {
+ byte[] oldData = _data;
+ _data = new byte[oldData.length + src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+
+ System.arraycopy(oldData,0,_data,0,oldData.length);
+ src.duplicate().get(_data, oldData.length, src.remaining());
+ }
+
}
public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
{
- return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
+ {
+ int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+ dst.put(data, offsetInMessage, length);
+ return length;
+ }
+ else
+ {
+ return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ }
+ }
+
+
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ getContent(offsetInMessage, buf);
+ buf.position(0);
+ return buf;
}
- public StoreFuture flushToStore()
+ public synchronized StoreFuture flushToStore()
{
try
{
- if(_conn != null)
+ if(_metaData != null)
{
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Flushing message " + _messageId + " to store");
- }
+ Connection conn = newConnection();
+
+ store(conn);
- _conn.commit();
- _conn.close();
+ conn.commit();
+ conn.close();
}
}
catch (SQLException e)
@@ -1796,16 +1849,34 @@ public class DerbyMessageStore implement
}
throw new RuntimeException(e);
}
- finally
+ return IMMEDIATE_FUTURE;
+ }
+
+ private synchronized void store(final Connection conn) throws SQLException
+ {
+ if(_metaData != null)
{
- _conn = null;
+ try
+ {
+ storeMetaData(conn, _messageId, _metaData);
+ DerbyMessageStore.this.addContent(conn, _messageId, 0,
+ _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ }
+ finally
+ {
+ _metaData = null;
+ _data = null;
+ }
+ }
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Storing message " + _messageId + " to store");
}
- return IMMEDIATE_FUTURE;
}
public void remove()
{
- flushToStore();
DerbyMessageStore.this.removeMessage(_messageId);
}
}
@@ -1839,4 +1910,5 @@ public class DerbyMessageStore implement
}
}
}
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Wed Dec 28 13:02:41 2011
@@ -35,10 +35,12 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
/** A simple message store that stores the messages in a threadsafe structure in memory. */
-public class MemoryMessageStore implements MessageStore
+public class MemoryMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
@@ -53,11 +55,11 @@ public class MemoryMessageStore implemen
private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
{
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Wed Dec 28 13:02:41 2011
@@ -20,14 +20,16 @@
*/
package org.apache.qpid.server.store;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.message.EnqueableMessage;
/**
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
*
*/
-public interface MessageStore extends DurableConfigurationStore, TransactionLog
+public interface MessageStore
{
StoreFuture IMMEDIATE_FUTURE = new StoreFuture()
{
@@ -77,4 +79,69 @@ public interface MessageStore extends Du
boolean isPersistent();
+
+ public static interface Transaction
+ {
+ /**
+ * Places a message onto a specified queue, in a given transactional context.
+ *
+ *
+ *
+ * @param queue The queue to place the message on.
+ * @param message
+ * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason.
+ */
+ void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
+
+ /**
+ * Extracts a message from a specified queue, in a given transactional context.
+ *
+ * @param queue The queue to place the message on.
+ * @param message The message to dequeue.
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
+
+
+ /**
+ * Commits all operations performed within a given transactional context.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void commitTran() throws AMQStoreException;
+
+ /**
+ * Commits all operations performed within a given transactional context.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ StoreFuture commitTranAsync() throws AMQStoreException;
+
+ /**
+ * Abandons all operations performed within a given transactional context.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void abortTran() throws AMQStoreException;
+
+
+
+ }
+
+ public void configureTransactionLog(String name,
+ TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception;
+
+ Transaction newTransaction();
+
+
+
+ public static interface StoreFuture
+ {
+ boolean isComplete();
+
+ void waitForCompletion();
+ }
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Wed Dec 28 13:02:41 2011
@@ -26,15 +26,13 @@ import java.nio.ByteBuffer;
public class StoredMemoryMessage implements StoredMessage
{
private final long _messageNumber;
- private final ByteBuffer _content;
+ private ByteBuffer _content;
private final StorableMessageMetaData _metaData;
public StoredMemoryMessage(long messageNumber, StorableMessageMetaData metaData)
{
_messageNumber = messageNumber;
_metaData = metaData;
- _content = ByteBuffer.allocate(metaData.getContentSize());
-
}
public long getMessageNumber()
@@ -44,26 +42,79 @@ public class StoredMemoryMessage impleme
public void addContent(int offsetInMessage, ByteBuffer src)
{
- src = src.duplicate();
- ByteBuffer dst = _content.duplicate();
- dst.position(offsetInMessage);
- dst.put(src);
+ if(_content == null)
+ {
+ if(offsetInMessage == 0)
+ {
+ _content = src.slice();
+ }
+ else
+ {
+ final int contentSize = _metaData.getContentSize();
+ int size = (contentSize < offsetInMessage + src.remaining())
+ ? offsetInMessage + src.remaining()
+ : contentSize;
+ _content = ByteBuffer.allocate(size);
+ addContent(offsetInMessage, src);
+ }
+ }
+ else
+ {
+ if(_content.limit() >= offsetInMessage + src.remaining())
+ {
+ _content.position(offsetInMessage);
+ _content.put(src);
+ _content.position(0);
+ }
+ else
+ {
+ final int contentSize = _metaData.getContentSize();
+ int size = (contentSize < offsetInMessage + src.remaining())
+ ? offsetInMessage + src.remaining()
+ : contentSize;
+ ByteBuffer oldContent = _content;
+ _content = ByteBuffer.allocate(size);
+ _content.put(oldContent);
+ _content.position(0);
+ addContent(offsetInMessage, src);
+ }
+
+ }
}
public int getContent(int offset, ByteBuffer dst)
{
ByteBuffer src = _content.duplicate();
- src.position(offset);
- src = src.slice();
- if(dst.remaining() < src.limit())
+
+ int oldPosition = src.position();
+
+ src.position(oldPosition + offset);
+
+ int length = dst.remaining() < src.remaining() ? dst.remaining() : src.remaining();
+ src.limit(oldPosition + length);
+
+ dst.put(src);
+
+
+ return length;
+ }
+
+
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer buf = _content.duplicate();
+
+ if(offsetInMessage != 0)
{
- src.limit(dst.remaining());
+ buf.position(offsetInMessage);
+ buf = buf.slice();
}
- dst.put(src);
- return src.limit();
+
+ buf.limit(size);
+ return buf;
}
- public TransactionLog.StoreFuture flushToStore()
+ public MessageStore.StoreFuture flushToStore()
{
return MessageStore.IMMEDIATE_FUTURE;
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java Wed Dec 28 13:02:41 2011
@@ -32,7 +32,9 @@ public interface StoredMessage<M extends
int getContent(int offsetInMessage, ByteBuffer dst);
- TransactionLog.StoreFuture flushToStore();
+ ByteBuffer getContent(int offsetInMessage, int size);
+
+ MessageStore.StoreFuture flushToStore();
void remove();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java Wed Dec 28 13:02:41 2011
@@ -20,72 +20,7 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.AMQStoreException;
-import org.apache.commons.configuration.Configuration;
-
public interface TransactionLog
{
- public static interface Transaction
- {
- /**
- * Places a message onto a specified queue, in a given transactional context.
- *
- * @param queue The queue to place the message on.
- * @param messageId The message to enqueue.
- * @throws AMQStoreException If the operation fails for any reason.
- */
- void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException;
-
- /**
- * Extracts a message from a specified queue, in a given transactional context.
- *
- * @param queue The queue to place the message on.
- * @param messageId The message to dequeue.
- * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException;
-
-
- /**
- * Commits all operations performed within a given transactional context.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- void commitTran() throws AMQStoreException;
-
- /**
- * Commits all operations performed within a given transactional context.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- StoreFuture commitTranAsync() throws AMQStoreException;
-
- /**
- * Abandons all operations performed within a given transactional context.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- void abortTran() throws AMQStoreException;
-
-
-
- }
-
- public void configureTransactionLog(String name,
- TransactionLogRecoveryHandler recoveryHandler,
- Configuration storeConfiguration,
- LogSubject logSubject) throws Exception;
-
- Transaction newTransaction();
-
-
-
- public static interface StoreFuture
- {
- boolean isComplete();
-
- void waitForCompletion();
- }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java Wed Dec 28 13:02:41 2011
@@ -22,7 +22,7 @@ package org.apache.qpid.server.store;
public interface TransactionLogRecoveryHandler
{
- QueueEntryRecoveryHandler begin(TransactionLog log);
+ QueueEntryRecoveryHandler begin(MessageStore log);
public static interface QueueEntryRecoveryHandler
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Wed Dec 28 13:02:41 2011
@@ -62,7 +62,6 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -70,10 +69,10 @@ import org.apache.qpid.framing.FieldTabl
import org.apache.qpid.AMQException;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
@@ -183,6 +182,7 @@ public class Subscription_0_10 implement
throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
}
_queue = queue;
+
Map<String, Object> arguments = queue.getArguments() == null ? Collections.EMPTY_MAP : queue.getArguments();
_traceExclude = (String) arguments.get("qpid.trace.exclude");
_trace = (String) arguments.get("qpid.trace.id");
@@ -224,8 +224,8 @@ public class Subscription_0_10 implement
if (_noLocal && entry.getMessage() instanceof MessageTransferMessage)
{
- Session messageSession= ((MessageTransferMessage)entry.getMessage()).getSession();
- if (messageSession != null && messageSession.getConnection() == _session.getConnection())
+ Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference();
+ if (connectionRef != null && connectionRef == _session.getReference())
{
return false;
}
@@ -377,35 +377,8 @@ public class Subscription_0_10 implement
{
MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
-
-
- Struct[] headers;
- if(msg.getHeader() == null)
- {
- headers = EMPTY_STRUCT_ARRAY;
- }
- else
- {
- headers = msg.getHeader().getStructs();
- }
-
- ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length);
- DeliveryProperties origDeliveryProps = null;
- for(Struct header : headers)
- {
- if(header instanceof DeliveryProperties)
- {
- origDeliveryProps = (DeliveryProperties) header;
- }
- else
- {
- if(header instanceof MessageProperties)
- {
- messageProps = (MessageProperties) header;
- }
- newHeaders.add(header);
- }
- }
+ DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
+ messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
deliveryProps = new DeliveryProperties();
if(origDeliveryProps != null)
@@ -440,17 +413,16 @@ public class Subscription_0_10 implement
deliveryProps.setRedelivered(entry.isRedelivered());
- newHeaders.add(deliveryProps);
-
if(_trace != null && messageProps == null)
{
messageProps = new MessageProperties();
- newHeaders.add(messageProps);
}
- Header header = new Header(newHeaders);
+ Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
+
- xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
+ xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
+ : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
}
else if(serverMsg instanceof AMQMessage)
{
@@ -463,8 +435,6 @@ public class Subscription_0_10 implement
message_0_8.getContent(body, 0);
body.flip();
- Struct[] headers = new Struct[] { deliveryProps, messageProps };
-
BasicContentHeaderProperties properties =
(BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
@@ -505,8 +475,9 @@ public class Subscription_0_10 implement
messageProps.setApplicationHeaders(appHeaders);
- Header header = new Header(headers);
- xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
+ Header header = new Header(deliveryProps, messageProps, null);
+ xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED)
+ : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
}
else
{
@@ -519,8 +490,6 @@ public class Subscription_0_10 implement
serverMsg.getContent(body, 0);
body.flip();
- Struct[] headers = new Struct[] { deliveryProps, messageProps };
-
deliveryProps.setExpiration(serverMsg.getExpiration());
deliveryProps.setImmediate(serverMsg.isImmediate());
@@ -567,8 +536,9 @@ public class Subscription_0_10 implement
messageProps.setApplicationHeaders(appHeaders);
*/
- Header header = new Header(headers);
- xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
+ Header header = new Header(deliveryProps, messageProps, null);
+ xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED)
+ : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
}
boolean excludeDueToFederation = false;
@@ -644,28 +614,51 @@ public class Subscription_0_10 implement
}
}
- private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
+ private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
{
- ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
- txn.dequeue(entry.getQueue(),entry.getMessage(),
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- if(restoreCredit)
- {
- restoreCredit(entry);
- }
- entry.discard();
- }
+ _deferredMessageCredit += deferredMessageCredit;
+ _deferredSizeCredit += deferredSizeCredit;
- public void onRollback()
- {
+ }
- }
- });
+ public void flushCreditState()
+ {
+ flushCreditState(false);
+ }
+ public void flushCreditState(boolean strict)
+ {
+ if(strict || !isSuspended() || _deferredMessageCredit >= 200
+ || !(_creditManager instanceof WindowCreditManager)
+ || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
+ {
+ _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
+ _deferredMessageCredit = 0;
+ _deferredSizeCredit = 0l;
+ }
}
+ private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
+ {
+ AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
+ dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ if (restoreCredit)
+ {
+ restoreCredit(entry);
+ }
+ entry.discard();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ }
+
void reject(final QueueEntry entry)
{
entry.setRedelivered();
@@ -704,7 +697,7 @@ public class Subscription_0_10 implement
{
final InboundMessage m = new InboundMessageAdapter(entry);
- final ArrayList<? extends BaseQueue> destinationQueues = alternateExchange.route(m);
+ final List<? extends BaseQueue> destinationQueues = alternateExchange.route(m);
if (destinationQueues == null || destinationQueues.isEmpty())
{
@@ -751,6 +744,7 @@ public class Subscription_0_10 implement
return _stateChangeLock.tryLock();
}
+
public void getSendLock()
{
_stateChangeLock.lock();
@@ -816,28 +810,6 @@ public class Subscription_0_10 implement
return _properties.get(key);
}
- private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
- {
- _deferredMessageCredit += deferredMessageCredit;
- _deferredSizeCredit += deferredSizeCredit;
-
- }
-
- public void flushCreditState()
- {
- flushCreditState(false);
- }
- public void flushCreditState(boolean strict)
- {
- if(strict || !isSuspended() || _deferredMessageCredit >= 200
- || !(_creditManager instanceof WindowCreditManager)
- || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
- {
- _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
- _deferredMessageCredit = 0;
- _deferredSizeCredit = 0l;
- }
- }
public FlowCreditManager_0_10 getCreditManager()
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Wed Dec 28 13:02:41 2011
@@ -75,7 +75,7 @@ public class ServerConnection extends Co
private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final long _connectionId;
-
+ private final Object _reference = new Object();
private ServerConnectionMBean _mBean;
private VirtualHost _virtualHost;
private AtomicLong _lastIoTime = new AtomicLong();
@@ -90,6 +90,11 @@ public class ServerConnection extends Co
return _config.getId();
}
+ public Object getReference()
+ {
+ return _reference;
+ }
+
@Override
protected void invoke(Method method)
{
@@ -414,13 +419,11 @@ public class ServerConnection extends Co
return _connectionId;
}
- @Override
public boolean isSessionNameUnique(byte[] name)
{
return !super.hasSessionWithName(name);
}
- @Override
public String getUserName()
{
return _authorizedPrincipal.getName();
@@ -450,11 +453,11 @@ public class ServerConnection extends Co
{
for (Session ssn : getChannels())
{
- ((ServerSession)ssn).flushCreditState();
+ ((ServerSession)ssn).receivedComplete();
}
}
- @Override
+
public ManagedObject getManagedObject()
{
return _mBean;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Wed Dec 28 13:02:41 2011
@@ -23,7 +23,6 @@ package org.apache.qpid.server.transport
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
import static org.apache.qpid.util.Serial.gt;
-import java.lang.ref.WeakReference;
import java.security.Principal;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -71,6 +70,7 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.RangeSetFactory;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
import org.slf4j.Logger;
@@ -86,6 +86,7 @@ public class ServerSession extends Sessi
private ConnectionConfig _connectionConfig;
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
+ private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction();
public static interface MessageDispositionChangeListener
{
@@ -121,8 +122,6 @@ public class ServerSession extends Sessi
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
- private final WeakReference<Session> _reference;
-
ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
@@ -134,7 +133,6 @@ public class ServerSession extends Sessi
_connectionConfig = connConfig;
_transaction = new AutoCommitTransaction(this.getMessageStore());
- _reference = new WeakReference<Session>(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
}
@@ -161,40 +159,22 @@ public class ServerSession extends Sessi
return isCommandsFull(id);
}
- public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
+ public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
{
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
- _transaction.enqueue(queues,message, new ServerTransaction.Action()
- {
-
- BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
-
- public void postCommit()
- {
- MessageReference<?> ref = message.newReference();
- for(int i = 0; i < _queues.length; i++)
- {
- try
- {
- _queues[i].enqueue(message);
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
- }
- ref.release();
- }
-
- public void onRollback()
- {
- // NO-OP
- }
- });
-
- incrementOutstandingTxnsIfNecessary();
- updateTransactionalActivity();
+ PostEnqueueAction postTransactionAction;
+ if(isTransactional())
+ {
+ postTransactionAction = new PostEnqueueAction(queues, message) ;
+ }
+ else
+ {
+ postTransactionAction = _postEnqueueAction;
+ postTransactionAction.setState(queues, message);
+ }
+ _transaction.enqueue(queues,message, postTransactionAction, 0L);
+ incrementOutstandingTxnsIfNecessary();
+ updateTransactionalActivity();
}
@@ -252,7 +232,7 @@ public class ServerSession extends Sessi
public RangeSet acquire(RangeSet transfers)
{
- RangeSet acquired = new RangeSet();
+ RangeSet acquired = RangeSetFactory.createRangeSet();
if(!_messageDispositionListenerMap.isEmpty())
{
@@ -300,41 +280,56 @@ public class ServerSession extends Sessi
public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
{
- if(ranges != null && !_messageDispositionListenerMap.isEmpty())
+ if(ranges != null)
{
- Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
- Iterator<Range> rangeIter = ranges.iterator();
- if(rangeIter.hasNext())
+ if(ranges.size() == 1)
{
- Range range = rangeIter.next();
+ Range r = ranges.getFirst();
+ for(int i = r.getLower(); i <= r.getUpper(); i++)
+ {
+ MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(i);
+ if(changeListener != null)
+ {
+ action.performAction(changeListener);
+ }
+ }
+ }
+ else if(!_messageDispositionListenerMap.isEmpty())
+ {
+ Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
+ Iterator<Range> rangeIter = ranges.iterator();
- while(range != null && unacceptedMessages.hasNext())
+ if(rangeIter.hasNext())
{
- int next = unacceptedMessages.next();
- while(gt(next, range.getUpper()))
+ Range range = rangeIter.next();
+
+ while(range != null && unacceptedMessages.hasNext())
{
- if(rangeIter.hasNext())
+ int next = unacceptedMessages.next();
+ while(gt(next, range.getUpper()))
{
- range = rangeIter.next();
+ if(rangeIter.hasNext())
+ {
+ range = rangeIter.next();
+ }
+ else
+ {
+ range = null;
+ break;
+ }
}
- else
+ if(range != null && range.includes(next))
{
- range = null;
- break;
+ MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next);
+ action.performAction(changeListener);
}
- }
- if(range != null && range.includes(next))
- {
- MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next);
- action.performAction(changeListener);
- }
- }
+ }
+ }
}
-
}
}
@@ -534,10 +529,10 @@ public class ServerSession extends Sessi
_taskList.remove(task);
}
- public WeakReference<Session> getReference()
- {
- return _reference;
- }
+ public Object getReference()
+ {
+ return ((ServerConnection) getConnection()).getReference();
+ }
public MessageStore getMessageStore()
{
@@ -697,7 +692,7 @@ public class ServerSession extends Sessi
}
}
- public void flushCreditState()
+ public void receivedComplete()
{
final Collection<Subscription_0_10> subscriptions = getSubscriptions();
for (Subscription_0_10 subscription_0_10 : subscriptions)
@@ -706,6 +701,54 @@ public class ServerSession extends Sessi
}
}
+ private static class PostEnqueueAction implements ServerTransaction.Action
+ {
+
+ private List<? extends BaseQueue> _queues;
+ private ServerMessage _message;
+ private final boolean _transactional;
+
+ public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message)
+ {
+ _transactional = true;
+ setState(queues, message);
+ }
+
+ public PostEnqueueAction()
+ {
+ _transactional = false;
+ }
+
+ public void setState(List<? extends BaseQueue> queues, ServerMessage message)
+ {
+ _message = message;
+ _queues = queues;
+ }
+
+ public void postCommit()
+ {
+ MessageReference<?> ref = _message.newReference();
+ for(int i = 0; i < _queues.size(); i++)
+ {
+ try
+ {
+ _queues.get(i).enqueue(_message, _transactional, null);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+ ref.release();
+ }
+
+ public void onRollback()
+ {
+ // NO-OP
+ }
+ }
+
public int getUnacknowledgedMessageCount()
{
return _messageDispositionListenerMap.size();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Wed Dec 28 13:02:41 2011
@@ -23,6 +23,7 @@ package org.apache.qpid.server.transport
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
@@ -55,46 +56,7 @@ import org.apache.qpid.server.store.Stor
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.Acquired;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.ExchangeBind;
-import org.apache.qpid.transport.ExchangeBound;
-import org.apache.qpid.transport.ExchangeBoundResult;
-import org.apache.qpid.transport.ExchangeDeclare;
-import org.apache.qpid.transport.ExchangeDelete;
-import org.apache.qpid.transport.ExchangeQuery;
-import org.apache.qpid.transport.ExchangeQueryResult;
-import org.apache.qpid.transport.ExchangeUnbind;
-import org.apache.qpid.transport.ExecutionErrorCode;
-import org.apache.qpid.transport.ExecutionException;
-import org.apache.qpid.transport.MessageAccept;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquire;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCancel;
-import org.apache.qpid.transport.MessageFlow;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageFlush;
-import org.apache.qpid.transport.MessageReject;
-import org.apache.qpid.transport.MessageRejectCode;
-import org.apache.qpid.transport.MessageRelease;
-import org.apache.qpid.transport.MessageResume;
-import org.apache.qpid.transport.MessageSetFlowMode;
-import org.apache.qpid.transport.MessageStop;
-import org.apache.qpid.transport.MessageSubscribe;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.QueueDeclare;
-import org.apache.qpid.transport.QueueDelete;
-import org.apache.qpid.transport.QueuePurge;
-import org.apache.qpid.transport.QueueQuery;
-import org.apache.qpid.transport.QueueQueryResult;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.TxCommit;
-import org.apache.qpid.transport.TxRollback;
-import org.apache.qpid.transport.TxSelect;
+import org.apache.qpid.transport.*;
public class ServerSessionDelegate extends SessionDelegate
{
@@ -295,7 +257,8 @@ public class ServerSessionDelegate exten
final Exchange exchange = getExchangeForMessage(ssn, xfr);
DeliveryProperties delvProps = null;
- if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+ if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() && !delvProps
+ .hasExpiration())
{
delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
}
@@ -312,7 +275,7 @@ public class ServerSessionDelegate exten
}
final Exchange exchangeInUse;
- ArrayList<? extends BaseQueue> queues = exchange.route(messageMetaData);
+ List<? extends BaseQueue> queues = exchange.route(messageMetaData);
if(queues.isEmpty() && exchange.getAlternateExchange() != null)
{
final Exchange alternateExchange = exchange.getAlternateExchange();
@@ -334,15 +297,16 @@ public class ServerSessionDelegate exten
if(!queues.isEmpty())
{
final MessageStore store = getVirtualHost(ssn).getMessageStore();
- final StoredMessage<MessageMetaData_0_10> storeMessage = createAndFlushStoreMessage(xfr, messageMetaData, store);
+ final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
((ServerSession) ssn).enqueue(message, queues);
+ storeMessage.flushToStore();
}
else
{
if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
- RangeSet rejects = new RangeSet();
+ RangeSet rejects = RangeSetFactory.createRangeSet();
rejects.add(xfr.getId());
MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
ssn.invoke(reject);
@@ -353,11 +317,13 @@ public class ServerSessionDelegate exten
}
}
+
+
ssn.processed(xfr);
}
- private StoredMessage<MessageMetaData_0_10> createAndFlushStoreMessage(final MessageTransfer xfr,
- final MessageMetaData_0_10 messageMetaData, final MessageStore store)
+ private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,
+ final MessageMetaData_0_10 messageMetaData, final MessageStore store)
{
final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
ByteBuffer body = xfr.getBody();
@@ -365,7 +331,6 @@ public class ServerSessionDelegate exten
{
storeMessage.addContent(0, body);
}
- storeMessage.flushToStore();
return storeMessage;
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Wed Dec 28 13:02:41 2011
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.txn;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -30,7 +31,7 @@ import org.apache.qpid.server.message.En
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.MessageStore;
/**
* An implementation of ServerTransaction where each enqueue/dequeue
@@ -43,11 +44,11 @@ public class AutoCommitTransaction imple
{
protected static final Logger _logger = Logger.getLogger(AutoCommitTransaction.class);
- private final TransactionLog _transactionLog;
+ private final MessageStore _messageStore;
- public AutoCommitTransaction(TransactionLog transactionLog)
+ public AutoCommitTransaction(MessageStore transactionLog)
{
- _transactionLog = transactionLog;
+ _messageStore = transactionLog;
}
public long getTransactionStartTime()
@@ -59,14 +60,14 @@ public class AutoCommitTransaction imple
* Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
* by the caller are executed immediately.
*/
- public void addPostTransactionAction(Action immediateAction)
+ public void addPostTransactionAction(final Action immediateAction)
{
immediateAction.postCommit();
}
public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
- TransactionLog.Transaction txn = null;
+ MessageStore.Transaction txn = null;
try
{
if(message.isPersistent() && queue.isDurable())
@@ -76,8 +77,8 @@ public class AutoCommitTransaction imple
_logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
}
- txn = _transactionLog.newTransaction();
- txn.dequeueMessage(queue, message.getMessageNumber());
+ txn = _messageStore.newTransaction();
+ txn.dequeueMessage(queue, message);
txn.commitTran();
txn = null;
}
@@ -98,7 +99,7 @@ public class AutoCommitTransaction imple
public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
{
- TransactionLog.Transaction txn = null;
+ MessageStore.Transaction txn = null;
try
{
for(QueueEntry entry : queueEntries)
@@ -115,10 +116,10 @@ public class AutoCommitTransaction imple
if(txn == null)
{
- txn = _transactionLog.newTransaction();
+ txn = _messageStore.newTransaction();
}
- txn.dequeueMessage(queue, message.getMessageNumber());
+ txn.dequeueMessage(queue, message);
}
}
@@ -145,7 +146,7 @@ public class AutoCommitTransaction imple
public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
- TransactionLog.Transaction txn = null;
+ MessageStore.Transaction txn = null;
try
{
if(message.isPersistent() && queue.isDurable())
@@ -155,8 +156,8 @@ public class AutoCommitTransaction imple
_logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
}
- txn = _transactionLog.newTransaction();
- txn.enqueueMessage(queue, message.getMessageNumber());
+ txn = _messageStore.newTransaction();
+ txn.enqueueMessage(queue, message);
txn.commitTran();
txn = null;
}
@@ -176,15 +177,14 @@ public class AutoCommitTransaction imple
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
{
- TransactionLog.Transaction txn = null;
+ MessageStore.Transaction txn = null;
try
{
if(message.isPersistent())
{
- Long id = message.getMessageNumber();
for(BaseQueue queue : queues)
{
if (queue.isDurable())
@@ -195,22 +195,26 @@ public class AutoCommitTransaction imple
}
if (txn == null)
{
- txn = _transactionLog.newTransaction();
+ txn = _messageStore.newTransaction();
}
- txn.enqueueMessage(queue, id);
+ txn.enqueueMessage(queue, message);
+
+
}
}
- if (txn != null)
- {
- txn.commitTran();
- txn = null;
-
- }
}
+ if (txn != null)
+ {
+ txn.commitTran();
+ txn = null;
+ }
+
postTransactionAction.postCommit();
postTransactionAction = null;
+
+
}
catch (AMQException e)
{
@@ -225,6 +229,11 @@ public class AutoCommitTransaction imple
}
+ public void commit(final Runnable immediatePostTransactionAction)
+ {
+ immediatePostTransactionAction.run();
+ }
+
public void commit()
{
}
@@ -233,7 +242,7 @@ public class AutoCommitTransaction imple
{
}
- private void rollbackIfNecessary(Action postTransactionAction, TransactionLog.Transaction txn)
+ private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn)
{
if (txn != null)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Wed Dec 28 13:02:41 2011
@@ -29,11 +29,7 @@ import org.apache.qpid.server.message.En
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.MessageStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,11 +46,11 @@ public class LocalTransaction implements
private final List<Action> _postTransactionActions = new ArrayList<Action>();
- private volatile TransactionLog.Transaction _transaction;
- private TransactionLog _transactionLog;
+ private volatile MessageStore.Transaction _transaction;
+ private MessageStore _transactionLog;
private long _txnStartTime = 0L;
- public LocalTransaction(TransactionLog transactionLog)
+ public LocalTransaction(MessageStore transactionLog)
{
_transactionLog = transactionLog;
}
@@ -63,7 +59,7 @@ public class LocalTransaction implements
{
return _transaction != null;
}
-
+
public long getTransactionStartTime()
{
return _txnStartTime;
@@ -88,7 +84,7 @@ public class LocalTransaction implements
}
beginTranIfNecessary();
- _transaction.dequeueMessage(queue, message.getMessageNumber());
+ _transaction.dequeueMessage(queue, message);
}
catch(AMQException e)
@@ -118,7 +114,7 @@ public class LocalTransaction implements
}
beginTranIfNecessary();
- _transaction.dequeueMessage(queue, message.getMessageNumber());
+ _transaction.dequeueMessage(queue, message);
}
}
@@ -191,7 +187,7 @@ public class LocalTransaction implements
}
beginTranIfNecessary();
- _transaction.enqueueMessage(queue, message.getMessageNumber());
+ _transaction.enqueueMessage(queue, message);
}
catch (Exception e)
{
@@ -202,13 +198,13 @@ public class LocalTransaction implements
}
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
{
_postTransactionActions.add(postTransactionAction);
if (_txnStartTime == 0L)
{
- _txnStartTime = System.currentTimeMillis();
+ _txnStartTime = currentTime == 0L ? System.currentTimeMillis() : currentTime;
}
if(message.isPersistent())
@@ -226,7 +222,7 @@ public class LocalTransaction implements
beginTranIfNecessary();
- _transaction.enqueueMessage(queue, message.getMessageNumber());
+ _transaction.enqueueMessage(queue, message);
}
}
@@ -242,6 +238,11 @@ public class LocalTransaction implements
public void commit()
{
+ commit(null);
+ }
+
+ public void commit(Runnable immediateAction)
+ {
try
{
if(_transaction != null)
@@ -249,9 +250,14 @@ public class LocalTransaction implements
_transaction.commitTran();
}
- for(Action action : _postTransactionActions)
+ if(immediateAction != null)
+ {
+ immediateAction.run();
+ }
+
+ for(int i = 0; i < _postTransactionActions.size(); i++)
{
- action.postCommit();
+ _postTransactionActions.get(i).postCommit();
}
}
catch (Exception e)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Wed Dec 28 13:02:41 2011
@@ -21,6 +21,7 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
@@ -42,7 +43,7 @@ import java.util.List;
*/
public interface ServerTransaction
{
- /**
+ /**
* Represents an action to be performed on transaction commit or rollback
*/
public static interface Action
@@ -91,7 +92,7 @@ public interface ServerTransaction
*
* Store operations will result only for a persistent messages on durable queues.
*/
- void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction);
+ void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime);
/**
* Commit the transaction represented by this object.
@@ -101,6 +102,8 @@ public interface ServerTransaction
*/
void commit();
+ void commit(Runnable immediatePostTransactionAction);
+
/** Rollback the transaction represented by this object.
*
* If the caller has registered one or more Actions, the onRollback() method on each will
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Wed Dec 28 13:02:41 2011
@@ -39,7 +39,6 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
{
@@ -57,8 +56,6 @@ public interface VirtualHost extends Dur
MessageStore getMessageStore();
- TransactionLog getTransactionLog();
-
DurableConfigurationStore getDurableConfigurationStore();
AuthenticationManager getAuthenticationManager();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Wed Dec 28 13:02:41 2011
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.server.virtualhost;
+import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
@@ -73,7 +73,6 @@ public class VirtualHostConfigRecoveryHa
private List<ProcessAction> _actions;
private MessageStore _store;
- private TransactionLog _transactionLog;
private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
@@ -86,7 +85,7 @@ public class VirtualHostConfigRecoveryHa
_virtualHost = virtualHost;
}
- public QueueRecoveryHandler begin(MessageStore store)
+ public VirtualHostConfigRecoveryHandler begin(MessageStore store)
{
_logSubject = new MessageStoreLogSubject(_virtualHost,store);
_store = store;
@@ -99,14 +98,12 @@ public class VirtualHostConfigRecoveryHa
{
try
{
- AMQShortString queueNameShortString = new AMQShortString(queueName);
-
- AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+ AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName);
if (q == null)
{
- q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, exclusive, _virtualHost,
- arguments);
+ q = AMQQueueFactory.createAMQQueueImpl(queueName, true, owner, false, exclusive, _virtualHost,
+ FieldTable.convertToMap(arguments));
_virtualHost.getQueueRegistry().registerQueue(q);
}
@@ -186,12 +183,6 @@ public class VirtualHostConfigRecoveryHa
//To change body of implemented methods use File | Settings | File Templates.
}
- public TransactionLogRecoveryHandler.QueueEntryRecoveryHandler begin(TransactionLog log)
- {
- _transactionLog = log;
- return this;
- }
-
private static final class ProcessAction
{
private final AMQQueue _queue;
@@ -316,15 +307,15 @@ public class VirtualHostConfigRecoveryHa
else
{
_logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded");
- TransactionLog.Transaction txn = _transactionLog.newTransaction();
- txn.dequeueMessage(queue, messageId);
+ MessageStore.Transaction txn = _store.newTransaction();
+ txn.dequeueMessage(queue, new DummyMessage(messageId));
txn.commitTranAsync();
}
}
else
{
_logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded");
- TransactionLog.Transaction txn = _transactionLog.newTransaction();
+ MessageStore.Transaction txn = _store.newTransaction();
TransactionLogResource mockQueue =
new TransactionLogResource()
{
@@ -334,7 +325,7 @@ public class VirtualHostConfigRecoveryHa
return queueName;
}
};
- txn.dequeueMessage(mockQueue, messageId);
+ txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
txn.commitTranAsync();
}
@@ -367,4 +358,32 @@ public class VirtualHostConfigRecoveryHa
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
}
+ private static class DummyMessage implements EnqueableMessage
+ {
+
+
+ private final long _messageId;
+
+ public DummyMessage(long messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+
+ public StoredMessage getStoredMessage()
+ {
+ return null;
+ }
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Wed Dec 28 13:02:41 2011
@@ -75,7 +75,6 @@ import org.apache.qpid.server.stats.Stat
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
@@ -228,7 +227,10 @@ public class VirtualHostImpl implements
if (store != null)
{
_messageStore = store;
- _durableConfigurationStore = store;
+ if(store instanceof DurableConfigurationStore)
+ {
+ _durableConfigurationStore = (DurableConfigurationStore) store;
+ }
}
else
{
@@ -380,6 +382,8 @@ public class VirtualHostImpl implements
Class clazz = Class.forName(messageStoreClass);
Object o = clazz.newInstance();
+
+
if (!(o instanceof MessageStore))
{
throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
@@ -390,10 +394,18 @@ public class VirtualHostImpl implements
MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore);
- messageStore.configureConfigStore(this.getName(),
- recoveryHandler,
- hostConfig.getStoreConfiguration(),
- storeLogSubject);
+
+ if(messageStore instanceof DurableConfigurationStore)
+ {
+ DurableConfigurationStore durableConfigurationStore = (DurableConfigurationStore) messageStore;
+
+ durableConfigurationStore.configureConfigStore(this.getName(),
+ recoveryHandler,
+ hostConfig.getStoreConfiguration(),
+ storeLogSubject);
+
+ _durableConfigurationStore = durableConfigurationStore;
+ }
messageStore.configureMessageStore(this.getName(),
recoveryHandler,
@@ -405,7 +417,8 @@ public class VirtualHostImpl implements
storeLogSubject);
_messageStore = messageStore;
- _durableConfigurationStore = messageStore;
+
+
}
private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
@@ -553,11 +566,6 @@ public class VirtualHostImpl implements
return _messageStore;
}
- public TransactionLog getTransactionLog()
- {
- return _messageStore;
- }
-
public DurableConfigurationStore getDurableConfigurationStore()
{
return _durableConfigurationStore;
Propchange: qpid/trunk/qpid/java/broker/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org