You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/04/04 12:21:38 UTC
svn commit: r1584600 [1/4] - in /qpid/trunk/qpid/java:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/...
Author: kwall
Date: Fri Apr 4 10:21:37 2014
New Revision: 1584600
URL: http://svn.apache.org/r1584600
Log:
QPID-5653: Replace DurableConfigurationStore/MessageStore recoverers with visitors.
* MS/DCS impls now have stateless visitXXX methods to retrieve message/configuration data (replaces the recoverXXXX methods)
* VH implementations now uses Handlers to perform the recovery operation.
* DCS's handler (ConfiguredObjectRecordRecoveverAndUpgrader) currently implemented in terms of the old DefaultUpgradeProvider/DurableConfigurationRecoverer.
This will be refactored by a future commit.
Added:
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
- copied, changed from r1584379, qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Xid.java
- copied, changed from r1584379, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java
- copied, changed from r1584379, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java
- copied, changed from r1584379, qpid/trunk/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageHandler.java
- copied, changed from r1584379, qpid/trunk/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java
- copied, changed from r1584379, qpid/trunk/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java
qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/
qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/java/
qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/java/org/
qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/
qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/
qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/
qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/
qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
- copied, changed from r1584379, qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
- copied, changed from r1584379, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
Removed:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java
qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
qpid/trunk/qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
qpid/trunk/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
qpid/trunk/qpid/java/broker-plugins/memory-store/pom.xml
qpid/trunk/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
qpid/trunk/qpid/java/test-profiles/CPPExcludes
qpid/trunk/qpid/java/test-profiles/JavaBDBExcludes
qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java?rev=1584600&r1=1584599&r2=1584600&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java Fri Apr 4 10:21:37 2014
@@ -29,15 +29,15 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.DurableConfigurationRecoverer;
+import org.apache.qpid.server.store.ConfiguredObjectRecordRecoveverAndUpgrader;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
-import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider;
+import org.apache.qpid.server.virtualhost.MessageStoreRecoverer;
import org.apache.qpid.server.virtualhost.State;
-import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import com.sleepycat.je.rep.StateChangeEvent;
@@ -98,17 +98,12 @@ public class BDBHAVirtualHost extends Ab
{
_messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
- DefaultUpgraderProvider upgraderProvider = new DefaultUpgraderProvider(this);
-
- DurableConfigurationRecoverer configRecoverer =
- new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
- upgraderProvider, getEventLogger());
- _messageStore.recoverConfigurationStore(configRecoverer);
+ ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
+ _messageStore.visitConfiguredObjectRecords(upgraderRecoverer);
initialiseModel();
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this, getMessageStoreLogSubject());
- _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
+ new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
attainActivation();
}
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1584600&r1=1584599&r2=1584600&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Fri Apr 4 10:21:37 2014
@@ -27,8 +27,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
@@ -38,29 +36,23 @@ import java.util.concurrent.atomic.Atomi
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.EventManager;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMemoryMessage;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.Xid;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding;
@@ -70,6 +62,10 @@ import org.apache.qpid.server.store.berk
import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.util.FileUtils;
@@ -129,7 +125,6 @@ public class BDBMessageStore implements
private long _persistentSizeHighThreshold;
private final EventManager _eventManager = new EventManager();
- private final String _type;
private final EnvironmentFacadeFactory _environmentFacadeFactory;
@@ -143,7 +138,6 @@ public class BDBMessageStore implements
public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory)
{
- _type = environmentFacadeFactory.getType();
_environmentFacadeFactory = environmentFacadeFactory;
}
@@ -160,18 +154,19 @@ public class BDBMessageStore implements
{
if (_environmentFacade == null)
{
- String[] databaseNames = null;
+ EnvironmentFacadeTask[] initialisationTasks = null;
if (MapValueConverter.getBooleanAttribute(IS_MESSAGE_STORE_TOO, storeSettings, false))
{
- databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
+ String[] databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length);
System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length);
+ initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(databaseNames), new DiskSpaceTask(), new MaxMessageIdTask() };
}
else
{
- databaseNames = CONFIGURATION_STORE_DATABASE_NAMES;
+ initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(CONFIGURATION_STORE_DATABASE_NAMES)};
}
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, new UpgradeTask(parent), new OpenDatabasesTask(databaseNames));
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, initialisationTasks);
}
else
{
@@ -181,12 +176,88 @@ public class BDBMessageStore implements
}
@Override
- public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
{
checkConfigurationStoreOpen();
+ try
+ {
+ int configVersion = getConfigVersion();
+
+ handler.begin(configVersion);
+ doVisitAllConfiguredObjectRecords(handler);
+
+ int newConfigVersion = handler.end();
+ if(newConfigVersion != configVersion)
+ {
+ updateConfigVersion(newConfigVersion);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e);
+ }
+
+ }
+
+ private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
+ {
+ Map<UUID, BDBConfiguredObjectRecord> configuredObjects = new HashMap<UUID, BDBConfiguredObjectRecord>();
+ Cursor objectsCursor = null;
+ Cursor hierarchyCursor = null;
+ try
+ {
+ objectsCursor = getConfiguredObjectsDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+
+
+ while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
+
+ BDBConfiguredObjectRecord configuredObject =
+ (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
+ configuredObjects.put(configuredObject.getId(), configuredObject);
+ }
+
+ // set parents
+ hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null);
+ while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
+ UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
+ BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
+ if(child != null)
+ {
+ ConfiguredObjectRecord parent = configuredObjects.get(parentId);
+ if(parent != null)
+ {
+ child.addParent(hk.getParentType(), parent);
+ }
+ else if(hk.getParentType().equals("Exchange"))
+ {
+ // TODO - remove this hack for the pre-defined exchanges
+ child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
+ }
+ }
+ }
+ }
+ finally
+ {
+ closeCursorSafely(objectsCursor);
+ closeCursorSafely(hierarchyCursor);
+ }
+
+ for (ConfiguredObjectRecord record : configuredObjects.values())
+ {
+ boolean shoudlContinue = handler.handle(record);
+ if (!shoudlContinue)
+ {
+ break;
+ }
+ }
- recoverConfig(recoveryHandler);
}
@Override
@@ -210,7 +281,8 @@ public class BDBMessageStore implements
if (_environmentFacade == null)
{
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask());
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings,
+ new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask(), new MaxMessageIdTask());
}
_committer = _environmentFacade.createCommitter(parent.getName());
@@ -219,21 +291,6 @@ public class BDBMessageStore implements
}
@Override
- public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException
- {
- checkMessageStoreOpen();
-
- if(messageRecoveryHandler != null)
- {
- recoverMessages(messageRecoveryHandler);
- }
- if(transactionLogRecoveryHandler != null)
- {
- recoverQueueEntries(transactionLogRecoveryHandler);
- }
- }
-
- @Override
public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException
{
checkMessageStoreOpen();
@@ -315,27 +372,6 @@ public class BDBMessageStore implements
}
}
- private void recoverConfig(ConfigurationRecoveryHandler recoveryHandler) throws StoreException
- {
- try
- {
- final int configVersion = getConfigVersion();
- recoveryHandler.beginConfigurationRecovery(this, configVersion);
- loadConfiguredObjects(recoveryHandler);
-
- final int newConfigVersion = recoveryHandler.completeConfigurationRecovery();
- if(newConfigVersion != configVersion)
- {
- updateConfigVersion(newConfigVersion);
- }
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error recovering persistent state: " + e.getMessage(), e);
- }
-
- }
-
@SuppressWarnings("resource")
private void updateConfigVersion(int newConfigVersion) throws StoreException
{
@@ -400,62 +436,6 @@ public class BDBMessageStore implements
}
}
- private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException, StoreException
- {
- Cursor objectsCursor = null;
- Cursor hierarchyCursor = null;
- try
- {
- objectsCursor = getConfiguredObjectsDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
- Map<UUID, BDBConfiguredObjectRecord> configuredObjects =
- new HashMap<UUID, BDBConfiguredObjectRecord>();
-
- while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
-
- BDBConfiguredObjectRecord configuredObject =
- (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
- configuredObjects.put(configuredObject.getId(), configuredObject);
- }
-
- // set parents
- hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null);
- while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
- UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
- BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
- if(child != null)
- {
- ConfiguredObjectRecord parent = configuredObjects.get(parentId);
- if(parent != null)
- {
- child.addParent(hk.getParentType(), parent);
- }
- else if(hk.getParentType().equals("Exchange"))
- {
- // TODO - remove this hack for the pre-defined exchanges
- child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
- }
- }
- }
-
- for (ConfiguredObjectRecord record : configuredObjects.values())
- {
- crh.configuredObject(record);
- }
- }
- finally
- {
- closeCursorSafely(objectsCursor);
- closeCursorSafely(hierarchyCursor);
- }
- }
-
private void closeCursorSafely(Cursor cursor) throws StoreException
{
if (cursor != null)
@@ -471,124 +451,6 @@ public class BDBMessageStore implements
}
}
- private void recoverMessages(MessageStoreRecoveryHandler msrh) throws StoreException
- {
- StoredMessageRecoveryHandler mrh = msrh.begin();
-
- Cursor cursor = null;
- try
- {
- cursor = getMessageMetaDataDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
-
- long maxId = 0;
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- long messageId = LongBinding.entryToLong(key);
- StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-
- StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
-
- mrh.message(message);
-
- maxId = Math.max(maxId, messageId);
- }
-
- _messageId.set(maxId);
- mrh.completeMessageRecovery();
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot recover messages", e);
- }
- finally
- {
- closeCursorSafely(cursor);
- }
- }
-
- private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
- throws StoreException
- {
- QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
-
- ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
-
- Cursor cursor = null;
- try
- {
- cursor = getDeliveryDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
-
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- QueueEntryKey qek = keyBinding.entryToObject(key);
-
- entries.add(qek);
- }
-
- try
- {
- cursor.close();
- }
- finally
- {
- cursor = null;
- }
-
- for(QueueEntryKey entry : entries)
- {
- UUID queueId = entry.getQueueId();
- long messageId = entry.getMessageId();
- qerh.queueEntry(queueId, messageId);
- }
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot recover queue entries", e);
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery();
-
- cursor = null;
- try
- {
- cursor = getXidDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- XidBinding keyBinding = XidBinding.getInstance();
- PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- Xid xid = keyBinding.entryToObject(key);
- PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
- dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
- preparedTransaction.getEnqueues(),preparedTransaction.getDequeues());
- }
-
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot recover transactions", e);
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
-
- dtxrh.completeDtxRecordRecovery();
- }
void removeMessage(long messageId, boolean sync) throws StoreException
{
@@ -739,6 +601,12 @@ public class BDBMessageStore implements
public void create(ConfiguredObjectRecord configuredObject) throws StoreException
{
checkConfigurationStoreOpen();
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Create " + configuredObject);
+ }
+
com.sleepycat.je.Transaction txn = null;
try
{
@@ -832,7 +700,7 @@ public class BDBMessageStore implements
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Updating " + record.getType() + ", id: " + record.getId());
+ LOGGER.debug("Updating, creating " + createIfNecessary + " : " + record);
}
DatabaseEntry key = new DatabaseEntry();
@@ -890,8 +758,7 @@ public class BDBMessageStore implements
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Enqueuing message " + messageId + " on queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
- + " in transaction " + tx);
+ + queue.getName() + " with id " + queue.getId() + " in transaction " + tx);
}
getDeliveryDb().put(tx, key, value);
}
@@ -899,8 +766,7 @@ public class BDBMessageStore implements
{
LOGGER.error("Failed to enqueue: " + e.getMessage(), e);
throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
- + " to database", e);
+ + queue.getName() + " with id " + queue.getId() + " to database", e);
}
}
@@ -925,7 +791,7 @@ public class BDBMessageStore implements
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Dequeue message id " + messageId + " from queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+ + queue.getName() + " with id " + id);
}
try
@@ -935,19 +801,18 @@ public class BDBMessageStore implements
if (status == OperationStatus.NOTFOUND)
{
throw new StoreException("Unable to find message with id " + messageId + " on queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+ + queue.getName() + " with id " + id);
}
else if (status != OperationStatus.SUCCESS)
{
throw new StoreException("Unable to remove message with id " + messageId + " on queue"
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+ + queue.getName() + " with id " + id);
}
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Removed message " + messageId + " on queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id
- + " from delivery db");
+ + queue.getName() + " with id " + id);
}
}
@@ -1073,57 +938,6 @@ public class BDBMessageStore implements
}
/**
- * Primarily for testing purposes.
- *
- * @param queueId
- *
- * @return a list of message ids for messages enqueued for a particular queue
- */
- List<Long> getEnqueuedMessages(UUID queueId) throws StoreException
- {
- Cursor cursor = null;
- try
- {
- cursor = getDeliveryDb().openCursor(null, null);
-
- DatabaseEntry key = new DatabaseEntry();
-
- QueueEntryKey dd = new QueueEntryKey(queueId, 0);
-
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- keyBinding.objectToEntry(dd, key);
-
- DatabaseEntry value = new DatabaseEntry();
-
- LinkedList<Long> messageIds = new LinkedList<Long>();
-
- OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
- dd = keyBinding.entryToObject(key);
-
- while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId))
- {
-
- messageIds.add(dd.getMessageId());
- status = cursor.getNext(key, value, LockMode.DEFAULT);
- if (status == OperationStatus.SUCCESS)
- {
- dd = keyBinding.entryToObject(key);
- }
- }
-
- return messageIds;
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Database error: " + e.getMessage(), e);
- }
- finally
- {
- closeCursorSafely(cursor);
- }
- }
-
- /**
* Return a valid, currently unused message id.
*
* @return A fresh message id.
@@ -1793,12 +1607,6 @@ public class BDBMessageStore implements
}
}
- @Override
- public String getStoreType()
- {
- return _type;
- }
-
private Database getConfiguredObjectsDb()
{
return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME);
@@ -1902,4 +1710,147 @@ public class BDBMessageStore implements
}
}
+
+ public class MaxMessageIdTask implements EnvironmentFacadeTask, MessageHandler
+ {
+ private long _maxId;
+
+ @Override
+ public void execute(EnvironmentFacade facade)
+ {
+ visitMessagesInternal(this, facade);
+ _messageId.set(_maxId);
+ }
+
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ long id = storedMessage.getMessageNumber();
+ if (_maxId<id)
+ {
+ _maxId = id;
+ }
+ return true;
+ }
+
+ }
+
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+ visitMessagesInternal(handler, _environmentFacade);
+ }
+
+ private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade)
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME).openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ long messageId = LongBinding.entryToLong(key);
+ StorableMessageMetaData metaData = valueBinding.entryToObject(value);
+ StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+
+ if (!handler.handle(message))
+ {
+ break;
+ }
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw environmentFacade.handleDatabaseException("Cannot recover messages", e);
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch(DatabaseException e)
+ {
+ throw environmentFacade.handleDatabaseException("Cannot close cursor", e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = getDeliveryDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ QueueEntryKey entry = keyBinding.entryToObject(key);
+ UUID queueId = entry.getQueueId();
+ long messageId = entry.getMessageId();
+ if (!handler.handle(queueId, messageId))
+ {
+ break;
+ }
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot visit message instances", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = getXidDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ XidBinding keyBinding = XidBinding.getInstance();
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ Xid xid = keyBinding.entryToObject(key);
+ PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
+ if (!handler.handle(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
+ preparedTransaction.getEnqueues(),preparedTransaction.getDequeues()))
+ {
+ break;
+ }
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot recover distributed transactions", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+ }
}
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java?rev=1584600&r1=1584599&r2=1584600&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java Fri Apr 4 10:21:37 2014
@@ -25,7 +25,7 @@ import com.sleepycat.bind.tuple.TupleBin
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+import org.apache.qpid.server.store.Xid;
public class XidBinding extends TupleBinding<Xid>
{
Copied: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (from r1584379, qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?p2=qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java&p1=qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java&r1=1584379&r2=1584600&rev=1584600&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Fri Apr 4 10:21:37 2014
@@ -20,14 +20,11 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -35,25 +32,15 @@ import org.apache.qpid.framing.ContentHe
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10;
import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
-import org.apache.qpid.server.store.MessageStoreTest;
+import org.apache.qpid.server.store.MessageStoreTestCase;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
@@ -62,15 +49,31 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.util.FileUtils;
/**
- * Subclass of MessageStoreTest which runs the standard tests from the superclass against
+ * Subclass of MessageStoreTestCase which runs the standard tests from the superclass against
* the BDB Store as well as additional tests specific to the BDB store-implementation.
*/
-public class BDBMessageStoreTest extends MessageStoreTest
+public class BDBMessageStoreTest extends MessageStoreTestCase
{
private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+ private String _storeLocation;
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ deleteStoreIfExists();
+ }
+ }
+
/**
* Tests that message metadata and content are successfully read back from a
* store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
@@ -78,9 +81,7 @@ public class BDBMessageStoreTest extends
*/
public void testBDBMessagePersistence() throws Exception
{
- MessageStore store = getVirtualHost().getMessageStore();
-
- BDBMessageStore bdbStore = assertBDBStore(store);
+ BDBMessageStore bdbStore = (BDBMessageStore)getStore();
// Create content ByteBuffers.
// Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
@@ -133,12 +134,13 @@ public class BDBMessageStoreTest extends
/*
* reload the store only (read-only)
*/
- BDBMessageStore readOnlyStore = reloadStore(bdbStore);
+ reopenStore();
/*
* Read back and validate the 0-8 message metadata and content
*/
- StorableMessageMetaData storeableMMD_0_8 = readOnlyStore.getMessageMetaData(messageid_0_8);
+ BDBMessageStore reopenedBdbStore = (BDBMessageStore) getStore();
+ StorableMessageMetaData storeableMMD_0_8 = reopenedBdbStore.getMessageMetaData(messageid_0_8);
assertEquals("Unexpected message type", MessageMetaDataType_0_8.TYPE, storeableMMD_0_8.getType().ordinal());
assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData);
@@ -162,7 +164,7 @@ public class BDBMessageStoreTest extends
assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString());
ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.getBodySize()) ;
- long recoveredCount_0_8 = readOnlyStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
+ long recoveredCount_0_8 = reopenedBdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
assertEquals("Incorrect amount of payload data recovered", chb_0_8.getBodySize(), recoveredCount_0_8);
String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array());
assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8);
@@ -170,7 +172,7 @@ public class BDBMessageStoreTest extends
/*
* Read back and validate the 0-10 message metadata and content
*/
- StorableMessageMetaData storeableMMD_0_10 = readOnlyStore.getMessageMetaData(messageid_0_10);
+ StorableMessageMetaData storeableMMD_0_10 = reopenedBdbStore.getMessageMetaData(messageid_0_10);
assertEquals("Unexpected message type", MessageMetaDataType_0_10.TYPE, storeableMMD_0_10.getType().ordinal());
assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10);
@@ -193,13 +195,13 @@ public class BDBMessageStoreTest extends
assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType());
ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ;
- long recoveredCount = readOnlyStore.getContent(messageid_0_10, 0, recoveredContent);
+ long recoveredCount = reopenedBdbStore.getContent(messageid_0_10, 0, recoveredContent);
assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount);
String returnedPayloadString_0_10 = new String(recoveredContent.array());
assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10);
- readOnlyStore.closeMessageStore();
+ reopenedBdbStore.closeMessageStore();
}
private DeliveryProperties createDeliveryProperties_0_10()
@@ -226,28 +228,6 @@ public class BDBMessageStoreTest extends
return msgProps_0_10;
}
- /**
- * Close the provided store and create a new (read-only) store to read back the data.
- *
- * Use this method instead of reloading the virtual host like other tests in order
- * to avoid the recovery handler deleting the message for not being on a queue.
- */
- private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception
- {
- messageStore.closeMessageStore();
-
-
- BDBMessageStore newStore = new BDBMessageStore();
-
- MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
- when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
- VirtualHost<?> virtualHost = getVirtualHostModel();
- newStore.openMessageStore(virtualHost, virtualHost.getMessageStoreSettings());
-
- newStore.recoverMessageStore(recoveryHandler, null);
-
- return newStore;
- }
private MessagePublishInfo createPublishInfoBody_0_8()
{
@@ -258,20 +238,24 @@ public class BDBMessageStoreTest extends
return new AMQShortString("exchange12345");
}
+ @Override
public void setExchange(AMQShortString exchange)
{
}
+ @Override
public boolean isImmediate()
{
return false;
}
+ @Override
public boolean isMandatory()
{
return true;
}
+ @Override
public AMQShortString getRoutingKey()
{
return new AMQShortString("routingKey12345");
@@ -298,9 +282,8 @@ public class BDBMessageStoreTest extends
public void testGetContentWithOffset() throws Exception
{
- MessageStore store = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(store);
- StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
+ BDBMessageStore bdbStore = (BDBMessageStore) getStore();
+ StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
// normal case: offset is 0
@@ -350,6 +333,7 @@ public class BDBMessageStoreTest extends
System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5);
assertTrue("Unexpected content", Arrays.equals(expected, array));
}
+
/**
* Tests that messages which are added to the store and then removed using the
* public MessageStore interfaces are actually removed from the store by then
@@ -358,10 +342,9 @@ public class BDBMessageStoreTest extends
*/
public void testMessageCreationAndRemoval() throws Exception
{
- MessageStore store = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(store);
+ BDBMessageStore bdbStore = (BDBMessageStore)getStore();
- StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
bdbStore.removeMessage(messageid_0_8, true);
@@ -384,13 +367,6 @@ public class BDBMessageStoreTest extends
assertEquals("Retrieved content when none was expected",
0, bdbStore.getContent(messageid_0_8, 0, dst));
}
- private BDBMessageStore assertBDBStore(MessageStore store)
- {
-
- assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass());
-
- return (BDBMessageStore) store;
- }
private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
{
@@ -413,254 +389,48 @@ public class BDBMessageStoreTest extends
return storedMessage_0_8;
}
- /**
- * Tests transaction commit by utilising the enqueue and dequeue methods available
- * in the TransactionLog interface implemented by the store, and verifying the
- * behaviour using BDB implementation methods.
- */
- public void testTranCommit() throws Exception
- {
- MessageStore log = getVirtualHost().getMessageStore();
-
- BDBMessageStore bdbStore = assertBDBStore(log);
-
- final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
- TransactionLogResource mockQueue = new TransactionLogResource()
- {
- @Override
- public String getName()
- {
- return getId().toString();
- }
-
- @Override
- public UUID getId()
- {
- return mockQueueId;
- }
-
- @Override
- public boolean isDurable()
- {
- return true;
- }
- };
-
- Transaction txn = log.newTransaction();
-
- txn.enqueueMessage(mockQueue, new MockMessage(1L));
- txn.enqueueMessage(mockQueue, new MockMessage(5L));
- txn.commitTran();
-
- List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
-
- assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
- Long val = enqueuedIds.get(0);
- assertEquals("First Message is incorrect", 1L, val.longValue());
- val = enqueuedIds.get(1);
- assertEquals("Second Message is incorrect", 5L, val.longValue());
- }
-
-
- /**
- * Tests transaction rollback before a commit has occurred by utilising the
- * enqueue and dequeue methods available in the TransactionLog interface
- * implemented by the store, and verifying the behaviour using BDB
- * implementation methods.
- */
- public void testTranRollbackBeforeCommit() throws Exception
- {
- MessageStore log = getVirtualHost().getMessageStore();
-
- BDBMessageStore bdbStore = assertBDBStore(log);
-
- final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
- TransactionLogResource mockQueue = new TransactionLogResource()
- {
- @Override
- public String getName()
- {
- return getId().toString();
- }
-
- @Override
- public UUID getId()
- {
- return mockQueueId;
- }
-
- @Override
- public boolean isDurable()
- {
- return true;
- }
- };
-
- Transaction txn = log.newTransaction();
-
- txn.enqueueMessage(mockQueue, new MockMessage(21L));
- txn.abortTran();
-
- txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, new MockMessage(22L));
- txn.enqueueMessage(mockQueue, new MockMessage(23L));
- txn.commitTran();
-
- List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
-
- assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
- Long val = enqueuedIds.get(0);
- assertEquals("First Message is incorrect", 22L, val.longValue());
- val = enqueuedIds.get(1);
- assertEquals("Second Message is incorrect", 23L, val.longValue());
- }
-
public void testOnDelete() throws Exception
{
- MessageStore log = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(log);
- String storeLocation = bdbStore.getStoreLocation();
+ String storeLocation = getStore().getStoreLocation();
File location = new File(storeLocation);
assertTrue("Store does not exist at " + storeLocation, location.exists());
- bdbStore.closeMessageStore();
+ getStore().closeMessageStore();
assertTrue("Store does not exist at " + storeLocation, location.exists());
- bdbStore.onDelete();
+ getStore().onDelete();
assertFalse("Store exists at " + storeLocation, location.exists());
}
- /**
- * Tests transaction rollback after a commit has occurred by utilising the
- * enqueue and dequeue methods available in the TransactionLog interface
- * implemented by the store, and verifying the behaviour using BDB
- * implementation methods.
- */
- public void testTranRollbackAfterCommit() throws Exception
+
+ @Override
+ protected Map<String, Object> getStoreSettings() throws Exception
{
- MessageStore log = getVirtualHost().getMessageStore();
+ _storeLocation = TMP_FOLDER + File.separator + getTestName();
+ deleteStoreIfExists();
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_PATH, _storeLocation);
+ return messageStoreSettings;
- BDBMessageStore bdbStore = assertBDBStore(log);
+ }
- final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
- TransactionLogResource mockQueue = new TransactionLogResource()
+ private void deleteStoreIfExists()
+ {
+ if (_storeLocation != null)
{
- @Override
- public String getName()
- {
- return getId().toString();
- }
-
- @Override
- public UUID getId()
- {
- return mockQueueId;
- }
-
- @Override
- public boolean isDurable()
+ File location = new File(_storeLocation);
+ if (location.exists())
{
- return true;
+ FileUtils.delete(location, true);
}
- };
-
- Transaction txn = log.newTransaction();
-
- txn.enqueueMessage(mockQueue, new MockMessage(30L));
- txn.commitTran();
-
- txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, new MockMessage(31L));
- txn.abortTran();
-
- txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, new MockMessage(32L));
- txn.commitTran();
-
- List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
-
- assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
- Long val = enqueuedIds.get(0);
- assertEquals("First Message is incorrect", 30L, val.longValue());
- val = enqueuedIds.get(1);
- assertEquals("Second Message is incorrect", 32L, val.longValue());
+ }
}
- @SuppressWarnings("rawtypes")
- private static class MockMessage implements ServerMessage, EnqueueableMessage
+ @Override
+ protected MessageStore createMessageStore()
{
- private long _messageId;
-
- public MockMessage(long messageId)
- {
- _messageId = messageId;
- }
-
- public String getInitialRoutingAddress()
- {
- return null;
- }
-
- public AMQMessageHeader getMessageHeader()
- {
- return null;
- }
-
- public StoredMessage getStoredMessage()
- {
- return null;
- }
-
- public boolean isPersistent()
- {
- return true;
- }
-
- public long getSize()
- {
- return 0;
- }
-
- public boolean isImmediate()
- {
- return false;
- }
-
- public long getExpiration()
- {
- return 0;
- }
-
- public MessageReference newReference()
- {
- return null;
- }
-
- public long getMessageNumber()
- {
- return _messageId;
- }
-
- public long getArrivalTime()
- {
- return 0;
- }
-
- public int getContent(ByteBuffer buf, int offset)
- {
- return 0;
- }
-
- public ByteBuffer getContent(int offset, int length)
- {
- return null;
- }
-
- @Override
- public Object getConnectionReference()
- {
- return null;
- }
+ return new BDBMessageStore();
}
+
}
Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java?rev=1584600&r1=1584599&r2=1584600&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java Fri Apr 4 10:21:37 2014
@@ -48,7 +48,7 @@ import org.apache.qpid.server.model.Life
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+import org.apache.qpid.server.store.Xid;
import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org