You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/03/28 12:30:06 UTC
svn commit: r1306244 [3/4] - in
/qpid/branches/java-config-and-management/qpid/java: ./ bdbstore/
bdbstore/bin/ bdbstore/src/main/java/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/...
Copied: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java (from r1306213, qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java?p2=qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java&p1=qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java&r1=1306213&r2=1306244&rev=1306244&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java Wed Mar 28 10:30:03 2012
@@ -18,28 +18,20 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.upgrade;
-import com.sleepycat.bind.tuple.TupleBinding;
-
-import org.apache.qpid.server.store.berkeleydb.records.BindingRecord;
-
-public class BindingTupleBindingFactory extends TupleBindingFactory<BindingRecord>
+public interface UpgradeInteractionHandler
{
- public BindingTupleBindingFactory(int version)
- {
- super(version);
- }
+ UpgradeInteractionResponse requireResponse(String question, UpgradeInteractionResponse defaultResponse,
+ UpgradeInteractionResponse... possibleResponses);
- public TupleBinding<BindingRecord> getInstance()
+ public static final UpgradeInteractionHandler DEFAULT_HANDLER = new UpgradeInteractionHandler()
{
- switch (getVersion())
+ public UpgradeInteractionResponse requireResponse(final String question,
+ final UpgradeInteractionResponse defaultResponse,
+ final UpgradeInteractionResponse... possibleResponses)
{
- default:
- case 5:
- //no change from v4
- case 4:
- return new BindingTuple_4();
+ return defaultResponse;
}
- }
+ };
}
Copied: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionResponse.java (from r1306213, qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionResponse.java?p2=qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionResponse.java&p1=qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java&r1=1306213&r2=1306244&rev=1306244&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionResponse.java Wed Mar 28 10:30:03 2012
@@ -18,8 +18,11 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.upgrade;
-public interface BindingTuple
+public enum UpgradeInteractionResponse
{
+ YES,
+ NO,
+ ABORT
}
Added: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java?rev=1306244&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java Wed Mar 28 10:30:03 2012
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+
+/**
+ * Brings the databases of a BDB environment up to the store version expected by this
+ * broker ({@link BDBMessageStore#VERSION}).
+ *
+ * The versions the store has passed through are recorded in the {@value #VERSION_DB_NAME}
+ * database: each key is a version number and each value is the time (milliseconds since the
+ * epoch) at which that version was recorded. Upgrading proceeds one version at a time by
+ * reflectively instantiating the class named {@code UpgradeFromXToY} in this package.
+ */
+public class Upgrader
+{
+    static final String VERSION_DB_NAME = "VERSION";
+
+    private static final String OLD_STORE_VERSION_MARKER = "_v";
+
+    private final Environment _environment;
+    private final LogSubject _logSubject;
+
+    /**
+     * @param environment the BDB environment whose databases are to be inspected and upgraded
+     * @param logSubject  passed through to every upgrade step that is run
+     */
+    public Upgrader(Environment environment, LogSubject logSubject)
+    {
+        _environment = environment;
+        _logSubject = logSubject;
+    }
+
+    /**
+     * Determines the version of the store and runs every upgrade step needed to reach
+     * {@link BDBMessageStore#VERSION}.
+     *
+     * If the version database holds no records, a starting version is recorded first: the
+     * current version when the environment contained no databases at all, otherwise the
+     * version derived from the names of the existing databases.
+     *
+     * @throws AMQStoreException if an upgrade step cannot be located, instantiated or run
+     */
+    public void upgradeIfNecessary() throws AMQStoreException
+    {
+        // Must be evaluated before the version database is opened, because opening it
+        // with allowCreate set adds a database to a previously empty environment.
+        boolean isEmpty = _environment.getDatabaseNames().isEmpty();
+        DatabaseConfig dbConfig = new DatabaseConfig();
+        dbConfig.setTransactional(true);
+        dbConfig.setAllowCreate(true);
+
+        Database versionDb = null;
+        try
+        {
+            versionDb = _environment.openDatabase(null, VERSION_DB_NAME, dbConfig);
+
+            if (versionDb.count() == 0L)
+            {
+                int sourceVersion = isEmpty ? BDBMessageStore.VERSION : identifyOldStoreVersion();
+                recordVersion(versionDb, sourceVersion);
+            }
+
+            int version = getSourceVersion(versionDb);
+
+            performUpgradeFromVersion(version, versionDb);
+        }
+        finally
+        {
+            if (versionDb != null)
+            {
+                versionDb.close();
+            }
+        }
+    }
+
+    /**
+     * Finds the highest version, not greater than {@link BDBMessageStore#VERSION}, that has a
+     * record in the version database.
+     *
+     * @param versionDb the open version database
+     * @return the highest recorded version in the range {@code 0..BDBMessageStore.VERSION}
+     * @throws IllegalStateException if no version in that range is recorded (previously this
+     *                               case kept decrementing through the whole int range)
+     */
+    int getSourceVersion(Database versionDb)
+    {
+        for (int version = BDBMessageStore.VERSION; version >= 0; version--)
+        {
+            DatabaseEntry key = new DatabaseEntry();
+            IntegerBinding.intToEntry(version, key);
+            DatabaseEntry value = new DatabaseEntry();
+
+            OperationStatus result = versionDb.get(null, key, value, LockMode.READ_COMMITTED);
+            if (result != OperationStatus.NOTFOUND)
+            {
+                return version;
+            }
+        }
+        throw new IllegalStateException("No store version between 0 and " + BDBMessageStore.VERSION
+                                        + " is recorded in the " + VERSION_DB_NAME + " database");
+    }
+
+    /**
+     * Runs the upgrade steps one version at a time until {@link BDBMessageStore#VERSION} is
+     * reached, recording each newly reached version in the version database.
+     *
+     * @param sourceVersion the version the store is currently at
+     * @param versionDb     the open version database
+     * @throws AMQStoreException if any single upgrade step fails
+     */
+    void performUpgradeFromVersion(int sourceVersion, Database versionDb)
+            throws AMQStoreException
+    {
+        while (sourceVersion != BDBMessageStore.VERSION)
+        {
+            int targetVersion = sourceVersion + 1;
+            upgrade(sourceVersion, targetVersion);
+            // Only record the new version once the step has completed without throwing.
+            recordVersion(versionDb, targetVersion);
+            sourceVersion = targetVersion;
+        }
+    }
+
+    /**
+     * Loads, instantiates and runs the upgrade step for a single version transition.
+     *
+     * The step is the class {@code UpgradeFrom<fromVersion>To<toVersion>} in this package; it
+     * must have a public no-argument constructor. It is run with
+     * {@link UpgradeInteractionHandler#DEFAULT_HANDLER}.
+     *
+     * @param fromVersion the version being upgraded from
+     * @param toVersion   the version being upgraded to
+     * @throws AMQStoreException if the step class cannot be found, constructed or accessed
+     */
+    void upgrade(final int fromVersion, final int toVersion) throws AMQStoreException
+    {
+        try
+        {
+            @SuppressWarnings("unchecked")
+            Class<StoreUpgrade> upgradeClass =
+                    (Class<StoreUpgrade>) Class.forName("org.apache.qpid.server.store.berkeleydb.upgrade."
+                                                        + "UpgradeFrom" + fromVersion + "To" + toVersion);
+            Constructor<StoreUpgrade> ctr = upgradeClass.getConstructor();
+            StoreUpgrade upgrade = ctr.newInstance();
+            upgrade.performUpgrade(_logSubject, _environment, UpgradeInteractionHandler.DEFAULT_HANDLER);
+        }
+        catch (ClassNotFoundException e)
+        {
+            throw upgradeFailure(fromVersion, toVersion, e);
+        }
+        catch (NoSuchMethodException e)
+        {
+            throw upgradeFailure(fromVersion, toVersion, e);
+        }
+        catch (InvocationTargetException e)
+        {
+            throw upgradeFailure(fromVersion, toVersion, e);
+        }
+        catch (InstantiationException e)
+        {
+            throw upgradeFailure(fromVersion, toVersion, e);
+        }
+        catch (IllegalAccessException e)
+        {
+            throw upgradeFailure(fromVersion, toVersion, e);
+        }
+    }
+
+    /** Builds the exception reported when the step between two versions cannot be run. */
+    private AMQStoreException upgradeFailure(int fromVersion, int toVersion, Exception cause)
+    {
+        return new AMQStoreException("Unable to upgrade BDB data store from version " + fromVersion
+                                     + " to version " + toVersion, cause);
+    }
+
+    /** Writes a record for the given version, stamped with the current time, to the version database. */
+    private void recordVersion(Database versionDb, int version)
+    {
+        DatabaseEntry key = new DatabaseEntry();
+        IntegerBinding.intToEntry(version, key);
+        DatabaseEntry value = new DatabaseEntry();
+        LongBinding.longToEntry(System.currentTimeMillis(), value);
+
+        versionDb.put(null, key, value);
+    }
+
+    /**
+     * Derives the version of a store that has no version records from the names of its
+     * databases: the digits following the first {@code "_v"} of the first database name that
+     * contains that marker.
+     *
+     * @return the parsed version, or 0 if no database name contains the marker
+     * @throws NumberFormatException if the text after the marker is not an integer
+     */
+    private int identifyOldStoreVersion() throws DatabaseException
+    {
+        for (String databaseName : _environment.getDatabaseNames())
+        {
+            int versionIndex = databaseName.indexOf(OLD_STORE_VERSION_MARKER);
+            if (versionIndex != -1)
+            {
+                return Integer.parseInt(
+                        databaseName.substring(versionIndex + OLD_STORE_VERSION_MARKER.length()));
+            }
+        }
+        return 0;
+    }
+}
Modified: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1306244&r1=1306243&r2=1306244&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Wed Mar 28 10:30:03 2012
@@ -58,9 +58,11 @@ import java.util.List;
*/
public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest
{
+ private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+
/**
- * Tests that message metadata and content are successfully read back from a
- * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
+ * Tests that message metadata and content are successfully read back from a
+ * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
* verify their ability to co-exist within the store and be successfully retrieved.
*/
public void testBDBMessagePersistence() throws Exception
@@ -73,10 +75,10 @@ public class BDBMessageStoreTest extends
// Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
// Use a single chunk for the 0-10 message as per broker behaviour.
String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
-
+
ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes());
-
+
ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes());
int bodySize = completeContentBody_0_10.limit();
@@ -100,12 +102,12 @@ public class BDBMessageStoreTest extends
/*
* Create and insert a 0-10 message (metadata and content)
- */
+ */
MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize);
DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10();
Header header_0_10 = new Header(delProps_0_10, msgProps_0_10);
- MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT,
+ MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT,
MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10);
@@ -190,14 +192,14 @@ public class BDBMessageStoreTest extends
private DeliveryProperties createDeliveryProperties_0_10()
{
DeliveryProperties delProps_0_10 = new DeliveryProperties();
-
+
delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
delProps_0_10.setImmediate(true);
delProps_0_10.setExchange("exchange12345");
delProps_0_10.setRoutingKey("routingKey12345");
delProps_0_10.setExpiration(5);
delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE);
-
+
return delProps_0_10;
}
@@ -207,14 +209,14 @@ public class BDBMessageStoreTest extends
msgProps_0_10.setContentLength(bodySize);
msgProps_0_10.setCorrelationId("qwerty".getBytes());
msgProps_0_10.setContentType("text/html");
-
+
return msgProps_0_10;
}
- /**
+ /**
* Close the provided store and create a new (read-only) store to read back the data.
- *
- * Use this method instead of reloading the virtual host like other tests in order
+ *
+ * Use this method instead of reloading the virtual host like other tests in order
* to avoid the recovery handler deleting the message for not being on a queue.
*/
private BDBMessageStore reloadStoreReadOnly(BDBMessageStore messageStore) throws Exception
@@ -275,7 +277,61 @@ public class BDBMessageStoreTest extends
props.getHeaders().setString("Test", "MST");
return props;
}
-
+
+ /**
+ * Tests reading the content of a stored single-chunk message through
+ * BDBMessageStore.getContent using different offsets and destination
+ * buffer sizes. The stored content is the 10 bytes of CONTENT_BYTES.
+ */
+ public void testGetContentWithOffset() throws Exception
+ {
+ MessageStore store = getVirtualHost().getMessageStore();
+ BDBMessageStore bdbStore = assertBDBStore(store);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
+ long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+
+ // normal case: offset is 0
+ ByteBuffer dst = ByteBuffer.allocate(10);
+ int length = bdbStore.getContent(messageid_0_8, 0, dst);
+ assertEquals("Unexpected length", CONTENT_BYTES.length, length);
+ byte[] array = dst.array();
+ assertTrue("Unexpected content", Arrays.equals(CONTENT_BYTES, array));
+
+ // offset is in the middle
+ dst = ByteBuffer.allocate(10);
+ length = bdbStore.getContent(messageid_0_8, 5, dst);
+ assertEquals("Unexpected length", 5, length);
+ array = dst.array();
+ // the whole 10 byte backing array is compared, so only the first 5 bytes
+ // of the expected array are filled and the rest stay zero
+ byte[] expected = new byte[10];
+ System.arraycopy(CONTENT_BYTES, 5, expected, 0, 5);
+ assertTrue("Unexpected content", Arrays.equals(expected, array));
+
+ // offset beyond the content length
+ dst = ByteBuffer.allocate(10);
+ try
+ {
+ bdbStore.getContent(messageid_0_8, 15, dst);
+ fail("Should fail for the offset greater than message size");
+ }
+ catch (RuntimeException e)
+ {
+ // the exact exception message is part of what is being verified
+ assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id "
+ + messageid_0_8 + "!", e.getMessage());
+ }
+
+ // buffer is smaller than message size
+ dst = ByteBuffer.allocate(5);
+ length = bdbStore.getContent(messageid_0_8, 0, dst);
+ assertEquals("Unexpected length", 5, length);
+ array = dst.array();
+ expected = new byte[5];
+ System.arraycopy(CONTENT_BYTES, 0, expected, 0, 5);
+ assertTrue("Unexpected content", Arrays.equals(expected, array));
+
+ // buffer is smaller than message size, offset is not 0
+ dst = ByteBuffer.allocate(5);
+ length = bdbStore.getContent(messageid_0_8, 2, dst);
+ assertEquals("Unexpected length", 5, length);
+ array = dst.array();
+ expected = new byte[5];
+ System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5);
+ assertTrue("Unexpected content", Arrays.equals(expected, array));
+ }
/**
* Tests that messages which are added to the store and then removed using the
* public MessageStore interfaces are actually removed from the store by then
@@ -287,11 +343,10 @@ public class BDBMessageStoreTest extends
MessageStore store = getVirtualHost().getMessageStore();
BDBMessageStore bdbStore = assertBDBStore(store);
- StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreMultiChunkMessage_0_8(store);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
-
- //remove the message in the fashion the broker normally would
- storedMessage_0_8.remove();
+
+ bdbStore.removeMessage(messageid_0_8, true);
//verify the removal using the BDB store implementation methods directly
try
@@ -308,10 +363,10 @@ public class BDBMessageStoreTest extends
//expecting no content, allocate a 1 byte
ByteBuffer dst = ByteBuffer.allocate(1);
- assertEquals("Retrieved content when none was expected",
+ assertEquals("Retrieved content when none was expected",
0, bdbStore.getContent(messageid_0_8, 0, dst));
}
-
+
private BDBMessageStore assertBDBStore(Object store)
{
if(!(store instanceof BDBMessageStore))
@@ -322,15 +377,11 @@ public class BDBMessageStoreTest extends
return (BDBMessageStore) store;
}
- private StoredMessage<MessageMetaData> createAndStoreMultiChunkMessage_0_8(MessageStore store)
+ private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
{
- byte[] body10Bytes = "0123456789".getBytes();
- byte[] body5Bytes = "01234".getBytes();
-
- ByteBuffer chunk1 = ByteBuffer.wrap(body10Bytes);
- ByteBuffer chunk2 = ByteBuffer.wrap(body5Bytes);
+ ByteBuffer chunk1 = ByteBuffer.wrap(CONTENT_BYTES);
- int bodySize = body10Bytes.length + body5Bytes.length;
+ int bodySize = CONTENT_BYTES.length;
//create and store the message using the MessageStore interface
MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
@@ -342,7 +393,6 @@ public class BDBMessageStoreTest extends
StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
storedMessage_0_8.addContent(0, chunk1);
- storedMessage_0_8.addContent(chunk1.limit(), chunk2);
storedMessage_0_8.flushToStore();
return storedMessage_0_8;
@@ -360,7 +410,7 @@ public class BDBMessageStoreTest extends
BDBMessageStore bdbStore = assertBDBStore(log);
final AMQShortString mockQueueName = new AMQShortString("queueName");
-
+
TransactionLogResource mockQueue = new TransactionLogResource()
{
public String getResourceName()
@@ -368,27 +418,27 @@ public class BDBMessageStoreTest extends
return mockQueueName.asString();
}
};
-
+
MessageStore.Transaction txn = log.newTransaction();
-
+
txn.enqueueMessage(mockQueue, new MockMessage(1L));
txn.enqueueMessage(mockQueue, new MockMessage(5L));
txn.commitTran();
List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
-
+
assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
assertEquals("First Message is incorrect", 1L, val.longValue());
val = enqueuedIds.get(1);
assertEquals("Second Message is incorrect", 5L, val.longValue());
}
-
-
+
+
/**
- * Tests transaction rollback before a commit has occurred by utilising the
- * enqueue and dequeue methods available in the TransactionLog interface
- * implemented by the store, and verifying the behaviour using BDB
+ * Tests transaction rollback before a commit has occurred by utilising the
+ * enqueue and dequeue methods available in the TransactionLog interface
+ * implemented by the store, and verifying the behaviour using BDB
* implementation methods.
*/
public void testTranRollbackBeforeCommit() throws Exception
@@ -398,7 +448,7 @@ public class BDBMessageStoreTest extends
BDBMessageStore bdbStore = assertBDBStore(log);
final AMQShortString mockQueueName = new AMQShortString("queueName");
-
+
TransactionLogResource mockQueue = new TransactionLogResource()
{
public String getResourceName()
@@ -406,30 +456,30 @@ public class BDBMessageStoreTest extends
return mockQueueName.asString();
}
};
-
+
MessageStore.Transaction txn = log.newTransaction();
-
+
txn.enqueueMessage(mockQueue, new MockMessage(21L));
txn.abortTran();
-
+
txn = log.newTransaction();
txn.enqueueMessage(mockQueue, new MockMessage(22L));
txn.enqueueMessage(mockQueue, new MockMessage(23L));
txn.commitTran();
List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
-
+
assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
assertEquals("First Message is incorrect", 22L, val.longValue());
val = enqueuedIds.get(1);
assertEquals("Second Message is incorrect", 23L, val.longValue());
}
-
+
/**
- * Tests transaction rollback after a commit has occurred by utilising the
- * enqueue and dequeue methods available in the TransactionLog interface
- * implemented by the store, and verifying the behaviour using BDB
+ * Tests transaction rollback after a commit has occurred by utilising the
+ * enqueue and dequeue methods available in the TransactionLog interface
+ * implemented by the store, and verifying the behaviour using BDB
* implementation methods.
*/
public void testTranRollbackAfterCommit() throws Exception
@@ -439,7 +489,7 @@ public class BDBMessageStoreTest extends
BDBMessageStore bdbStore = assertBDBStore(log);
final AMQShortString mockQueueName = new AMQShortString("queueName");
-
+
TransactionLogResource mockQueue = new TransactionLogResource()
{
public String getResourceName()
@@ -447,22 +497,22 @@ public class BDBMessageStoreTest extends
return mockQueueName.asString();
}
};
-
+
MessageStore.Transaction txn = log.newTransaction();
-
+
txn.enqueueMessage(mockQueue, new MockMessage(30L));
txn.commitTran();
txn = log.newTransaction();
txn.enqueueMessage(mockQueue, new MockMessage(31L));
txn.abortTran();
-
+
txn = log.newTransaction();
txn.enqueueMessage(mockQueue, new MockMessage(32L));
txn.commitTran();
-
+
List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
-
+
assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
assertEquals("First Message is incorrect", 30L, val.longValue());
@@ -470,6 +520,7 @@ public class BDBMessageStoreTest extends
assertEquals("Second Message is incorrect", 32L, val.longValue());
}
+ @SuppressWarnings("rawtypes")
private static class MockMessage implements ServerMessage, EnqueableMessage
{
private long _messageId;
Modified: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java?rev=1306244&r1=1306243&r2=1306244&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java Wed Mar 28 10:30:03 2012
@@ -20,19 +20,36 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.url.URLSyntaxException;
-import javax.jms.*;
-
/**
- * Prepares an older version brokers BDB store with the required
+ * Prepares an older version brokers BDB store with the required
* contents for use in the BDBStoreUpgradeTest.
*
* NOTE: Must be used with the equivalent older version client!
*
- * The store will then be used to verify that the upgraded is
- * completed properly and that once upgraded it functions as
+ * The store will then be used to verify that the upgrade is
+ * completed properly and that once upgraded it functions as
* expected with the new broker.
*
*/
@@ -43,9 +60,10 @@ public class BDBStoreUpgradeTestPreparer
public static final String SELECTOR_SUB_NAME="mySelectorDurSubName";
public static final String SELECTOR_TOPIC_NAME="mySelectorUpgradeTopic";
public static final String QUEUE_NAME="myUpgradeQueue";
+ public static final String NON_DURABLE_QUEUE_NAME="queue-non-durable";
private static AMQConnectionFactory _connFac;
- private static final String CONN_URL =
+ private static final String CONN_URL =
"amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'";
/**
@@ -59,14 +77,28 @@ public class BDBStoreUpgradeTestPreparer
private void prepareBroker() throws Exception
{
prepareQueues();
+ prepareNonDurableQueue();
prepareDurableSubscriptionWithSelector();
prepareDurableSubscriptionWithoutSelector();
}
+ /**
+  * Creates the queue named by NON_DURABLE_QUEUE_NAME, binds it to the
+  * amq.direct exchange using the queue name as the routing key, and sends
+  * it 3 persistent messages of length 1024.
+  *
+  * NOTE(review): the three false flags passed to sendCreateQueue are
+  * presumably autoDelete, durable and exclusive - confirm against AMQSession.
+  *
+  * The connection is closed even if one of the steps fails.
+  */
+ private void prepareNonDurableQueue() throws Exception
+ {
+     Connection connection = _connFac.createConnection();
+     try
+     {
+         AMQSession<?, ?> session = (AMQSession<?,?>)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         AMQShortString queueName = AMQShortString.valueOf(NON_DURABLE_QUEUE_NAME);
+         AMQDestination destination = (AMQDestination) session.createQueue(NON_DURABLE_QUEUE_NAME);
+         session.sendCreateQueue(queueName, false, false, false, null);
+         session.bindQueue(queueName, queueName, null, AMQShortString.valueOf("amq.direct"), destination);
+         MessageProducer messageProducer = session.createProducer(destination);
+         sendMessages(session, messageProducer, destination, DeliveryMode.PERSISTENT, 1024, 3);
+     }
+     finally
+     {
+         // previously skipped whenever an earlier call threw, leaking the connection
+         connection.close();
+     }
+ }
+
/**
* Prepare a queue for use in testing message and binding recovery
* after the upgrade is performed.
- *
+ *
* - Create a transacted session on the connection.
* - Use a consumer to create the (durable by default) queue.
* - Send 5 large messages to test (multi-frame) content recovery.
@@ -74,7 +106,7 @@ public class BDBStoreUpgradeTestPreparer
* - Commit the session.
* - Send 5 small messages to test that uncommitted messages are not recovered
* following the upgrade.
- * - Close the session.
+ * - Close the session.
*/
private void prepareQueues() throws Exception
{
@@ -114,9 +146,9 @@ public class BDBStoreUpgradeTestPreparer
}
/**
- * Prepare a DurableSubscription backing queue for use in testing selector
+ * Prepare a DurableSubscription backing queue for use in testing selector
* recovery and queue exclusivity marking during the upgrade process.
- *
+ *
* - Create a transacted session on the connection.
* - Open and close a DurableSubscription with selector to create the backing queue.
* - Send a message which matches the selector.
@@ -145,7 +177,7 @@ public class BDBStoreUpgradeTestPreparer
TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false);
durSub1.close();
- // Create a publisher and send a persistent message which matches the selector
+ // Create a publisher and send a persistent message which matches the selector
// followed by one that does not match, and another which matches but is not
// committed and so should be 'lost'
TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
@@ -202,7 +234,7 @@ public class BDBStoreUpgradeTestPreparer
connection.close();
}
- public static void sendMessages(Session session, MessageProducer messageProducer,
+ public static void sendMessages(Session session, MessageProducer messageProducer,
Destination dest, int deliveryMode, int length, int numMesages) throws JMSException
{
for (int i = 1; i <= numMesages; i++)
@@ -213,7 +245,7 @@ public class BDBStoreUpgradeTestPreparer
}
}
- public static void publishMessages(Session session, TopicPublisher publisher,
+ public static void publishMessages(Session session, TopicPublisher publisher,
Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException
{
for (int i = 1; i <= numMesages; i++)
@@ -227,8 +259,8 @@ public class BDBStoreUpgradeTestPreparer
/**
* Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2.
- *
- * @param length number of characters in the string
+ *
+ * @param length number of characters in the string
* @return string sequence of the given length
*/
public static String generateString(int length)
@@ -248,6 +280,7 @@ public class BDBStoreUpgradeTestPreparer
*/
public static void main(String[] args) throws Exception
{
+ System.setProperty("qpid.dest_syntax", "BURL");
BDBStoreUpgradeTestPreparer producer = new BDBStoreUpgradeTestPreparer();
producer.prepareBroker();
}
Modified: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java?rev=1306244&r1=1306243&r2=1306244&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java Wed Mar 28 10:30:03 2012
@@ -20,36 +20,14 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.je.DatabaseEntry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.management.common.mbeans.ManagedQueue;
-import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
-import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.util.FileUtils;
-
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_SUB_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
+
+import java.io.File;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@@ -64,18 +42,18 @@ import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
+
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Tests upgrading a BDB store and using it with the new broker
- * after the required contents are entered into the store using
- * an old broker with the BDBStoreUpgradeTestPreparer. The store
- * will then be used to verify that the upgraded is completed
- * properly and that once upgraded it functions as expected with
- * the new broker.
+ * Tests upgrading a BDB store on broker startup.
+ * The store will then be used to verify that the upgrade is completed
+ * properly and that once upgraded it functions as expected.
*/
public class BDBUpgradeTest extends QpidBrokerTestCase
{
@@ -84,73 +62,31 @@ public class BDBUpgradeTest extends Qpid
private static final String STRING_1024 = BDBStoreUpgradeTestPreparer.generateString(1024);
private static final String STRING_1024_256 = BDBStoreUpgradeTestPreparer.generateString(1024*256);
private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK");
- private static final String QPID_HOME = System.getProperty("QPID_HOME");
- private static final int VERSION_4 = 4;
- private String _fromDir;
- private String _toDir;
- private String _toDirTwice;
+ private String _storeLocation;
@Override
public void setUp() throws Exception
{
assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG);
- assertNotNull("QPID_HOME must be set", QPID_HOME);
-
- _fromDir = QPID_HOME + "/bdbstore-to-upgrade/test-store";
- _toDir = getWorkDirBaseDir() + "/bdbstore/test-store";
- _toDirTwice = getWorkDirBaseDir() + "/bdbstore-upgraded-twice";
+ _storeLocation = QPID_WORK_ORIG + "/bdbstore/test-store";
//Clear the two target directories if they exist.
- File directory = new File(_toDir);
- if (directory.exists() && directory.isDirectory())
- {
- FileUtils.delete(directory, true);
- }
- directory = new File(_toDirTwice);
+ File directory = new File(_storeLocation);
if (directory.exists() && directory.isDirectory())
{
FileUtils.delete(directory, true);
}
- //Upgrade the test store.
- upgradeBrokerStore(_fromDir, _toDir);
+ // copy store files
+ String src = getClass().getClassLoader().getResource("upgrade/bdbstore-v4/test-store").toURI().getPath();
+ FileUtils.copyRecursive(new File(src), new File(_storeLocation));
//override the broker config used and then start the broker with the updated store
_configFile = new File("build/etc/config-systests-bdb.xml");
setConfigurationProperty("management.enabled", "true");
- super.setUp();
- }
-
- private String getWorkDirBaseDir()
- {
- return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort());
- }
-
- /**
- * Tests that the core upgrade method of the store upgrade tool passes through the exception
- * from the BDBMessageStore indicating that the data on disk can't be loaded as the previous
- * version because it has already been upgraded.
- * @throws Exception
- */
- public void testMultipleUpgrades() throws Exception
- {
- //stop the broker started by setUp() in order to allow the second upgrade attempt to proceed
- stopBroker();
-
- try
- {
- new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(VERSION_4);
- fail("Second Upgrade Succeeded");
- }
- catch (Exception e)
- {
- System.err.println("Showing stack trace, we are expecting an 'Unable to load BDBStore' error");
- e.printStackTrace();
- assertTrue("Incorrect Exception Thrown:" + e.getMessage(),
- e.getMessage().contains("Unable to load BDBStore as version 4. Store on disk contains version 5 data"));
- }
+ super.setUp();
}
/**
@@ -175,26 +111,26 @@ public class BDBUpgradeTest extends Qpid
try
{
ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SELECTOR_SUB_NAME);
- assertEquals("DurableSubscription backing queue should have 1 message on it initially",
+ assertEquals("DurableSubscription backing queue should have 1 message on it initially",
new Integer(1), dursubQueue.getMessageCount());
-
+
// Create a connection and start it
TopicConnection connection = (TopicConnection) getConnection();
connection.start();
-
+
// Send messages which don't match and do match the selector, checking message count
- TopicSession pubSession = connection.createTopicSession(true, org.apache.qpid.jms.Session.SESSION_TRANSACTED);
+ TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
Topic topic = pubSession.createTopic(SELECTOR_TOPIC_NAME);
TopicPublisher publisher = pubSession.createPublisher(topic);
-
+
BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
pubSession.commit();
- assertEquals("DurableSubscription backing queue should still have 1 message on it",
+ assertEquals("DurableSubscription backing queue should still have 1 message on it",
Integer.valueOf(1), dursubQueue.getMessageCount());
-
+
BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
pubSession.commit();
- assertEquals("DurableSubscription backing queue should now have 2 messages on it",
+ assertEquals("DurableSubscription backing queue should now have 2 messages on it",
Integer.valueOf(2), dursubQueue.getMessageCount());
TopicSubscriber durSub = pubSession.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false);
@@ -240,7 +176,7 @@ public class BDBUpgradeTest extends Qpid
connection.start();
// Send new message matching the topic, checking message count
- TopicSession session = connection.createTopicSession(true, org.apache.qpid.jms.Session.SESSION_TRANSACTED);
+ TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(TOPIC_NAME);
TopicPublisher publisher = session.createPublisher(topic);
@@ -298,10 +234,10 @@ public class BDBUpgradeTest extends Qpid
/**
* Test that the upgraded queue continues to function properly when used
- * for persistent messaging and restarting the broker.
- *
+ * for persistent messaging and restarting the broker.
+ *
* Sends the new messages to the queue BEFORE consuming those which were
- * sent before the upgrade. In doing so, this also serves to test that
+ * sent before the upgrade. In doing so, this also serves to test that
* the queue bindings were successfully transitioned during the upgrade.
*/
public void testBindingAndMessageDurabability() throws Exception
@@ -329,7 +265,7 @@ public class BDBUpgradeTest extends Qpid
}
/**
- * Test that all of the committed persistent messages previously sent to
+ * Test that all of the committed persistent messages previously sent to
* the broker are properly received following update of the MetaData and
* Content entries during the store upgrade process.
*/
@@ -349,200 +285,22 @@ public class BDBUpgradeTest extends Qpid
*
* @throws Exception
*/
- public void testMigrationOfMessagesForNonExistingQueues() throws Exception
+ public void testMigrationOfMessagesForNonDurableQueues() throws Exception
{
- stopBroker();
-
- // copy store data into a new location for adding of phantom message
- File storeLocation = new File(_fromDir);
- File target = new File(_toDirTwice);
- if (!target.exists())
- {
- target.mkdirs();
- }
- FileUtils.copyRecursive(storeLocation, target);
-
- // delete migrated data
- File directory = new File(_toDir);
- if (directory.exists() && directory.isDirectory())
- {
- FileUtils.delete(directory, true);
- }
-
- // test data
- String nonExistingQueueName = getTestQueueName();
- String messageText = "Test Phantom Message";
-
- // add message
- addMessageForNonExistingQueue(target, VERSION_4, nonExistingQueueName, messageText);
-
- String[] inputs = { "Yes", "Yes", "Yes" };
- upgradeBrokerStoreInInterractiveMode(_toDirTwice, _toDir, inputs);
-
- // start broker
- startBroker();
-
// Create a connection and start it
Connection connection = getConnection();
connection.start();
// consume a message for non-existing store
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(nonExistingQueueName);
+ Queue queue = session.createQueue(NON_DURABLE_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
- Message message = messageConsumer.receive(1000);
-
- // assert consumed message
- assertNotNull("Message was not migrated!", message);
- assertTrue("Unexpected message received!", message instanceof TextMessage);
- String text = ((TextMessage) message).getText();
- assertEquals("Message migration failed!", messageText, text);
- }
-
- /**
- * An utility method to upgrade broker with simulation user interactions
- *
- * @param fromDir
- * location of the store to migrate
- * @param toDir
- * location of where migrated data will be stored
- * @param inputs
- * user answers on upgrade tool questions
- * @throws Exception
- */
- private void upgradeBrokerStoreInInterractiveMode(String fromDir, String toDir, final String[] inputs)
- throws Exception
- {
- // save to restore system.in after data migration
- InputStream stdin = System.in;
-
- // set fake system in to simulate user interactions
- // FIXME: it is a quite dirty simulator of system input but it does the job
- System.setIn(new InputStream()
- {
-
- private int counter = 0;
-
- public synchronized int read(byte b[], int off, int len)
- {
- byte[] src = (inputs[counter] + "\n").getBytes();
- System.arraycopy(src, 0, b, off, src.length);
- counter++;
- return src.length;
- }
-
- @Override
- public int read() throws IOException
- {
- return -1;
- }
- });
-
- try
- {
- // Upgrade the test store.
- new BDBStoreUpgrade(fromDir, toDir, null, true, true).upgradeFromVersion(VERSION_4);
- }
- finally
- {
- // restore system in
- System.setIn(stdin);
- }
- }
- @SuppressWarnings("unchecked")
- private void addMessageForNonExistingQueue(File storeLocation, int storeVersion, String nonExistingQueueName,
- String messageText) throws Exception
- {
- final AMQShortString queueName = new AMQShortString(nonExistingQueueName);
- BDBMessageStore store = new BDBMessageStore(storeVersion);
- store.configure(storeLocation, false);
- try
+ for (int i = 0; i < 3; i++)
{
- store.start();
-
- // store message objects
- ByteBuffer completeContentBody = ByteBuffer.wrap(messageText.getBytes("UTF-8"));
- long bodySize = completeContentBody.limit();
- MessagePublishInfo pubInfoBody = new MessagePublishInfoImpl(new AMQShortString("amq.direct"), false,
- false, queueName);
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
- props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
- props.setContentType("text/plain");
- props.setType("text/plain");
- props.setMessageId("whatever");
- props.setEncoding("UTF-8");
- props.getHeaders().setString("Test", "MST");
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
- ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, bodySize);
-
- // add content entry to database
- final long messageId = store.getNewMessageId();
- TupleBinding<MessageContentKey> contentKeyTB = new MessageContentKeyTupleBindingFactory(storeVersion).getInstance();
- MessageContentKey contentKey = null;
- if (storeVersion == VERSION_4)
- {
- contentKey = new MessageContentKey_4(messageId, 0);
- }
- else
- {
- throw new Exception(storeVersion + " is not supported");
- }
- DatabaseEntry key = new DatabaseEntry();
- contentKeyTB.objectToEntry(contentKey, key);
- DatabaseEntry data = new DatabaseEntry();
- ContentTB contentTB = new ContentTB();
- contentTB.objectToEntry(completeContentBody, data);
- store.getContentDb().put(null, key, data);
-
- // add meta data entry to database
- TupleBinding<Long> longTB = TupleBinding.getPrimitiveBinding(Long.class);
- TupleBinding<Object> metaDataTB = new MessageMetaDataTupleBindingFactory(storeVersion).getInstance();
- key = new DatabaseEntry();
- data = new DatabaseEntry();
- longTB.objectToEntry(new Long(messageId), key);
- MessageMetaData metaData = new MessageMetaData(pubInfoBody, contentHeaderBody, 1);
- metaDataTB.objectToEntry(metaData, data);
- store.getMetaDataDb().put(null, key, data);
-
- // add delivery entry to database
- TransactionLogResource mockQueue = new TransactionLogResource()
- {
- public String getResourceName()
- {
- return queueName.asString();
- }
- };
-
- EnqueableMessage mockMessage = new EnqueableMessage()
- {
-
- public long getMessageNumber()
- {
- return messageId;
- }
-
- public boolean isPersistent()
- {
- return true;
- }
-
- public StoredMessage getStoredMessage()
- {
- return null;
- }
- };
-
- MessageStore log = (MessageStore) store;
- MessageStore.Transaction txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, mockMessage);
- txn.commitTran();
- }
- finally
- {
- // close store
- store.close();
+ Message message = messageConsumer.receive(1000);
+ assertNotNull("Message was not migrated!", message);
+ assertTrue("Unexpected message received!", message instanceof TextMessage);
}
}
@@ -564,7 +322,7 @@ public class BDBUpgradeTest extends Qpid
}
- // Retrieve the matching message
+ // Retrieve the matching message
Message m = durSub.receive(2000);
assertNotNull("Failed to receive an expected message", m);
if(selector)
@@ -623,8 +381,4 @@ public class BDBUpgradeTest extends Qpid
session.close();
}
- private void upgradeBrokerStore(String fromDir, String toDir) throws Exception
- {
- new BDBStoreUpgrade(_fromDir, _toDir, null, false, true).upgradeFromVersion(VERSION_4);
- }
}
Added: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java?rev=1306244&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java Wed Mar 28 10:30:03 2012
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import java.io.File;
+import java.net.URL;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.subjects.TestBlankSubject;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.Transaction;
+
+/**
+ * Base class for the BDB store upgrade tests.
+ * <p>
+ * Before each test a pre-populated store is copied from the classpath folder
+ * "upgrade/" + {@link #getStoreDirectoryName()} into the java.io.tmpdir folder and an
+ * {@link Environment} is opened on the copy. After each test the environment is closed
+ * and the copied store is deleted.
+ */
+public abstract class AbstractUpgradeTestCase extends TestCase
+{
+    /** Interaction handler which returns the same pre-configured response to every question. */
+    protected static final class StaticAnswerHandler implements UpgradeInteractionHandler
+    {
+        private final UpgradeInteractionResponse _response;
+
+        public StaticAnswerHandler(UpgradeInteractionResponse response)
+        {
+            _response = response;
+        }
+
+        @Override
+        public UpgradeInteractionResponse requireResponse(String question, UpgradeInteractionResponse defaultResponse,
+                UpgradeInteractionResponse... possibleResponses)
+        {
+            // the question, the default and the permitted responses are deliberately ignored
+            return _response;
+        }
+    }
+
+    /** Names of the queues which the tests expect to find in the copied store. */
+    public static final String[] QUEUE_NAMES = { "clientid:myDurSubName", "clientid:mySelectorDurSubName", "myUpgradeQueue",
+            "queue-non-durable" };
+    /** Expected number of messages on each queue, in the same order as {@link #QUEUE_NAMES}. */
+    public static int[] QUEUE_SIZES = { 1, 1, 10, 3 };
+    /** Expected total number of messages, i.e. the sum of {@link #QUEUE_SIZES}. */
+    public static int TOTAL_MESSAGE_NUMBER = 15;
+    protected static final LogSubject LOG_SUBJECT = new TestBlankSubject();
+    protected static final String TMP_FOLDER = System.getProperty("java.io.tmpdir");
+
+    // two bindings are expected for every queue
+    // NOTE(review): presumably one on the default exchange and one on the named exchange - confirm
+    protected static final int TOTAL_BINDINGS = QUEUE_NAMES.length * 2;
+    protected static final int TOTAL_EXCHANGES = 5;
+
+    private File _storeLocation;
+    protected Environment _environment;
+
+    /** Copies the store into the temporary folder and opens the BDB environment on the copy. */
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _storeLocation = copyStore(getStoreDirectoryName());
+
+        _environment = createEnvironment(_storeLocation);
+    }
+
+    /** @return eg "bdbstore-v4" - used for copying store */
+    protected abstract String getStoreDirectoryName();
+
+    /**
+     * Opens a writable, transactional environment at the given location, creating it if required.
+     *
+     * @param storeLocation directory holding the store files
+     * @return the opened environment; {@link #tearDown()} closes the one assigned by {@link #setUp()}
+     */
+    protected Environment createEnvironment(File storeLocation)
+    {
+        EnvironmentConfig envConfig = new EnvironmentConfig();
+        envConfig.setAllowCreate(true);
+        envConfig.setTransactional(true);
+        envConfig.setConfigParam("je.lock.nLockTables", "7");
+        envConfig.setReadOnly(false);
+        envConfig.setSharedCache(false);
+        envConfig.setCacheSize(0);
+        return new Environment(storeLocation, envConfig);
+    }
+
+    /**
+     * Closes the environment and removes the copied store.
+     * <p>
+     * Every step runs even if an earlier one fails, and a partially completed
+     * {@link #setUp()} (no environment or store location yet) is tolerated.
+     */
+    @Override
+    public void tearDown() throws Exception
+    {
+        try
+        {
+            if (_environment != null)
+            {
+                _environment.close();
+            }
+        }
+        finally
+        {
+            _environment = null;
+            try
+            {
+                if (_storeLocation != null)
+                {
+                    deleteDirectoryIfExists(_storeLocation);
+                }
+            }
+            finally
+            {
+                super.tearDown();
+            }
+        }
+    }
+
+    /**
+     * Copies the named store from the test classpath into the temporary folder.
+     *
+     * @param storeDirectoryName name of the directory below the "upgrade" classpath folder
+     * @return the location of the copied store, TMP_FOLDER/test-store
+     * @throws Exception if the resource location cannot be converted into a path or the copy fails
+     */
+    private File copyStore(String storeDirectoryName) throws Exception
+    {
+        String resourceName = "upgrade/" + storeDirectoryName;
+        URL resource = getClass().getClassLoader().getResource(resourceName);
+        // fail with a clear message instead of a NullPointerException when the store is missing
+        assertNotNull("Store resource '" + resourceName + "' is not found on the classpath", resource);
+        String src = resource.toURI().getPath();
+        File storeLocation = new File(new File(TMP_FOLDER), "test-store");
+        deleteDirectoryIfExists(storeLocation);
+        // NOTE(review): the copy targets the temporary folder itself, so the returned location relies on
+        // the source directory containing a "test-store" sub-directory - confirm
+        FileUtils.copyRecursive(new File(src), new File(TMP_FOLDER));
+        return storeLocation;
+    }
+
+    /**
+     * Deletes the given directory through FileUtils.delete(dir, true), failing the test if the path
+     * is not a directory or the deletion is reported as unsuccessful. Does nothing if the path does not exist.
+     */
+    protected void deleteDirectoryIfExists(File dir)
+    {
+        if (dir.exists())
+        {
+            assertTrue("The provided file " + dir + " is not a directory", dir.isDirectory());
+
+            boolean deletedSuccessfully = FileUtils.delete(dir, true);
+
+            assertTrue("Files at '" + dir + "' should have been deleted", deletedSuccessfully);
+        }
+    }
+
+    /** Asserts that the named database holds exactly the expected number of records. */
+    protected void assertDatabaseRecordCount(String databaseName, final long expectedCountNumber)
+    {
+        long count = getDatabaseCount(databaseName);
+        assertEquals("Unexpected database '" + databaseName + "' entry number", expectedCountNumber, count);
+    }
+
+    /** @return the number of records in the named database of the test environment */
+    protected long getDatabaseCount(String databaseName)
+    {
+        DatabaseCallable<Long> operation = new DatabaseCallable<Long>()
+        {
+            @Override
+            public Long call(Database sourceDatabase, Database targetDatabase, Transaction transaction)
+            {
+                return Long.valueOf(sourceDatabase.count());
+            }
+        };
+        Long count = new DatabaseTemplate(_environment, databaseName, null).call(operation);
+        return count.longValue();
+    }
+
+}
Added: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java?rev=1306244&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java Wed Mar 28 10:30:03 2012
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import junit.framework.TestCase;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+
+/**
+ * Tests that {@link DatabaseTemplate} hands the opened databases and the transaction to the
+ * supplied operation and closes the databases afterwards.
+ * The BDB environment, databases and transaction are Mockito mocks.
+ */
+public class DatabaseTemplateTest extends TestCase
+{
+    private static final String SOURCE_DATABASE = "sourceDatabase";
+    private Environment _environment;
+    private Database _sourceDatabase;
+
+    /** Creates a mocked environment which returns the mocked source database for SOURCE_DATABASE. */
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _sourceDatabase = mock(Database.class);
+        _environment = mock(Environment.class);
+        when(_environment.openDatabase(any(Transaction.class), same(SOURCE_DATABASE), isA(DatabaseConfig.class)))
+                .thenReturn(_sourceDatabase);
+    }
+
+    /** Both databases and the transaction are passed to the operation; both databases are closed. */
+    public void testExecuteWithTwoDatabases()
+    {
+        final String targetName = "targetDatabase";
+        final Transaction transaction = mock(Transaction.class);
+        final Database target = mock(Database.class);
+        when(_environment.openDatabase(same(transaction), same(targetName), isA(DatabaseConfig.class)))
+                .thenReturn(target);
+
+        final DatabaseRunnable operation = mock(DatabaseRunnable.class);
+        new DatabaseTemplate(_environment, SOURCE_DATABASE, targetName, transaction).run(operation);
+
+        verify(operation).run(_sourceDatabase, target, transaction);
+        verify(_sourceDatabase).close();
+        verify(target).close();
+    }
+
+    /** Without a target database name the operation receives a null target and a null transaction. */
+    public void testExecuteWithOneDatabases()
+    {
+        final DatabaseRunnable operation = mock(DatabaseRunnable.class);
+        new DatabaseTemplate(_environment, SOURCE_DATABASE, null, null).run(operation);
+
+        verify(operation).run(_sourceDatabase, (Database)null, (Transaction)null);
+        verify(_sourceDatabase).close();
+    }
+
+}
Added: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java?rev=1306244&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java Wed Mar 28 10:30:03 2012
@@ -0,0 +1,299 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingRecord;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingTuple;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.MessageContentKey;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.MessageContentKeyBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueEntryKeyBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueRecord;
+
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Transaction;
+
+public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
+{
+ private static final String NON_DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
+ private static final String DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.QUEUE_NAME;
+ private static final String DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR = "clientid:mySelectorDurSubName";
+ private static final String DURABLE_SUBSCRIPTION_QUEUE = "clientid:myDurSubName";
+ private static final String EXCHANGE_DB_NAME = "exchangeDb_v5";
+ private static final String MESSAGE_META_DATA_DB_NAME = "messageMetaDataDb_v5";
+ private static final String MESSAGE_CONTENT_DB_NAME = "messageContentDb_v5";
+ private static final String DELIVERY_DB_NAME = "deliveryDb_v5";
+ private static final String BINDING_DB_NAME = "queueBindingsDb_v5";
+
+ @Override
+ protected String getStoreDirectoryName()
+ {
+ return "bdbstore-v4"; // directory below the "upgrade/" classpath folder from which the base class copies the store
+ }
+
+    public void testPerformUpgradeWithHandlerAnsweringYes() throws Exception
+    {
+        UpgradeFrom4To5 upgrade = new UpgradeFrom4To5();
+        upgrade.performUpgrade(LOG_SUBJECT, _environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES));
+        // with every question answered YES all queues, including the non-durable one, are expected to remain
+        assertQueues(new HashSet<String>(Arrays.asList(QUEUE_NAMES)));
+
+        assertDatabaseRecordCount(DELIVERY_DB_NAME, TOTAL_MESSAGE_NUMBER);
+        assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, TOTAL_MESSAGE_NUMBER);
+        assertDatabaseRecordCount(EXCHANGE_DB_NAME, TOTAL_EXCHANGES);
+
+        for (int i = 0; i < QUEUE_SIZES.length; i++)
+        {
+            assertQueueMessages(QUEUE_NAMES[i], QUEUE_SIZES[i]);
+        }
+
+        final List<BindingRecord> queueBindings = loadBindings();
+
+        assertEquals("Unexpected list size", TOTAL_BINDINGS, queueBindings.size());
+        assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", BDBStoreUpgradeTestPreparer.TOPIC_NAME, "");
+        assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic",
+                BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'");
+        assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null);
+        assertBindingRecord(queueBindings, NON_DURABLE_QUEUE, "amq.direct", NON_DURABLE_QUEUE, null);
+        assertContent();
+    }
+
+    public void testPerformUpgradeWithHandlerAnsweringNo() throws Exception
+    {
+        // answering NO is expected to drop the non-durable queue with its 3 messages and its 2 bindings
+        UpgradeFrom4To5 upgrade = new UpgradeFrom4To5();
+        upgrade.performUpgrade(LOG_SUBJECT, _environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO));
+        assertQueues(new HashSet<String>(Arrays.asList(DURABLE_SUBSCRIPTION_QUEUE, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, DURABLE_QUEUE)));
+
+        assertDatabaseRecordCount(DELIVERY_DB_NAME, 12);
+        assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, 12);
+        assertDatabaseRecordCount(EXCHANGE_DB_NAME, TOTAL_EXCHANGES);
+
+        assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE, 1);
+        assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, 1);
+        assertQueueMessages(DURABLE_QUEUE, 10);
+
+        final List<BindingRecord> queueBindings = loadBindings();
+
+        assertEquals("Unexpected list size", TOTAL_BINDINGS - 2, queueBindings.size());
+        assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", BDBStoreUpgradeTestPreparer.TOPIC_NAME, "");
+        assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic",
+                BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'");
+        assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null);
+        assertContent();
+    }
+
+    private List<BindingRecord> loadBindings()
+    {
+        final List<BindingRecord> bindings = new ArrayList<BindingRecord>();
+        final BindingTuple tupleBinding = new BindingTuple();
+        // copies every binding entry handed to processEntry into the result list
+        CursorOperation collector = new CursorOperation()
+        {
+            @Override
+            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+                    DatabaseEntry key, DatabaseEntry value)
+            {
+                // the binding is decoded from the key; the value is not consulted
+                BindingRecord decoded = tupleBinding.entryToObject(key);
+                AMQShortString queue = decoded.getQueueName();
+                AMQShortString exchange = decoded.getExchangeName();
+                AMQShortString routing = decoded.getRoutingKey();
+                FieldTable args = decoded.getArguments();
+                bindings.add(new BindingRecord(exchange, queue, routing, args));
+            }
+        };
+        new DatabaseTemplate(_environment, BINDING_DB_NAME, null).run(collector);
+        return bindings;
+    }
+
+    private void assertBindingRecord(List<BindingRecord> queueBindings, String queueName, String exchangeName,
+            String routingKey, String selectorKey)
+    {
+        BindingRecord match = null; // first binding which matches both the queue and the exchange
+        for (int i = 0; i < queueBindings.size() && match == null; i++)
+        {
+            BindingRecord candidate = queueBindings.get(i);
+            boolean sameQueue = candidate.getQueueName().asString().equals(queueName);
+            if (sameQueue && candidate.getExchangeName().asString().equals(exchangeName))
+            {
+                match = candidate;
+            }
+        }
+        assertNotNull("Binding is not found for queue " + queueName + " and exchange " + exchangeName, match);
+        assertEquals("Unexpected routing key", routingKey, match.getRoutingKey().asString());
+        if (selectorKey == null)
+        {
+            return; // no selector expectation: the binding arguments are not inspected
+        }
+        assertEquals("Unexpected selector key for " + queueName, selectorKey,
+                match.getArguments().get(AMQPFilterTypes.JMS_SELECTOR.getValue()));
+    }
+
+ /**
+  * Runs the delivery, metadata and content checks for one queue, in that order.
+  *
+  * @param queueName name of the queue to check
+  * @param expectedQueueSize count each of the three checks is expected to find
+  */
+ private void assertQueueMessages(final String queueName, final int expectedQueueSize)
+ {
+     // The message ids returned by the delivery check are fed to the two checks that follow.
+     final Set<Long> queueMessageIds = assertDeliveriesForQueue(queueName, expectedQueueSize);
+     assertMetadataForQueue(queueName, expectedQueueSize, queueMessageIds);
+     assertContentForQueue(queueName, expectedQueueSize, queueMessageIds);
+ }
+
+ /**
+  * Counts the entries of the database named by {@code DELIVERY_DB_NAME} whose key names the given
+  * queue, asserts the count, and returns the message ids found in those keys.
+  *
+  * @param queueName name of the queue whose delivery entries are counted
+  * @param expectedQueueSize number of delivery entries expected for the queue
+  * @return the message ids taken from the matching delivery keys
+  */
+ private Set<Long> assertDeliveriesForQueue(final String queueName, final int expectedQueueSize)
+ {
+     final Set<Long> queueMessageIds = new HashSet<Long>();
+     final AtomicInteger matchingEntries = new AtomicInteger();
+     final QueueEntryKeyBinding keyDecoder = new QueueEntryKeyBinding();
+
+     CursorOperation deliveryScanner = new CursorOperation()
+     {
+         @Override
+         public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+                 DatabaseEntry key, DatabaseEntry value)
+         {
+             QueueEntryKey deliveryKey = keyDecoder.entryToObject(key);
+             if (!deliveryKey.getQueueName().asString().equals(queueName))
+             {
+                 // Entry belongs to a different queue.
+                 return;
+             }
+             matchingEntries.incrementAndGet();
+             queueMessageIds.add(deliveryKey.getMessageId());
+         }
+     };
+     new DatabaseTemplate(_environment, DELIVERY_DB_NAME, null).run(deliveryScanner);
+
+     int actualQueueSize = matchingEntries.get();
+     assertEquals("Unxpected number of entries in delivery db for queue " + queueName, expectedQueueSize,
+             actualQueueSize);
+     return queueMessageIds;
+ }
+
+ /**
+  * Counts the entries of the database named by {@code MESSAGE_META_DATA_DB_NAME} whose key is one
+  * of the given message ids and asserts that count.
+  *
+  * @param queueName queue name, used only in the failure message
+  * @param expectedQueueSize number of metadata entries expected
+  * @param messageIdsForQueue ids of the messages belonging to the queue
+  */
+ private void assertMetadataForQueue(final String queueName, final int expectedQueueSize,
+         final Set<Long> messageIdsForQueue)
+ {
+     final AtomicInteger matchingEntries = new AtomicInteger();
+
+     CursorOperation metadataScanner = new CursorOperation()
+     {
+         @Override
+         public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+                 DatabaseEntry key, DatabaseEntry value)
+         {
+             // The key is read as a long message id; only ids belonging to the queue are counted.
+             if (messageIdsForQueue.contains(LongBinding.entryToLong(key)))
+             {
+                 matchingEntries.incrementAndGet();
+             }
+         }
+     };
+     DatabaseTemplate template = new DatabaseTemplate(_environment, MESSAGE_META_DATA_DB_NAME, null);
+     template.run(metadataScanner);
+
+     int actualQueueSize = matchingEntries.get();
+     assertEquals("Unxpected number of entries in metadata db for queue " + queueName, expectedQueueSize,
+             actualQueueSize);
+ }
+
+ /**
+  * Counts the messages of the given queue that have entries in the database named by
+  * {@code MESSAGE_CONTENT_DB_NAME} and asserts that count.
+  *
+  * Consecutive entries carrying the same message id are counted once.
+  *
+  * @param queueName queue name, used only in the failure message
+  * @param expectedQueueSize number of messages expected to have content
+  * @param messageIdsForQueue ids of the messages belonging to the queue
+  */
+ private void assertContentForQueue(String queueName, int expectedQueueSize, final Set<Long> messageIdsForQueue)
+ {
+     final MessageContentKeyBinding keyDecoder = new MessageContentKeyBinding();
+     final AtomicInteger messagesWithContent = new AtomicInteger();
+
+     CursorOperation contentScanner = new CursorOperation()
+     {
+         // Message id of the entry handled by the previous call; -1 before the first call.
+         private long _lastSeenMessageId = -1;
+
+         @Override
+         public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+                 DatabaseEntry key, DatabaseEntry value)
+         {
+             long currentMessageId = keyDecoder.entryToObject(key).getMessageId();
+             boolean startsNewMessage = currentMessageId != _lastSeenMessageId;
+             _lastSeenMessageId = currentMessageId;
+
+             if (startsNewMessage && messageIdsForQueue.contains(currentMessageId))
+             {
+                 messagesWithContent.incrementAndGet();
+             }
+         }
+     };
+     new DatabaseTemplate(_environment, MESSAGE_CONTENT_DB_NAME, null).run(contentScanner);
+
+     int actualQueueSize = messagesWithContent.get();
+     assertEquals("Unxpected number of entries in content db for queue " + queueName, expectedQueueSize,
+             actualQueueSize);
+ }
+
+ /**
+  * Asserts that the queue names stored in {@code queueDb_v5} are exactly the expected ones.
+  *
+  * @param expectedQueueNames the queue names the database is expected to contain
+  */
+ private void assertQueues(Set<String> expectedQueueNames)
+ {
+     final Set<String> storedQueueNames = new HashSet<String>();
+     // The binding is constructed with an empty list of durable subscription names.
+     final UpgradeFrom4To5.QueueRecordBinding recordDecoder =
+             new UpgradeFrom4To5.QueueRecordBinding(new ArrayList<AMQShortString>());
+
+     CursorOperation nameScanner = new CursorOperation()
+     {
+         @Override
+         public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+                 DatabaseEntry key, DatabaseEntry value)
+         {
+             // The queue record is decoded from the entry value, not the key.
+             QueueRecord decoded = recordDecoder.entryToObject(value);
+             storedQueueNames.add(decoded.getNameShortString().asString());
+         }
+     };
+     DatabaseTemplate template = new DatabaseTemplate(_environment, "queueDb_v5", null);
+     template.run(nameScanner);
+
+     assertEquals("Unexpected queue names", expectedQueueNames, storedQueueNames);
+ }
+
+ /**
+  * Checks each entry of the database named by {@code MESSAGE_CONTENT_DB_NAME}: the key, read as a
+  * long, must be positive and the value must decode to non-null content.
+  */
+ private void assertContent()
+ {
+     final UpgradeFrom4To5.ContentBinding valueDecoder = new UpgradeFrom4To5.ContentBinding();
+
+     CursorOperation contentVerifier = new CursorOperation()
+     {
+         @Override
+         public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+                 DatabaseEntry key, DatabaseEntry value)
+         {
+             assertTrue("Unexpected id", LongBinding.entryToLong(key) > 0);
+             assertNotNull("Unexpected content", valueDecoder.entryToObject(value));
+         }
+     };
+     DatabaseTemplate template = new DatabaseTemplate(_environment, MESSAGE_CONTENT_DB_NAME, null);
+     template.run(contentVerifier);
+ }
+}
Added: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java?rev=1306244&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java Wed Mar 28 10:30:03 2012
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewDataBinding;
+
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Transaction;
+
+public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
+{
+ private static final Logger _logger = Logger.getLogger(UpgradeFrom5To6Test.class);
+
+ @Override
+ protected String getStoreDirectoryName()
+ {
+ return "bdbstore-v5";
+ }
+
+ public void testPerformUpgrade() throws Exception
+ {
+ UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
+ upgrade.performUpgrade(LOG_SUBJECT, _environment, UpgradeInteractionHandler.DEFAULT_HANDLER);
+
+ assertDatabaseRecordCounts();
+ assertContent();
+ }
+
+ public void testPerformUpgradeWithMissingMessageChunkKeepsIncompleteMessage() throws Exception
+ {
+ corruptDatabase();
+
+ UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
+ upgrade.performUpgrade(LOG_SUBJECT, _environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES));
+
+ assertDatabaseRecordCounts();
+ }
+
+ public void testPerformUpgradeWithMissingMessageChunkDiscardsIncompleteMessage() throws Exception
+ {
+ corruptDatabase();
+
+ UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
+
+ UpgradeInteractionHandler discardMessageInteractionHandler = new StaticAnswerHandler(UpgradeInteractionResponse.NO);
+
+ upgrade.performUpgrade(LOG_SUBJECT, _environment, discardMessageInteractionHandler);
+
+ assertDatabaseRecordCount("MESSAGE_METADATA", 11);
+ assertDatabaseRecordCount("MESSAGE_CONTENT", 11);
+ }
+
+ /**
+ * modify the chunk offset of a message to be wrong, so we can test logic
+ * that preserves incomplete messages
+ */
+ private void corruptDatabase()
+ {
+ CursorOperation cursorOperation = new CursorOperation()
+ {
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ CompoundKeyBinding binding = new CompoundKeyBinding();
+ CompoundKey originalCompoundKey = binding.entryToObject(key);
+ int corruptedOffset = originalCompoundKey.getOffset() + 2;
+ CompoundKey corruptedCompoundKey = new CompoundKey(originalCompoundKey.getMessageId(), corruptedOffset);
+ DatabaseEntry newKey = new DatabaseEntry();
+ binding.objectToEntry(corruptedCompoundKey, newKey);
+
+ _logger.info("Deliberately corrupted message id " + originalCompoundKey.getMessageId()
+ + ", changed offset from " + originalCompoundKey.getOffset() + " to "
+ + corruptedCompoundKey.getOffset());
+
+ deleteCurrent();
+ sourceDatabase.put(transaction, newKey, value);
+
+ abort();
+ }
+ };
+
+ Transaction transaction = _environment.beginTransaction(null, null);
+ new DatabaseTemplate(_environment, "messageContentDb_v5", transaction).run(cursorOperation);
+ transaction.commit();
+ }
+
+ private void assertDatabaseRecordCounts()
+ {
+ assertDatabaseRecordCount("EXCHANGES", 5);
+ assertDatabaseRecordCount("QUEUES", 3);
+ assertDatabaseRecordCount("QUEUE_BINDINGS", 6);
+ assertDatabaseRecordCount("DELIVERIES", 12);
+
+ assertDatabaseRecordCount("MESSAGE_METADATA", 12);
+ assertDatabaseRecordCount("MESSAGE_CONTENT", 12);
+ }
+
+ private void assertContent()
+ {
+ final NewDataBinding contentBinding = new NewDataBinding();
+ CursorOperation contentCursorOperation = new CursorOperation()
+ {
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key,
+ DatabaseEntry value)
+ {
+ long id = LongBinding.entryToLong(key);
+ assertTrue("Unexpected id", id > 0);
+ byte[] content = contentBinding.entryToObject(value);
+ assertNotNull("Unexpected content", content);
+ }
+ };
+ new DatabaseTemplate(_environment, "MESSAGE_CONTENT", null).run(contentCursorOperation);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org