You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/07/25 16:24:37 UTC
svn commit: r1613440 [1/2] - in /qpid/trunk/qpid/java:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/...
Author: rgodfrey
Date: Fri Jul 25 14:24:36 2014
New Revision: 1613440
URL: http://svn.apache.org/r1613440
Log:
QPID-4304 : [Java Broker] Add an attribute to queues - "messageDurability" - which controls whether message data is persisted or not. By default, depend on the persistence setting of the message, but allow an individual queue to declare that all (or no) messages should be persisted on the queue
Added:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
qpid/trunk/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
qpid/trunk/qpid/java/test-profiles/CPPExcludes
qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Fri Jul 25 14:24:36 2014
@@ -55,7 +55,6 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMemoryMessage;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.Xid;
@@ -123,14 +122,7 @@ public abstract class AbstractBDBMessage
long newMessageId = getNextMessageId();
- if (metaData.isPersistent())
- {
- return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData);
- }
- else
- {
- return new StoredMemoryMessage<T>(newMessageId, metaData);
- }
+ return new StoredBDBMessage<T>(newMessageId, metaData);
}
public long getNextMessageId()
@@ -1049,7 +1041,7 @@ public abstract class AbstractBDBMessage
protected abstract Logger getLogger();
- private class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+ class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
{
private final long _messageId;
@@ -1177,8 +1169,7 @@ public abstract class AbstractBDBMessage
}
}
- @Override
- public synchronized StoreFuture flushToStore()
+ synchronized StoreFuture flushToStore()
{
if(!stored())
{
@@ -1229,6 +1220,7 @@ public abstract class AbstractBDBMessage
{
private Transaction _txn;
private int _storeSizeIncrease;
+ private final List<Runnable> _onCommitActions = new ArrayList<>();
private BDBTransaction() throws StoreException
{
@@ -1250,8 +1242,16 @@ public abstract class AbstractBDBMessage
if(message.getStoredMessage() instanceof StoredBDBMessage)
{
final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
- storedMessage.store(_txn);
- _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ _onCommitActions.add(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ storedMessage.store(_txn);
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ }
+ });
+
}
AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
@@ -1269,16 +1269,25 @@ public abstract class AbstractBDBMessage
public void commitTran() throws StoreException
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
}
+ private void doPreCommitActions()
+ {
+ for(Runnable action : _onCommitActions)
+ {
+ action.run();
+ }
+ _onCommitActions.clear();
+ }
+
@Override
public StoreFuture commitTranAsync() throws StoreException
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
return AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
}
@@ -1287,7 +1296,7 @@ public abstract class AbstractBDBMessage
public void abortTran() throws StoreException
{
checkMessageStoreOpen();
-
+ _onCommitActions.clear();
AbstractBDBMessageStore.this.abortTran(_txn);
}
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java Fri Jul 25 14:24:36 2014
@@ -27,6 +27,7 @@ import com.sleepycat.bind.tuple.TupleBin
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -131,9 +132,9 @@ public class PreparedTransactionBinding
}
@Override
- public boolean isDurable()
+ public MessageDurability getMessageDurability()
{
- return true;
+ return MessageDurability.DEFAULT;
}
}
}
Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Fri Jul 25 14:24:36 2014
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -51,9 +54,6 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.util.FileUtils;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* Subclass of MessageStoreTestCase which runs the standard tests from the superclass against
* the BDB Store as well as additional tests specific to the BDB store-implementation.
@@ -113,7 +113,7 @@ public class BDBMessageStoreTest extends
storedMessage_0_8.addContent(0, firstContentBytes_0_8);
storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8);
- storedMessage_0_8.flushToStore();
+ ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
/*
* Create and insert a 0-10 message (metadata and content)
@@ -132,7 +132,7 @@ public class BDBMessageStoreTest extends
long messageid_0_10 = storedMessage_0_10.getMessageNumber();
storedMessage_0_10.addContent(0, completeContentBody_0_10);
- storedMessage_0_10.flushToStore();
+ ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_10).flushToStore();
/*
* reload the store only (read-only)
@@ -387,7 +387,7 @@ public class BDBMessageStoreTest extends
StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
storedMessage_0_8.addContent(0, chunk1);
- storedMessage_0_8.flushToStore();
+ ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
return storedMessage_0_8;
}
Modified: qpid/trunk/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java (original)
+++ qpid/trunk/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java Fri Jul 25 14:24:36 2014
@@ -24,7 +24,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-@Retention(RetentionPolicy.SOURCE)
+@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
public @interface PluggableService
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java Fri Jul 25 14:24:36 2014
@@ -235,12 +235,6 @@ public class InternalMessage extends Abs
}
@Override
- public StoreFuture flushToStore()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Fri Jul 25 14:24:36 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.model;
import java.util.Collection;
import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.store.MessageDurability;
@ManagedObject( defaultType = "standard" )
public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@@ -35,6 +36,7 @@ public interface Queue<X extends Queue<X
String ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "alertThresholdQueueDepthMessages";
String ALTERNATE_EXCHANGE = "alternateExchange";
String EXCLUSIVE = "exclusive";
+ String MESSAGE_DURABILITY = "messageDurability";
String MESSAGE_GROUP_KEY = "messageGroupKey";
String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups";
String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup";
@@ -130,6 +132,10 @@ public interface Queue<X extends Queue<X
@ManagedAttribute( defaultValue = "${queue.alertRepeatGap}")
long getAlertRepeatGap();
+ @ManagedAttribute( defaultValue = "DEFAULT" )
+ MessageDurability getMessageDurability();
+
+
//children
Collection<? extends Binding> getBindings();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Jul 25 14:24:36 2014
@@ -77,6 +77,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
@@ -175,6 +176,9 @@ public abstract class AbstractQueue<X ex
@ManagedAttributeField
private ExclusivityPolicy _exclusive;
+ @ManagedAttributeField
+ private MessageDurability _messageDurability;
+
private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name
private final Set<NotificationCheck> _notificationChecks =
@@ -245,12 +249,38 @@ public abstract class AbstractQueue<X ex
{
super.onCreate();
+ if(isDurable() && (getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+ || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END))
+ {
+ Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(),
+ new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ setAttribute(AbstractConfiguredObject.DURABLE, true, false);
+ return null;
+ }
+ });
+ }
- if (isDurable() && !(getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
- || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END))
+ if (isDurable())
{
_virtualHost.getDurableConfigurationStore().create(asObjectRecord());
}
+ else if(getMessageDurability() != MessageDurability.NEVER)
+ {
+ Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(),
+ new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ setAttribute(Queue.MESSAGE_DURABILITY, getMessageDurability(), MessageDurability.NEVER);
+ return null;
+ }
+ });
+ }
_recovering.set(false);
}
@@ -510,6 +540,11 @@ public abstract class AbstractQueue<X ex
}
}
+ @Override
+ public final MessageDurability getMessageDurability()
+ {
+ return _messageDurability;
+ }
@Override
public Collection<String> getAvailableAttributes()
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java Fri Jul 25 14:24:36 2014
@@ -52,6 +52,9 @@ public class QueueArgumentsConverter
public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
public static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
+
+ public static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
+
public static final String QPID_TRACE_EXCLUDE = "qpid.trace.exclude";
public static final String QPID_TRACE_ID = "qpid.trace.id";
@@ -91,6 +94,7 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP);
ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL);
+ ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY);
}
@@ -138,7 +142,12 @@ public class QueueArgumentsConverter
{
if(modelArguments.containsKey(entry.getValue()))
{
- wireArguments.put(entry.getKey(), modelArguments.get(entry.getValue()));
+ Object value = modelArguments.get(entry.getValue());
+ if(value instanceof Enum)
+ {
+ value = ((Enum) value).name();
+ }
+ wireArguments.put(entry.getKey(), value);
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Fri Jul 25 14:24:36 2014
@@ -447,14 +447,8 @@ public abstract class AbstractJDBCMessag
{
checkMessageStoreOpen();
- if(metaData.isPersistent())
- {
- return new StoredJDBCMessage(getNextMessageId(), metaData);
- }
- else
- {
- return new StoredMemoryMessage(getNextMessageId(), metaData);
- }
+ return new StoredJDBCMessage(getNextMessageId(), metaData);
+
}
@Override
@@ -970,9 +964,9 @@ public abstract class AbstractJDBCMessag
}
@Override
- public boolean isDurable()
+ public MessageDurability getMessageDurability()
{
- return true;
+ return MessageDurability.DEFAULT;
}
}
@@ -1122,7 +1116,7 @@ public abstract class AbstractJDBCMessag
{
private final ConnectionWrapper _connWrapper;
private int _storeSizeIncrease;
-
+ private final List<Runnable> _onCommitActions = new ArrayList<>();
protected JDBCTransaction()
{
@@ -1144,16 +1138,23 @@ public abstract class AbstractJDBCMessag
final StoredMessage storedMessage = message.getStoredMessage();
if(storedMessage instanceof StoredJDBCMessage)
{
- try
- {
- ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
- }
- catch (SQLException e)
+ _onCommitActions.add(new Runnable()
{
- throw new StoreException("Exception on enqueuing message into message store" + _messageId, e);
- }
+ @Override
+ public void run()
+ {
+ try
+ {
+ ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Exception on enqueuing message into message store" + _messageId, e);
+ }
+ }
+ });
}
- _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
}
@@ -1170,7 +1171,7 @@ public abstract class AbstractJDBCMessag
public void commitTran()
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
AbstractJDBCMessageStore.this.commitTran(_connWrapper);
storedSizeChange(_storeSizeIncrease);
}
@@ -1179,17 +1180,26 @@ public abstract class AbstractJDBCMessag
public StoreFuture commitTranAsync()
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
storedSizeChange(_storeSizeIncrease);
return storeFuture;
}
+ private void doPreCommitActions()
+ {
+ for(Runnable action : _onCommitActions)
+ {
+ action.run();
+ }
+ _onCommitActions.clear();
+ }
+
@Override
public void abortTran()
{
checkMessageStoreOpen();
-
+ _onCommitActions.clear();
AbstractJDBCMessageStore.this.abortTran(_connWrapper);
}
@@ -1215,7 +1225,6 @@ public abstract class AbstractJDBCMessag
private final long _messageId;
private final boolean _isRecovered;
-
private StorableMessageMetaData _metaData;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
private byte[] _data;
@@ -1320,39 +1329,6 @@ public abstract class AbstractJDBCMessag
}
@Override
- public synchronized StoreFuture flushToStore()
- {
- checkMessageStoreOpen();
-
- Connection conn = null;
- try
- {
- if(!stored())
- {
- conn = newConnection();
-
- store(conn);
-
- conn.commit();
- storedSizeChange(getMetaData().getContentSize());
- }
- }
- catch (SQLException e)
- {
- if(getLogger().isDebugEnabled())
- {
- getLogger().debug("Error when trying to flush message " + _messageId + " to store: " + e);
- }
- throw new StoreException(e);
- }
- finally
- {
- JdbcUtils.closeConnection(conn, AbstractJDBCMessageStore.this.getLogger());
- }
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
checkMessageStoreOpen();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Fri Jul 25 14:24:36 2014
@@ -64,6 +64,12 @@ public class MemoryMessageStore implemen
@Override
public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
+
+ if(message.getStoredMessage() instanceof StoredMemoryMessage)
+ {
+ _messages.putIfAbsent(message.getMessageNumber(), (StoredMemoryMessage) message.getStoredMessage());
+ }
+
Set<Long> messageIds = _localEnqueueMap.get(queue.getId());
if (messageIds == null)
{
@@ -196,31 +202,20 @@ public class MemoryMessageStore implemen
{
long id = getNextMessageId();
- if(metaData.isPersistent())
+ StoredMemoryMessage<T> storedMemoryMessage = new StoredMemoryMessage<T>(id, metaData)
{
- return new StoredMemoryMessage<T>(id, metaData)
+
+ @Override
+ public void remove()
{
+ _messages.remove(getMessageNumber());
+ super.remove();
+ }
- @Override
- public StoreFuture flushToStore()
- {
- _messages.putIfAbsent(getMessageNumber(), this) ;
- return super.flushToStore();
- }
+ };
- @Override
- public void remove()
- {
- _messages.remove(getMessageNumber());
- super.remove();
- }
+ return storedMemoryMessage;
- };
- }
- else
- {
- return new StoredMemoryMessage<T>(id, metaData);
- }
}
@Override
Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java?rev=1613440&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java Fri Jul 25 14:24:36 2014
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+public enum MessageDurability
+{
+ DEFAULT(false,true),
+ ALWAYS(true,true),
+ NEVER(false,false);
+
+ private final boolean _nonPersistent;
+ private final boolean _persistent;
+
+ MessageDurability(final boolean nonPersistent, final boolean persistent)
+ {
+ _nonPersistent = nonPersistent;
+ _persistent = persistent;
+ }
+
+ public boolean persist(final boolean persistent)
+ {
+ return persistent ? _persistent : _nonPersistent;
+ }
+}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Fri Jul 25 14:24:36 2014
@@ -122,12 +122,6 @@ public class StoredMemoryMessage<T exten
return buf;
}
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
-
public T getMetaData()
{
return _metaData;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java Fri Jul 25 14:24:36 2014
@@ -34,7 +34,5 @@ public interface StoredMessage<M extends
ByteBuffer getContent(int offsetInMessage, int size);
- StoreFuture flushToStore();
-
void remove();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java Fri Jul 25 14:24:36 2014
@@ -24,7 +24,9 @@ import java.util.UUID;
public interface TransactionLogResource
{
+
String getName();
public UUID getId();
- boolean isDurable();
+ //boolean isDurable();
+ MessageDurability getMessageDurability();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Fri Jul 25 14:24:36 2014
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.txn;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
@@ -31,9 +34,6 @@ import org.apache.qpid.server.store.Stor
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
-import java.util.Collection;
-import java.util.List;
-
/**
* An implementation of ServerTransaction where each enqueue/dequeue
* operation takes place within it own transaction.
@@ -93,7 +93,7 @@ public class AsyncAutoCommitTransaction
try
{
StoreFuture future;
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -162,7 +162,7 @@ public class AsyncAutoCommitTransaction
ServerMessage message = entry.getMessage();
TransactionLogResource queue = entry.getOwningResource();
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -205,7 +205,7 @@ public class AsyncAutoCommitTransaction
try
{
StoreFuture future;
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -237,28 +237,24 @@ public class AsyncAutoCommitTransaction
try
{
- if(message.isPersistent())
+ for(BaseQueue queue : queues)
{
- for(BaseQueue queue : queues)
+ if (queue.getMessageDurability().persist(message.isPersistent()))
{
- if (queue.isDurable())
+ if (_logger.isDebugEnabled())
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
- }
- if (txn == null)
- {
- txn = _messageStore.newTransaction();
- }
-
- txn.enqueueMessage(queue, message);
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
+ }
+ if (txn == null)
+ {
+ txn = _messageStore.newTransaction();
+ }
+ txn.enqueueMessage(queue, message);
- }
}
-
}
+
StoreFuture future;
if (txn != null)
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Fri Jul 25 14:24:36 2014
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.txn;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
@@ -30,9 +33,6 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
-import java.util.Collection;
-import java.util.List;
-
/**
* An implementation of ServerTransaction where each enqueue/dequeue
* operation takes place within it own transaction.
@@ -77,7 +77,7 @@ public class AutoCommitTransaction imple
Transaction txn = null;
try
{
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -109,7 +109,7 @@ public class AutoCommitTransaction imple
ServerMessage message = entry.getMessage();
TransactionLogResource queue = entry.getOwningResource();
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -146,7 +146,7 @@ public class AutoCommitTransaction imple
Transaction txn = null;
try
{
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -175,25 +175,21 @@ public class AutoCommitTransaction imple
try
{
- if(message.isPersistent())
+ for(BaseQueue queue : queues)
{
- for(BaseQueue queue : queues)
+ if (queue.getMessageDurability().persist(message.isPersistent()))
{
- if (queue.isDurable())
+ if (_logger.isDebugEnabled())
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
- }
- if (txn == null)
- {
- txn = _messageStore.newTransaction();
- }
-
- txn.enqueueMessage(queue, message);
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
+ }
+ if (txn == null)
+ {
+ txn = _messageStore.newTransaction();
+ }
+ txn.enqueueMessage(queue, message);
- }
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java Fri Jul 25 14:24:36 2014
@@ -381,7 +381,7 @@ public class DtxBranch
public boolean isDurable()
{
- return _message.isPersistent() && _resource.isDurable();
+ return _resource.getMessageDurability().persist(_message.isPersistent());
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Fri Jul 25 14:24:36 2014
@@ -20,21 +20,21 @@
*/
package org.apache.qpid.server.txn;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.TransactionLogResource;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.Transaction;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import org.apache.qpid.server.store.TransactionLogResource;
/**
* A concrete implementation of ServerTransaction where enqueue/dequeue
@@ -97,7 +97,7 @@ public class LocalTransaction implements
_postTransactionActions.add(postTransactionAction);
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
try
{
@@ -129,7 +129,7 @@ public class LocalTransaction implements
ServerMessage message = entry.getMessage();
TransactionLogResource queue = entry.getOwningResource();
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
{
@@ -186,7 +186,7 @@ public class LocalTransaction implements
_postTransactionActions.add(postTransactionAction);
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
- if(message.isPersistent() && queue.isDurable())
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
try
{
@@ -211,29 +211,26 @@ public class LocalTransaction implements
_postTransactionActions.add(postTransactionAction);
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
- if(message.isPersistent())
+ try
{
- try
+ for(BaseQueue queue : queues)
{
- for(BaseQueue queue : queues)
+ if(queue.getMessageDurability().persist(message.isPersistent()))
{
- if(queue.isDurable())
+ if (_logger.isDebugEnabled())
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() );
- }
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() );
+ }
- beginTranIfNecessary();
- _transaction.enqueueMessage(queue, message);
+ beginTranIfNecessary();
+ _transaction.enqueueMessage(queue, message);
- }
}
}
- catch(RuntimeException e)
- {
- tidyUpOnError(e);
- }
+ }
+ catch(RuntimeException e)
+ {
+ tidyUpOnError(e);
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java Fri Jul 25 14:24:36 2014
@@ -38,6 +38,7 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
@@ -220,9 +221,9 @@ public class SynchronousMessageStoreReco
}
@Override
- public boolean isDurable()
+ public MessageDurability getMessageDurability()
{
- return false;
+ return MessageDurability.DEFAULT;
}
};
txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java Fri Jul 25 14:24:36 2014
@@ -34,8 +34,6 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.model.ConfiguredObjectFactory;
-import org.apache.qpid.server.model.VirtualHostNode;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
@@ -49,6 +47,7 @@ import org.apache.qpid.server.logging.Ev
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
@@ -56,6 +55,7 @@ import org.apache.qpid.server.model.Life
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java Fri Jul 25 14:24:36 2014
@@ -20,16 +20,14 @@
*/
package org.apache.qpid.server.store;
-import static org.mockito.Mockito.mock;
-
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.VirtualHost;
@@ -148,12 +146,6 @@ public abstract class MessageStoreQuotaE
return _transactionResource;
}
- @Override
- public boolean isDurable()
- {
- return true;
- }
-
private static class TestMessage implements EnqueueableMessage
{
private final StoredMessage<?> _handle;
@@ -180,4 +172,10 @@ public abstract class MessageStoreQuotaE
return _handle;
}
}
+
+ @Override
+ public MessageDurability getMessageDurability()
+ {
+ return MessageDurability.DEFAULT;
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java Fri Jul 25 14:24:36 2014
@@ -28,13 +28,14 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import org.hamcrest.Description;
+import org.mockito.ArgumentMatcher;
+
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -45,9 +46,6 @@ import org.apache.qpid.server.store.hand
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.hamcrest.Description;
-import org.mockito.ArgumentMatcher;
-
public abstract class MessageStoreTestCase extends QpidTestCase
{
private MessageStore _store;
@@ -117,8 +115,7 @@ public abstract class MessageStoreTestCa
long messageId = 1;
int contentSize = 0;
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
+ enqueueMessage(message, "dummyQ");
MessageHandler handler = mock(MessageHandler.class);
_store.visitMessages(handler);
@@ -127,14 +124,60 @@ public abstract class MessageStoreTestCa
}
+ public void enqueueMessage(final StoredMessage<TestMessageMetaData> message, final String queueName)
+ {
+ Transaction txn = _store.newTransaction();
+ txn.enqueueMessage(new TransactionLogResource()
+ {
+ private final UUID _id = UUID.nameUUIDFromBytes(queueName.getBytes());
+
+ @Override
+ public String getName()
+ {
+ return queueName;
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ @Override
+ public MessageDurability getMessageDurability()
+ {
+ return MessageDurability.DEFAULT;
+ }
+ }, new EnqueueableMessage()
+ {
+ @Override
+ public long getMessageNumber()
+ {
+ return message.getMessageNumber();
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ @Override
+ public StoredMessage getStoredMessage()
+ {
+ return message;
+ }
+ });
+ txn.commitTran();
+ }
+
public void testVisitMessagesAborted() throws Exception
{
int contentSize = 0;
for (int i = 0; i < 3; i++)
{
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
+ enqueueMessage(message, "dummyQ");
}
MessageHandler handler = mock(MessageHandler.class);
@@ -151,16 +194,16 @@ public abstract class MessageStoreTestCa
for (int i = 0; i < 3; i++)
{
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
+ enqueueMessage(message, "dummyQ");
+
}
reopenStore();
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
+ enqueueMessage(message, "dummyQ");
+
assertTrue("Unexpected message id " + message.getMessageNumber(), message.getMessageNumber() >= 4);
}
@@ -170,8 +213,6 @@ public abstract class MessageStoreTestCa
long messageId = 1;
int contentSize = 0;
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message);
@@ -305,8 +346,6 @@ public abstract class MessageStoreTestCa
long messageId = 1;
int contentSize = 0;
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
MessageHandler handler = mock(MessageHandler.class);
_store.visitMessages(handler);
@@ -319,8 +358,7 @@ public abstract class MessageStoreTestCa
long messageId = 1;
int contentSize = 0;
final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
- StoreFuture flushFuture = message.flushToStore();
- flushFuture.waitForCompletion();
+ enqueueMessage(message, "dummyQ");
final AtomicReference<StoredMessage<?>> retrievedMessageRef = new AtomicReference<StoredMessage<?>>();
_store.visitMessages(new MessageHandler()
@@ -360,7 +398,7 @@ public abstract class MessageStoreTestCa
TransactionLogResource queue = mock(TransactionLogResource.class);
when(queue.getId()).thenReturn(queueId);
when(queue.getName()).thenReturn("testQueue");
- when(queue.isDurable()).thenReturn(true);
+ when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
return queue;
}
@@ -391,8 +429,6 @@ public abstract class MessageStoreTestCa
private EnqueueableMessage createEnqueueableMessage(long messageId1)
{
final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0));
- StoreFuture flushFuture = message1.flushToStore();
- flushFuture.waitForCompletion();
EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1);
return enqueueableMessage1;
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java Fri Jul 25 14:24:36 2014
@@ -25,6 +25,7 @@ import java.util.Collections;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.Transaction;
@@ -53,6 +54,7 @@ public class AsyncAutoCommitTransactionT
when(_messageStore.newTransaction()).thenReturn(_storeTransaction);
when(_storeTransaction.commitTranAsync()).thenReturn(_future);
when(_queue.isDurable()).thenReturn(true);
+ when(_queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
}
public void testEnqueuePersistentMessagePostCommitNotCalledWhenFutureAlreadyComplete() throws Exception
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Fri Jul 25 14:24:36 2014
@@ -20,22 +20,23 @@
*/
package org.apache.qpid.server.txn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.MockMessageInstance;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
import org.apache.qpid.test.utils.QpidTestCase;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* A unit test ensuring that AutoCommitTransaction creates a separate transaction for
* each dequeue/enqueue operation that involves enlistable messages. Verifies
@@ -428,6 +429,7 @@ public class AutoCommitTransactionTest e
{
BaseQueue queue = mock(BaseQueue.class);
when(queue.isDurable()).thenReturn(durable);
+ when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER);
return queue;
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java Fri Jul 25 14:24:36 2014
@@ -20,22 +20,23 @@
*/
package org.apache.qpid.server.txn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.MockMessageInstance;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
import org.apache.qpid.test.utils.QpidTestCase;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* A unit test ensuring that LocalTransactionTest creates a long-lived store transaction
* that spans many dequeue/enqueue operations of enlistable messages. Verifies
@@ -652,6 +653,7 @@ public class LocalTransactionTest extend
{
BaseQueue queue = mock(BaseQueue.class);
when(queue.isDurable()).thenReturn(durable);
+ when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER);
return queue;
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java Fri Jul 25 14:24:36 2014
@@ -42,6 +42,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.NullMessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -379,6 +380,7 @@ public class SynchronousMessageStoreReco
{
AMQQueue<?> queue = mock(AMQQueue.class);
final UUID queueId = UUID.randomUUID();
+ when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
when(queue.getId()).thenReturn(queueId);
when(queue.getName()).thenReturn("test-queue");
when(_virtualHost.getQueue(queueId)).thenReturn(queue);
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Fri Jul 25 14:24:36 2014
@@ -102,12 +102,6 @@ public class MessageConverter_Internal_t
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Fri Jul 25 14:24:36 2014
@@ -102,12 +102,6 @@ public class MessageConverter_v0_10 impl
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Fri Jul 25 14:24:36 2014
@@ -334,11 +334,7 @@ public class ServerSessionDelegate exten
int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
- if(enqueues != 0)
- {
- storeMessage.flushToStore();
- }
- else
+ if(enqueues == 0)
{
if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Jul 25 14:24:36 2014
@@ -34,8 +34,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -93,7 +91,6 @@ import org.apache.qpid.server.protocol.C
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
@@ -152,9 +149,6 @@ public class AMQChannel<T extends AMQPro
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
- // Set of messages being acknowledged in the current transaction
- private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>();
-
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private ServerTransaction _transaction;
@@ -422,7 +416,6 @@ public class AMQChannel<T extends AMQPro
else
{
incrementOutstandingTxnsIfNecessary();
- handle.flushToStore();
}
}
}
@@ -1412,7 +1405,7 @@ public class AMQChannel<T extends AMQPro
}
finally
{
- _acknowledgedMessages.clear();
+ _ackedMessages.clear();
}
}
@@ -1435,7 +1428,7 @@ public class AMQChannel<T extends AMQPro
}
finally
{
- _acknowledgedMessages.clear();
+ _ackedMessages.clear();
}
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Fri Jul 25 14:24:36 2014
@@ -114,12 +114,6 @@ public class MessageConverter_Internal_t
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java Fri Jul 25 14:24:36 2014
@@ -104,11 +104,6 @@ public class MockStoredMessage implement
return buf;
}
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
public void remove()
{
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java Fri Jul 25 14:24:36 2014
@@ -20,15 +20,21 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.UUID;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.store.MessageCounter;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.test.utils.QpidTestCase;
/**
@@ -85,8 +91,9 @@ public class ReferenceCountingTest exten
final MessageMetaData mmd = new MessageMetaData(info, chb);
StoredMessage storedMessage = _store.addMessage(mmd);
- storedMessage.flushToStore();
-
+ Transaction txn = _store.newTransaction();
+ txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage));
+ txn.commitTran();
AMQMessage message = new AMQMessage(storedMessage);
MessageReference ref = message.newReference();
@@ -151,14 +158,13 @@ public class ReferenceCountingTest exten
final MessageMetaData mmd = new MessageMetaData(info, chb);
StoredMessage storedMessage = _store.addMessage(mmd);
- storedMessage.flushToStore();
-
+ Transaction txn = _store.newTransaction();
+ txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage));
+ txn.commitTran();
AMQMessage message = new AMQMessage(storedMessage);
MessageReference ref = message.newReference();
- // we call routing complete to set up the handle
- // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
assertEquals(1, getStoreMessageCount());
MessageReference ref2 = message.newReference();
@@ -166,6 +172,54 @@ public class ReferenceCountingTest exten
assertEquals(1, getStoreMessageCount());
}
+ private TransactionLogResource createTransactionLogResource(final String queueName)
+ {
+ return new TransactionLogResource()
+ {
+ @Override
+ public String getName()
+ {
+ return queueName;
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return UUID.nameUUIDFromBytes(queueName.getBytes());
+ }
+
+ @Override
+ public MessageDurability getMessageDurability()
+ {
+ return MessageDurability.DEFAULT;
+ }
+ };
+ }
+
+ private EnqueueableMessage createEnqueueableMessage(final StoredMessage storedMessage)
+ {
+ return new EnqueueableMessage()
+ {
+ @Override
+ public long getMessageNumber()
+ {
+ return storedMessage.getMessageNumber();
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ @Override
+ public StoredMessage getStoredMessage()
+ {
+ return storedMessage;
+ }
+ };
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(ReferenceCountingTest.class);
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Fri Jul 25 14:24:36 2014
@@ -261,12 +261,6 @@ public abstract class MessageConverter_t
}
@Override
- public StoreFuture flushToStore()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Fri Jul 25 14:24:36 2014
@@ -159,8 +159,6 @@ public class ReceivingLink_1_0 implement
offset += bareMessageBuf.remaining();
}
- storedMessage.flushToStore();
-
Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());
MessageReference<Message_1_0> reference = message.newReference();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java Fri Jul 25 14:24:36 2014
@@ -111,12 +111,6 @@ public class MessageConverter_1_0_to_v0_
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Fri Jul 25 14:24:36 2014
@@ -210,12 +210,6 @@ public class MessageConverter_0_10_to_0_
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Fri Jul 25 14:24:36 2014
@@ -99,12 +99,6 @@ public class MessageConverter_0_8_to_0_1
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java Fri Jul 25 14:24:36 2014
@@ -115,12 +115,6 @@ public class MessageConverter_1_0_to_v0_
}
@Override
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
public void remove()
{
throw new UnsupportedOperationException();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org