You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/03/28 12:30:06 UTC

svn commit: r1306244 [3/4] - in /qpid/branches/java-config-and-management/qpid/java: ./ bdbstore/ bdbstore/bin/ bdbstore/src/main/java/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/...

Copied: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java (from r1306213, qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java?p2=qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java&p1=qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java&r1=1306213&r2=1306244&rev=1306244&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java Wed Mar 28 10:30:03 2012
@@ -18,28 +18,20 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.upgrade;
 
-import com.sleepycat.bind.tuple.TupleBinding;
-
-import org.apache.qpid.server.store.berkeleydb.records.BindingRecord;
-
-public class BindingTupleBindingFactory extends TupleBindingFactory<BindingRecord>
+public interface UpgradeInteractionHandler
 {
-    public BindingTupleBindingFactory(int version)
-    {
-        super(version);
-    }
+    UpgradeInteractionResponse requireResponse(String question, UpgradeInteractionResponse defaultResponse,
+                                               UpgradeInteractionResponse... possibleResponses);
 
-    public TupleBinding<BindingRecord> getInstance()
+    public static final UpgradeInteractionHandler DEFAULT_HANDLER = new UpgradeInteractionHandler()
     {
-        switch (getVersion())
+        public UpgradeInteractionResponse requireResponse(final String question,
+                                                          final UpgradeInteractionResponse defaultResponse,
+                                                          final UpgradeInteractionResponse... possibleResponses)
         {
-            default:
-            case 5:
-                //no change from v4
-            case 4:
-                return new BindingTuple_4();
+            return defaultResponse;
         }
-    }
+    };
 }

Copied: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionResponse.java (from r1306213, qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionResponse.java?p2=qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionResponse.java&p1=qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java&r1=1306213&r2=1306244&rev=1306244&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionResponse.java Wed Mar 28 10:30:03 2012
@@ -18,8 +18,11 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.upgrade;
 
-public interface BindingTuple
+public enum UpgradeInteractionResponse
 {
+    YES,
+    NO,
+    ABORT
 }

Added: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java?rev=1306244&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java Wed Mar 28 10:30:03 2012
@@ -0,0 +1,225 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+
+/**
+ * Brings an existing BDB store up to the version the broker expects ({@link BDBMessageStore#VERSION}).
+ *
+ * The "VERSION" database acts as the upgrade history: each key is a store version and each value is
+ * the time, in milliseconds, at which that version was recorded.
+ */
+public class Upgrader
+{
+    static final String VERSION_DB_NAME = "VERSION";
+
+    private static final String UPGRADE_CLASS_PREFIX =
+            "org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom";
+
+    private final Environment _environment;
+    private final LogSubject _logSubject;
+
+    public Upgrader(Environment environment, LogSubject logSubject)
+    {
+        _environment = environment;
+        _logSubject = logSubject;
+    }
+
+    /**
+     * Records the store version if none has been recorded yet, then runs every upgrade step needed to
+     * reach {@link BDBMessageStore#VERSION}.
+     *
+     * An environment without any databases is stamped with the current version; otherwise the starting
+     * version is derived from the names of the databases already present.
+     *
+     * @throws AMQStoreException if a required upgrade step cannot be performed
+     * @throws IllegalStateException if no recorded version lies between 0 and {@link BDBMessageStore#VERSION}
+     */
+    public void upgradeIfNecessary() throws AMQStoreException
+    {
+        // Sampled before the version database is opened: opening it with allowCreate may itself add a
+        // database to the environment.
+        boolean isEmpty = _environment.getDatabaseNames().isEmpty();
+        DatabaseConfig dbConfig = new DatabaseConfig();
+        dbConfig.setTransactional(true);
+        dbConfig.setAllowCreate(true);
+
+        Database versionDb = null;
+        try
+        {
+            versionDb = _environment.openDatabase(null, VERSION_DB_NAME, dbConfig);
+
+            if(versionDb.count() == 0L)
+            {
+                int sourceVersion = isEmpty ? BDBMessageStore.VERSION : identifyOldStoreVersion();
+                recordVersion(versionDb, sourceVersion);
+            }
+
+            performUpgradeFromVersion(getSourceVersion(versionDb), versionDb);
+        }
+        finally
+        {
+            if (versionDb != null)
+            {
+                versionDb.close();
+            }
+        }
+    }
+
+    /**
+     * Finds the highest version, no greater than {@link BDBMessageStore#VERSION}, that has an entry in
+     * the version database.
+     *
+     * @param versionDb the open version database
+     * @return the version found
+     * @throws IllegalStateException if no version from 0 to {@link BDBMessageStore#VERSION} is recorded;
+     *                               the search is bounded so that it cannot count down indefinitely
+     */
+    int getSourceVersion(Database versionDb)
+    {
+        for (int version = BDBMessageStore.VERSION; version >= 0; version--)
+        {
+            DatabaseEntry key = new DatabaseEntry();
+            IntegerBinding.intToEntry(version, key);
+            DatabaseEntry value = new DatabaseEntry();
+
+            if (versionDb.get(null, key, value, LockMode.READ_COMMITTED) != OperationStatus.NOTFOUND)
+            {
+                return version;
+            }
+        }
+        throw new IllegalStateException("No store version at or below " + BDBMessageStore.VERSION
+                                        + " is recorded in database " + VERSION_DB_NAME);
+    }
+
+    /**
+     * Applies the upgrade steps one version at a time, recording each version reached (together with
+     * the current time) in the version database.
+     *
+     * @param sourceVersion the version to start upgrading from
+     * @param versionDb the open version database
+     * @throws AMQStoreException if a required upgrade step cannot be performed
+     */
+    void performUpgradeFromVersion(int sourceVersion, Database versionDb)
+            throws AMQStoreException
+    {
+        while(sourceVersion != BDBMessageStore.VERSION)
+        {
+            int targetVersion = sourceVersion + 1;
+            upgrade(sourceVersion, targetVersion);
+            recordVersion(versionDb, targetVersion);
+            sourceVersion = targetVersion;
+        }
+    }
+
+    /**
+     * Runs a single upgrade step. The step is the class named {@code UpgradeFrom<fromVersion>To<toVersion>}
+     * in this package, created through its public no-argument constructor.
+     *
+     * @param fromVersion the version being upgraded from
+     * @param toVersion the version being upgraded to
+     * @throws AMQStoreException if the step class cannot be found or instantiated
+     */
+    void upgrade(final int fromVersion, final int toVersion) throws AMQStoreException
+    {
+        try
+        {
+            Class<? extends StoreUpgrade> upgradeClass =
+                    Class.forName(UPGRADE_CLASS_PREFIX + fromVersion + "To" + toVersion)
+                         .asSubclass(StoreUpgrade.class);
+            Constructor<? extends StoreUpgrade> ctr = upgradeClass.getConstructor();
+            StoreUpgrade upgrade = ctr.newInstance();
+            upgrade.performUpgrade(_logSubject, _environment, UpgradeInteractionHandler.DEFAULT_HANDLER);
+        }
+        catch (ClassNotFoundException e)
+        {
+            throw upgradeFailure(fromVersion, toVersion, e);
+        }
+        catch (NoSuchMethodException e)
+        {
+            throw upgradeFailure(fromVersion, toVersion, e);
+        }
+        catch (InvocationTargetException e)
+        {
+            throw upgradeFailure(fromVersion, toVersion, e);
+        }
+        catch (InstantiationException e)
+        {
+            throw upgradeFailure(fromVersion, toVersion, e);
+        }
+        catch (IllegalAccessException e)
+        {
+            throw upgradeFailure(fromVersion, toVersion, e);
+        }
+    }
+
+    /** Builds the exception reported when the step between the two versions cannot be run. */
+    private static AMQStoreException upgradeFailure(int fromVersion, int toVersion, Exception cause)
+    {
+        return new AMQStoreException("Unable to upgrade BDB data store from version " + fromVersion
+                                     + " to version " + toVersion, cause);
+    }
+
+    /** Stores the given version, keyed by its number, with the current time as the value. */
+    private static void recordVersion(Database versionDb, int version)
+    {
+        DatabaseEntry key = new DatabaseEntry();
+        IntegerBinding.intToEntry(version, key);
+        DatabaseEntry value = new DatabaseEntry();
+        LongBinding.longToEntry(System.currentTimeMillis(), value);
+        versionDb.put(null, key, value);
+    }
+
+    /**
+     * Derives the version of a store that has no version database from its database names.
+     *
+     * @return the number following the first "_v" in the first database name that contains one,
+     *         or 0 if no name does
+     */
+    private int identifyOldStoreVersion() throws DatabaseException
+    {
+        for (String databaseName : _environment.getDatabaseNames())
+        {
+            int versionIndex = databaseName.indexOf("_v");
+            if (versionIndex != -1)
+            {
+                return Integer.parseInt(databaseName.substring(versionIndex + 2));
+            }
+        }
+        return 0;
+    }
+}

Modified: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1306244&r1=1306243&r2=1306244&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Wed Mar 28 10:30:03 2012
@@ -58,9 +58,11 @@ import java.util.List;
  */
 public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest
 {
+    private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+
     /**
-     * Tests that message metadata and content are successfully read back from a 
-     * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to 
+     * Tests that message metadata and content are successfully read back from a
+     * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
      * verify their ability to co-exist within the store and be successful retrieved.
      */
     public void testBDBMessagePersistence() throws Exception
@@ -73,10 +75,10 @@ public class BDBMessageStoreTest extends
         // Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
         // Use a single chunk for the 0-10 message as per broker behaviour.
         String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
-        
+
         ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
         ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes());
-        
+
         ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes());
         int bodySize = completeContentBody_0_10.limit();
 
@@ -100,12 +102,12 @@ public class BDBMessageStoreTest extends
 
         /*
          * Create and insert a 0-10 message (metadata and content)
-         */        
+         */
         MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize);
         DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10();
         Header header_0_10 = new Header(delProps_0_10, msgProps_0_10);
 
-        MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, 
+        MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT,
                 MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
 
         MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10);
@@ -190,14 +192,14 @@ public class BDBMessageStoreTest extends
     private DeliveryProperties createDeliveryProperties_0_10()
     {
         DeliveryProperties delProps_0_10 = new DeliveryProperties();
-        
+
         delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
         delProps_0_10.setImmediate(true);
         delProps_0_10.setExchange("exchange12345");
         delProps_0_10.setRoutingKey("routingKey12345");
         delProps_0_10.setExpiration(5);
         delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE);
-        
+
         return delProps_0_10;
     }
 
@@ -207,14 +209,14 @@ public class BDBMessageStoreTest extends
         msgProps_0_10.setContentLength(bodySize);
         msgProps_0_10.setCorrelationId("qwerty".getBytes());
         msgProps_0_10.setContentType("text/html");
-        
+
         return msgProps_0_10;
     }
 
-    /** 
+    /**
      * Close the provided store and create a new (read-only) store to read back the data.
-     * 
-     * Use this method instead of reloading the virtual host like other tests in order 
+     *
+     * Use this method instead of reloading the virtual host like other tests in order
      * to avoid the recovery handler deleting the message for not being on a queue.
      */
     private BDBMessageStore reloadStoreReadOnly(BDBMessageStore messageStore) throws Exception
@@ -275,7 +277,61 @@ public class BDBMessageStoreTest extends
         props.getHeaders().setString("Test", "MST");
         return props;
     }
-    
+
+    public void testGetContentWithOffset() throws Exception
+    {
+        MessageStore store = getVirtualHost().getMessageStore();
+        BDBMessageStore bdbStore = assertBDBStore(store);
+        StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
+        long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+
+        // normal case: offset is 0
+        ByteBuffer dst = ByteBuffer.allocate(10);
+        int length = bdbStore.getContent(messageid_0_8, 0, dst);
+        assertEquals("Unexpected length", CONTENT_BYTES.length, length);
+        byte[] array = dst.array();
+        assertTrue("Unexpected content", Arrays.equals(CONTENT_BYTES, array));
+
+        // offset is in the middle
+        dst = ByteBuffer.allocate(10);
+        length = bdbStore.getContent(messageid_0_8, 5, dst);
+        assertEquals("Unexpected length", 5, length);
+        array = dst.array();
+        byte[] expected = new byte[10];
+        System.arraycopy(CONTENT_BYTES, 5, expected, 0, 5);
+        assertTrue("Unexpected content", Arrays.equals(expected, array));
+
+        // offset beyond the content length
+        dst = ByteBuffer.allocate(10);
+        try
+        {
+            bdbStore.getContent(messageid_0_8, 15, dst);
+            fail("Should fail for the offset greater than message size");
+        }
+        catch (RuntimeException e)
+        {
+            assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id "
+                    + messageid_0_8 + "!", e.getMessage());
+        }
+
+        // buffer is smaller than message size
+        dst = ByteBuffer.allocate(5);
+        length = bdbStore.getContent(messageid_0_8, 0, dst);
+        assertEquals("Unexpected length", 5, length);
+        array = dst.array();
+        expected = new byte[5];
+        System.arraycopy(CONTENT_BYTES, 0, expected, 0, 5);
+        assertTrue("Unexpected content", Arrays.equals(expected, array));
+
+        // buffer is smaller than message size, offset is not 0
+        dst = ByteBuffer.allocate(5);
+        length = bdbStore.getContent(messageid_0_8, 2, dst);
+        assertEquals("Unexpected length", 5, length);
+        array = dst.array();
+        expected = new byte[5];
+        System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5);
+        assertTrue("Unexpected content", Arrays.equals(expected, array));
+    }
     /**
      * Tests that messages which are added to the store and then removed using the
      * public MessageStore interfaces are actually removed from the store by then
@@ -287,11 +343,10 @@ public class BDBMessageStoreTest extends
         MessageStore store = getVirtualHost().getMessageStore();
         BDBMessageStore bdbStore = assertBDBStore(store);
 
-        StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreMultiChunkMessage_0_8(store);
+        StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
         long messageid_0_8 = storedMessage_0_8.getMessageNumber();
-        
-        //remove the message in the fashion the broker normally would
-        storedMessage_0_8.remove();
+
+        bdbStore.removeMessage(messageid_0_8, true);
 
         //verify the removal using the BDB store implementation methods directly
         try
@@ -308,10 +363,10 @@ public class BDBMessageStoreTest extends
         //expecting no content, allocate a 1 byte
         ByteBuffer dst = ByteBuffer.allocate(1);
 
-        assertEquals("Retrieved content when none was expected", 
+        assertEquals("Retrieved content when none was expected",
                         0, bdbStore.getContent(messageid_0_8, 0, dst));
     }
-    
+
     private BDBMessageStore assertBDBStore(Object store)
     {
         if(!(store instanceof BDBMessageStore))
@@ -322,15 +377,11 @@ public class BDBMessageStoreTest extends
         return (BDBMessageStore) store;
     }
 
-    private StoredMessage<MessageMetaData> createAndStoreMultiChunkMessage_0_8(MessageStore store)
+    private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
     {
-        byte[] body10Bytes = "0123456789".getBytes();
-        byte[] body5Bytes = "01234".getBytes();
-
-        ByteBuffer chunk1 = ByteBuffer.wrap(body10Bytes);
-        ByteBuffer chunk2 = ByteBuffer.wrap(body5Bytes);
+        ByteBuffer chunk1 = ByteBuffer.wrap(CONTENT_BYTES);
 
-        int bodySize = body10Bytes.length + body5Bytes.length;
+        int bodySize = CONTENT_BYTES.length;
 
         //create and store the message using the MessageStore interface
         MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
@@ -342,7 +393,6 @@ public class BDBMessageStoreTest extends
         StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
 
         storedMessage_0_8.addContent(0, chunk1);
-        storedMessage_0_8.addContent(chunk1.limit(), chunk2);
         storedMessage_0_8.flushToStore();
 
         return storedMessage_0_8;
@@ -360,7 +410,7 @@ public class BDBMessageStoreTest extends
         BDBMessageStore bdbStore = assertBDBStore(log);
 
         final AMQShortString mockQueueName = new AMQShortString("queueName");
-        
+
         TransactionLogResource mockQueue = new TransactionLogResource()
         {
             public String getResourceName()
@@ -368,27 +418,27 @@ public class BDBMessageStoreTest extends
                 return mockQueueName.asString();
             }
         };
-        
+
         MessageStore.Transaction txn = log.newTransaction();
-        
+
         txn.enqueueMessage(mockQueue, new MockMessage(1L));
         txn.enqueueMessage(mockQueue, new MockMessage(5L));
         txn.commitTran();
 
         List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
-        
+
         assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
         Long val = enqueuedIds.get(0);
         assertEquals("First Message is incorrect", 1L, val.longValue());
         val = enqueuedIds.get(1);
         assertEquals("Second Message is incorrect", 5L, val.longValue());
     }
-    
-    
+
+
     /**
-     * Tests transaction rollback before a commit has occurred by utilising the 
-     * enqueue and dequeue methods available in the TransactionLog interface 
-     * implemented by the store, and verifying the behaviour using BDB 
+     * Tests transaction rollback before a commit has occurred by utilising the
+     * enqueue and dequeue methods available in the TransactionLog interface
+     * implemented by the store, and verifying the behaviour using BDB
      * implementation methods.
      */
     public void testTranRollbackBeforeCommit() throws Exception
@@ -398,7 +448,7 @@ public class BDBMessageStoreTest extends
         BDBMessageStore bdbStore = assertBDBStore(log);
 
         final AMQShortString mockQueueName = new AMQShortString("queueName");
-        
+
         TransactionLogResource mockQueue = new TransactionLogResource()
         {
             public String getResourceName()
@@ -406,30 +456,30 @@ public class BDBMessageStoreTest extends
                 return mockQueueName.asString();
             }
         };
-        
+
         MessageStore.Transaction txn = log.newTransaction();
-        
+
         txn.enqueueMessage(mockQueue, new MockMessage(21L));
         txn.abortTran();
-        
+
         txn = log.newTransaction();
         txn.enqueueMessage(mockQueue, new MockMessage(22L));
         txn.enqueueMessage(mockQueue, new MockMessage(23L));
         txn.commitTran();
 
         List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
-        
+
         assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
         Long val = enqueuedIds.get(0);
         assertEquals("First Message is incorrect", 22L, val.longValue());
         val = enqueuedIds.get(1);
         assertEquals("Second Message is incorrect", 23L, val.longValue());
     }
-    
+
     /**
-     * Tests transaction rollback after a commit has occurred by utilising the 
-     * enqueue and dequeue methods available in the TransactionLog interface 
-     * implemented by the store, and verifying the behaviour using BDB 
+     * Tests transaction rollback after a commit has occurred by utilising the
+     * enqueue and dequeue methods available in the TransactionLog interface
+     * implemented by the store, and verifying the behaviour using BDB
      * implementation methods.
      */
     public void testTranRollbackAfterCommit() throws Exception
@@ -439,7 +489,7 @@ public class BDBMessageStoreTest extends
         BDBMessageStore bdbStore = assertBDBStore(log);
 
         final AMQShortString mockQueueName = new AMQShortString("queueName");
-        
+
         TransactionLogResource mockQueue = new TransactionLogResource()
         {
             public String getResourceName()
@@ -447,22 +497,22 @@ public class BDBMessageStoreTest extends
                 return mockQueueName.asString();
             }
         };
-        
+
         MessageStore.Transaction txn = log.newTransaction();
-        
+
         txn.enqueueMessage(mockQueue, new MockMessage(30L));
         txn.commitTran();
 
         txn = log.newTransaction();
         txn.enqueueMessage(mockQueue, new MockMessage(31L));
         txn.abortTran();
-        
+
         txn = log.newTransaction();
         txn.enqueueMessage(mockQueue, new MockMessage(32L));
         txn.commitTran();
-        
+
         List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
-        
+
         assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
         Long val = enqueuedIds.get(0);
         assertEquals("First Message is incorrect", 30L, val.longValue());
@@ -470,6 +520,7 @@ public class BDBMessageStoreTest extends
         assertEquals("Second Message is incorrect", 32L, val.longValue());
     }
 
+    @SuppressWarnings("rawtypes")
     private static class MockMessage implements ServerMessage, EnqueableMessage
     {
         private long _messageId;

Modified: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java?rev=1306244&r1=1306243&r2=1306244&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java Wed Mar 28 10:30:03 2012
@@ -20,19 +20,36 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
 import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.url.URLSyntaxException;
 
-import javax.jms.*;
-
 /**
- * Prepares an older version brokers BDB store with the required 
+ * Prepares an older version brokers BDB store with the required
  * contents for use in the BDBStoreUpgradeTest.
  *
  * NOTE: Must be used with the equivalent older version client!
  *
- * The store will then be used to verify that the upgraded is 
- * completed properly and that once upgraded it functions as 
+ * The store will then be used to verify that the upgrade is
+ * completed properly and that once upgraded it functions as
  * expected with the new broker.
  *
  */
@@ -43,9 +60,10 @@ public class BDBStoreUpgradeTestPreparer
     public static final String SELECTOR_SUB_NAME="mySelectorDurSubName";
     public static final String SELECTOR_TOPIC_NAME="mySelectorUpgradeTopic";
     public static final String QUEUE_NAME="myUpgradeQueue";
+    public static final String NON_DURABLE_QUEUE_NAME="queue-non-durable";
 
     private static AMQConnectionFactory _connFac;
-    private static final String CONN_URL = 
+    private static final String CONN_URL =
         "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'";
 
     /**
@@ -59,14 +77,28 @@ public class BDBStoreUpgradeTestPreparer
     private void prepareBroker() throws Exception
     {
         prepareQueues();
+        prepareNonDurableQueue();
         prepareDurableSubscriptionWithSelector();
         prepareDurableSubscriptionWithoutSelector();
     }
 
+    private void prepareNonDurableQueue() throws Exception
+    {
+        Connection connection = _connFac.createConnection();
+        AMQSession<?, ?> session = (AMQSession<?,?>)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        AMQShortString queueName = AMQShortString.valueOf(NON_DURABLE_QUEUE_NAME);
+        AMQDestination destination = (AMQDestination) session.createQueue(NON_DURABLE_QUEUE_NAME);
+        session.sendCreateQueue(queueName, false, false, false, null);
+        session.bindQueue(queueName, queueName, null, AMQShortString.valueOf("amq.direct"), destination);
+        MessageProducer messageProducer = session.createProducer(destination);
+        sendMessages(session, messageProducer, destination, DeliveryMode.PERSISTENT, 1024, 3);
+        connection.close();
+    }
+
     /**
      * Prepare a queue for use in testing message and binding recovery
      * after the upgrade is performed.
-     * 
+     *
      * - Create a transacted session on the connection.
      * - Use a consumer to create the (durable by default) queue.
      * - Send 5 large messages to test (multi-frame) content recovery.
@@ -74,7 +106,7 @@ public class BDBStoreUpgradeTestPreparer
      * - Commit the session.
      * - Send 5 small messages to test that uncommitted messages are not recovered.
      *   following the upgrade.
-     * - Close the session.  
+     * - Close the session.
      */
     private void prepareQueues() throws Exception
     {
@@ -114,9 +146,9 @@ public class BDBStoreUpgradeTestPreparer
     }
 
     /**
-     * Prepare a DurableSubscription backing queue for use in testing selector 
+     * Prepare a DurableSubscription backing queue for use in testing selector
      * recovery and queue exclusivity marking during the upgrade process.
-     * 
+     *
      * - Create a transacted session on the connection.
      * - Open and close a DurableSubscription with selector to create the backing queue.
      * - Send a message which matches the selector.
@@ -145,7 +177,7 @@ public class BDBStoreUpgradeTestPreparer
         TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false);
         durSub1.close();
 
-        // Create a publisher and send a persistent message which matches the selector 
+        // Create a publisher and send a persistent message which matches the selector
         // followed by one that does not match, and another which matches but is not
         // committed and so should be 'lost'
         TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
@@ -202,7 +234,7 @@ public class BDBStoreUpgradeTestPreparer
         connection.close();
     }
 
-    public static void sendMessages(Session session, MessageProducer messageProducer, 
+    public static void sendMessages(Session session, MessageProducer messageProducer,
             Destination dest, int deliveryMode, int length, int numMesages) throws JMSException
     {
         for (int i = 1; i <= numMesages; i++)
@@ -213,7 +245,7 @@ public class BDBStoreUpgradeTestPreparer
         }
     }
 
-    public static void publishMessages(Session session, TopicPublisher publisher, 
+    public static void publishMessages(Session session, TopicPublisher publisher,
             Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException
     {
         for (int i = 1; i <= numMesages; i++)
@@ -227,8 +259,8 @@ public class BDBStoreUpgradeTestPreparer
 
     /**
      * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2.
-     * 
-     * @param length number of characters in the string 
+     *
+     * @param length number of characters in the string
      * @return string sequence of the given length
      */
     public static String generateString(int length)
@@ -248,6 +280,7 @@ public class BDBStoreUpgradeTestPreparer
      */
     public static void main(String[] args) throws Exception
     {
+        System.setProperty("qpid.dest_syntax", "BURL"); // NOTE(review): presumably selects Binding-URL destination syntax for the client - confirm; set before the preparer is constructed
         BDBStoreUpgradeTestPreparer producer = new BDBStoreUpgradeTestPreparer();
         producer.prepareBroker();
     }

Modified: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java?rev=1306244&r1=1306243&r2=1306244&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java Wed Mar 28 10:30:03 2012
@@ -20,36 +20,14 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.je.DatabaseEntry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.management.common.mbeans.ManagedQueue;
-import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
-import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.util.FileUtils;
-
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
 import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
 import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_SUB_NAME;
 import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
+
+import java.io.File;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -64,18 +42,18 @@ import javax.jms.TopicConnection;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
+
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Tests upgrading a BDB store and using it with the new broker 
- * after the required contents are entered into the store using 
- * an old broker with the BDBStoreUpgradeTestPreparer. The store
- * will then be used to verify that the upgraded is completed 
- * properly and that once upgraded it functions as expected with 
- * the new broker.
+ * Tests upgrading a BDB store on broker startup.
+ * The store will then be used to verify that the upgrade is completed
+ * properly and that once upgraded it functions as expected.
  */
 public class BDBUpgradeTest extends QpidBrokerTestCase
 {
@@ -84,73 +62,31 @@ public class BDBUpgradeTest extends Qpid
     private static final String STRING_1024 = BDBStoreUpgradeTestPreparer.generateString(1024);
     private static final String STRING_1024_256 = BDBStoreUpgradeTestPreparer.generateString(1024*256);
     private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK");
-    private static final String QPID_HOME = System.getProperty("QPID_HOME");
-    private static final int VERSION_4 = 4;
 
-    private String _fromDir;
-    private String _toDir;
-    private String _toDirTwice;
+    private String _storeLocation;
 
     @Override
     public void setUp() throws Exception
     {
         assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG);
-        assertNotNull("QPID_HOME must be set", QPID_HOME);
-
-        _fromDir = QPID_HOME + "/bdbstore-to-upgrade/test-store";
-        _toDir = getWorkDirBaseDir() + "/bdbstore/test-store";
-        _toDirTwice = getWorkDirBaseDir() + "/bdbstore-upgraded-twice";
+        _storeLocation = QPID_WORK_ORIG + "/bdbstore/test-store";
 
         //Clear the two target directories if they exist.
-        File directory = new File(_toDir);
-        if (directory.exists() && directory.isDirectory())
-        {
-            FileUtils.delete(directory, true);
-        }
-        directory = new File(_toDirTwice);
+        File directory = new File(_storeLocation);
         if (directory.exists() && directory.isDirectory())
         {
             FileUtils.delete(directory, true);
         }
 
-        //Upgrade the test store.
-        upgradeBrokerStore(_fromDir, _toDir);
+        // copy store files
+        String src = getClass().getClassLoader().getResource("upgrade/bdbstore-v4/test-store").toURI().getPath();
+        FileUtils.copyRecursive(new File(src), new File(_storeLocation));
 
         //override the broker config used and then start the broker with the updated store
         _configFile = new File("build/etc/config-systests-bdb.xml");
         setConfigurationProperty("management.enabled", "true");
 
-        super.setUp();       
-    }
-
-    private String getWorkDirBaseDir()
-    {
-        return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort());
-    }
-
-    /**
-     * Tests that the core upgrade method of the store upgrade tool passes through the exception
-     * from the BDBMessageStore indicating that the data on disk can't be loaded as the previous
-     * version because it has already been upgraded.
-     * @throws Exception
-     */
-    public void testMultipleUpgrades() throws Exception
-    {
-        //stop the broker started by setUp() in order to allow the second upgrade attempt to proceed
-        stopBroker();
-
-        try
-        {
-            new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(VERSION_4);
-            fail("Second Upgrade Succeeded");
-        }
-        catch (Exception e)
-        {
-            System.err.println("Showing stack trace, we are expecting an 'Unable to load BDBStore' error");
-            e.printStackTrace();
-            assertTrue("Incorrect Exception Thrown:" + e.getMessage(),
-                    e.getMessage().contains("Unable to load BDBStore as version 4. Store on disk contains version 5 data"));
-        }
+        super.setUp();
     }
 
     /**
@@ -175,26 +111,26 @@ public class BDBUpgradeTest extends Qpid
         try
         {
             ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SELECTOR_SUB_NAME);
-            assertEquals("DurableSubscription backing queue should have 1 message on it initially", 
+            assertEquals("DurableSubscription backing queue should have 1 message on it initially",
                           new Integer(1), dursubQueue.getMessageCount());
-            
+
             // Create a connection and start it
             TopicConnection connection = (TopicConnection) getConnection();
             connection.start();
-            
+
             // Send messages which don't match and do match the selector, checking message count
-            TopicSession pubSession = connection.createTopicSession(true, org.apache.qpid.jms.Session.SESSION_TRANSACTED);
+            TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
             Topic topic = pubSession.createTopic(SELECTOR_TOPIC_NAME);
             TopicPublisher publisher = pubSession.createPublisher(topic);
-            
+
             BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
             pubSession.commit();
-            assertEquals("DurableSubscription backing queue should still have 1 message on it", 
+            assertEquals("DurableSubscription backing queue should still have 1 message on it",
                          Integer.valueOf(1), dursubQueue.getMessageCount());
-            
+
             BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
             pubSession.commit();
-            assertEquals("DurableSubscription backing queue should now have 2 messages on it", 
+            assertEquals("DurableSubscription backing queue should now have 2 messages on it",
                          Integer.valueOf(2), dursubQueue.getMessageCount());
 
             TopicSubscriber durSub = pubSession.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false);
@@ -240,7 +176,7 @@ public class BDBUpgradeTest extends Qpid
             connection.start();
 
             // Send new message matching the topic, checking message count
-            TopicSession session = connection.createTopicSession(true, org.apache.qpid.jms.Session.SESSION_TRANSACTED);
+            TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
             Topic topic = session.createTopic(TOPIC_NAME);
             TopicPublisher publisher = session.createPublisher(topic);
 
@@ -298,10 +234,10 @@ public class BDBUpgradeTest extends Qpid
 
     /**
      * Test that the upgraded queue continues to function properly when used
-     * for persistent messaging and restarting the broker. 
-     * 
+     * for persistent messaging and restarting the broker.
+     *
      * Sends the new messages to the queue BEFORE consuming those which were
-     * sent before the upgrade. In doing so, this also serves to test that 
+     * sent before the upgrade. In doing so, this also serves to test that
      * the queue bindings were successfully transitioned during the upgrade.
      */
     public void testBindingAndMessageDurabability() throws Exception
@@ -329,7 +265,7 @@ public class BDBUpgradeTest extends Qpid
     }
 
     /**
-     * Test that all of the committed persistent messages previously sent to 
+     * Test that all of the committed persistent messages previously sent to
      * the broker are properly received following update of the MetaData and
      * Content entries during the store upgrade process.
      */
@@ -349,200 +285,22 @@ public class BDBUpgradeTest extends Qpid
      *
      * @throws Exception
      */
-    public void testMigrationOfMessagesForNonExistingQueues() throws Exception
+    public void testMigrationOfMessagesForNonDurableQueues() throws Exception
     {
-        stopBroker();
-
-        // copy store data into a new location for adding of phantom message
-        File storeLocation = new File(_fromDir);
-        File target = new File(_toDirTwice);
-        if (!target.exists())
-        {
-            target.mkdirs();
-        }
-        FileUtils.copyRecursive(storeLocation, target);
-
-        // delete migrated data
-        File directory = new File(_toDir);
-        if (directory.exists() && directory.isDirectory())
-        {
-            FileUtils.delete(directory, true);
-        }
-
-        // test data
-        String nonExistingQueueName = getTestQueueName();
-        String messageText = "Test Phantom Message";
-
-        // add message
-        addMessageForNonExistingQueue(target, VERSION_4, nonExistingQueueName, messageText);
-
-        String[] inputs = { "Yes", "Yes", "Yes" };
-        upgradeBrokerStoreInInterractiveMode(_toDirTwice, _toDir, inputs);
-
-        // start broker
-        startBroker();
-
         // Create a connection and start it
         Connection connection = getConnection();
         connection.start();
 
         // consume a message for non-existing store
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(nonExistingQueueName);
+        Queue queue = session.createQueue(NON_DURABLE_QUEUE_NAME);
         MessageConsumer messageConsumer = session.createConsumer(queue);
-        Message message = messageConsumer.receive(1000);
-
-        // assert consumed message
-        assertNotNull("Message was not migrated!", message);
-        assertTrue("Unexpected message received!", message instanceof TextMessage);
-        String text = ((TextMessage) message).getText();
-        assertEquals("Message migration failed!", messageText, text);
-    }
-
-    /**
-     * An utility method to upgrade broker with simulation user interactions
-     *
-     * @param fromDir
-     *            location of the store to migrate
-     * @param toDir
-     *            location of where migrated data will be stored
-     * @param inputs
-     *            user answers on upgrade tool questions
-     * @throws Exception
-     */
-    private void upgradeBrokerStoreInInterractiveMode(String fromDir, String toDir, final String[] inputs)
-            throws Exception
-    {
-        // save to restore system.in after data migration
-        InputStream stdin = System.in;
-
-        // set fake system in to simulate user interactions
-        // FIXME: it is a quite dirty simulator of system input but it does the job
-        System.setIn(new InputStream()
-        {
-
-            private int counter = 0;
-
-            public synchronized int read(byte b[], int off, int len)
-            {
-                byte[] src = (inputs[counter] + "\n").getBytes();
-                System.arraycopy(src, 0, b, off, src.length);
-                counter++;
-                return src.length;
-            }
-
-            @Override
-            public int read() throws IOException
-            {
-                return -1;
-            }
-        });
-
-        try
-        {
-            // Upgrade the test store.
-            new BDBStoreUpgrade(fromDir, toDir, null, true, true).upgradeFromVersion(VERSION_4);
-        }
-        finally
-        {
-            // restore system in
-            System.setIn(stdin);
-        }
-    }
 
-    @SuppressWarnings("unchecked")
-    private void addMessageForNonExistingQueue(File storeLocation, int storeVersion, String nonExistingQueueName,
-            String messageText) throws Exception
-    {
-        final AMQShortString queueName = new AMQShortString(nonExistingQueueName);
-        BDBMessageStore store = new BDBMessageStore(storeVersion);
-        store.configure(storeLocation, false);
-        try
+        for (int i = 0; i < 3; i++)
         {
-            store.start();
-
-            // store message objects
-            ByteBuffer completeContentBody = ByteBuffer.wrap(messageText.getBytes("UTF-8"));
-            long bodySize = completeContentBody.limit();
-            MessagePublishInfo pubInfoBody = new MessagePublishInfoImpl(new AMQShortString("amq.direct"), false,
-                    false, queueName);
-            BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-            props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
-            props.setContentType("text/plain");
-            props.setType("text/plain");
-            props.setMessageId("whatever");
-            props.setEncoding("UTF-8");
-            props.getHeaders().setString("Test", "MST");
-            MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
-            int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
-            ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, bodySize);
-
-            // add content entry to database
-            final long messageId = store.getNewMessageId();
-            TupleBinding<MessageContentKey> contentKeyTB = new MessageContentKeyTupleBindingFactory(storeVersion).getInstance();
-            MessageContentKey contentKey = null;
-            if (storeVersion == VERSION_4)
-            {
-                contentKey = new MessageContentKey_4(messageId, 0);
-            }
-            else
-            {
-                throw new Exception(storeVersion + " is not supported");
-            }
-            DatabaseEntry key = new DatabaseEntry();
-            contentKeyTB.objectToEntry(contentKey, key);
-            DatabaseEntry data = new DatabaseEntry();
-            ContentTB contentTB = new ContentTB();
-            contentTB.objectToEntry(completeContentBody, data);
-            store.getContentDb().put(null, key, data);
-
-            // add meta data entry to database
-            TupleBinding<Long> longTB = TupleBinding.getPrimitiveBinding(Long.class);
-            TupleBinding<Object> metaDataTB = new MessageMetaDataTupleBindingFactory(storeVersion).getInstance();
-            key = new DatabaseEntry();
-            data = new DatabaseEntry();
-            longTB.objectToEntry(new Long(messageId), key);
-            MessageMetaData metaData = new MessageMetaData(pubInfoBody, contentHeaderBody, 1);
-            metaDataTB.objectToEntry(metaData, data);
-            store.getMetaDataDb().put(null, key, data);
-
-            // add delivery entry to database
-            TransactionLogResource mockQueue = new TransactionLogResource()
-            {
-                public String getResourceName()
-                {
-                    return queueName.asString();
-                }
-            };
-
-            EnqueableMessage mockMessage = new EnqueableMessage()
-            {
-    
-                public long getMessageNumber()
-                {
-                    return messageId;
-                }
-
-                public boolean isPersistent()
-                {
-                    return true;
-                }
-
-                public StoredMessage getStoredMessage()
-                {
-                    return null;
-                }
-            };
-
-            MessageStore log = (MessageStore) store;
-            MessageStore.Transaction txn = log.newTransaction();
-            txn.enqueueMessage(mockQueue, mockMessage);
-            txn.commitTran();
-        }
-        finally
-        {
-            // close store
-            store.close();
+            Message message = messageConsumer.receive(1000);
+            assertNotNull("Message was not migrated!", message);
+            assertTrue("Unexpected message received!", message instanceof TextMessage);
         }
     }
 
@@ -564,7 +322,7 @@ public class BDBUpgradeTest extends Qpid
         }
 
 
-        // Retrieve the matching message 
+        // Retrieve the matching message
         Message m = durSub.receive(2000);
         assertNotNull("Failed to receive an expected message", m);
         if(selector)
@@ -623,8 +381,4 @@ public class BDBUpgradeTest extends Qpid
         session.close();
     }
 
-    private void upgradeBrokerStore(String fromDir, String toDir) throws Exception
-    {
-        new BDBStoreUpgrade(_fromDir, _toDir, null, false, true).upgradeFromVersion(VERSION_4);
-    }
 }

Added: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java?rev=1306244&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java Wed Mar 28 10:30:03 2012
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import java.io.File;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.subjects.TestBlankSubject;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.Transaction;
+/** Base class for BDB store upgrade tests: copies a packaged store into the temp folder and opens a JE Environment on it. */
+public abstract class AbstractUpgradeTestCase extends TestCase
+{
+    /** Interaction handler that ignores the question asked and always returns the response given at construction. */
+    protected static final class StaticAnswerHandler implements UpgradeInteractionHandler
+    {
+        private final UpgradeInteractionResponse _response;
+        public StaticAnswerHandler(UpgradeInteractionResponse response)
+        {
+            _response = response;
+        }
+
+        @Override
+        public UpgradeInteractionResponse requireResponse(String question, UpgradeInteractionResponse defaultResponse,
+                UpgradeInteractionResponse... possibleResponses)
+        {
+            return _response;
+        }
+    }
+    // Queues expected in the packaged store; QUEUE_SIZES presumably gives each queue's message count in the same order (sums to TOTAL_MESSAGE_NUMBER).
+    public static final String[] QUEUE_NAMES = { "clientid:myDurSubName", "clientid:mySelectorDurSubName", "myUpgradeQueue",
+            "queue-non-durable" };
+    public static final int[] QUEUE_SIZES = { 1, 1, 10, 3 };
+    public static final int TOTAL_MESSAGE_NUMBER = 15;
+    protected static final LogSubject LOG_SUBJECT = new TestBlankSubject();
+    protected static final String TMP_FOLDER = System.getProperty("java.io.tmpdir");
+
+    // two bindings are expected per queue (NOTE(review): presumably default exchange plus the queue's own exchange - confirm)
+    protected static final int TOTAL_BINDINGS = QUEUE_NAMES.length * 2;
+    protected static final int TOTAL_EXCHANGES = 5;
+    // working copy of the store (set in setUp, deleted in tearDown) and the JE environment opened on it
+    private File _storeLocation;
+    protected Environment _environment;
+    /** Copies the store named by getStoreDirectoryName() and opens a JE environment on the copy. */
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _storeLocation = copyStore(getStoreDirectoryName());
+
+        _environment = createEnvironment(_storeLocation);
+    }
+
+    /** @return name of the store directory below the "upgrade/" classpath resource, eg "bdbstore-v4" - used for copying store */
+    protected abstract String getStoreDirectoryName();
+    /** Opens a transactional, writable JE Environment (created if absent) on storeLocation with 7 lock tables and no shared cache. */
+    protected Environment createEnvironment(File storeLocation)
+    {
+        EnvironmentConfig envConfig = new EnvironmentConfig();
+        envConfig.setAllowCreate(true);
+        envConfig.setTransactional(true);
+        envConfig.setConfigParam("je.lock.nLockTables", "7");
+        envConfig.setReadOnly(false);
+        envConfig.setSharedCache(false);
+        envConfig.setCacheSize(0);
+        return new Environment(storeLocation, envConfig);
+    }
+    /** Closes the environment, then clears the reference and deletes the copied store even if the close fails. */
+    @Override
+    public void tearDown() throws Exception
+    {
+        try
+        {
+            _environment.close();
+        }
+        finally
+        {
+            _environment = null;
+            deleteDirectoryIfExists(_storeLocation);
+        }
+        super.tearDown();
+    }
+    /** Copies classpath resource "upgrade/" + storeDirectoryName into TMP_FOLDER; the resource is assumed to contain "test-store". */
+    private File copyStore(String storeDirectoryName) throws Exception
+    {
+        String src = getClass().getClassLoader().getResource("upgrade/" + storeDirectoryName).toURI().getPath();
+        File storeLocation = new File(new File(TMP_FOLDER), "test-store");
+        deleteDirectoryIfExists(storeLocation);
+        FileUtils.copyRecursive(new File(src), new File(TMP_FOLDER));
+        return storeLocation;
+    }
+    /** Removes dir if it exists; fails the test when it is not a directory or FileUtils.delete reports failure. */
+    protected void deleteDirectoryIfExists(File dir)
+    {
+        if (dir.exists())
+        {
+            assertTrue("The provided file " + dir + " is not a directory", dir.isDirectory());
+
+            boolean deletedSuccessfully = FileUtils.delete(dir, true);
+
+            assertTrue("Files at '" + dir + "' should have been deleted", deletedSuccessfully);
+        }
+    }
+    /** Asserts that the named database holds exactly expectedCountNumber records. */
+    protected void assertDatabaseRecordCount(String databaseName, final long expectedCountNumber)
+    {
+        long count = getDatabaseCount(databaseName);
+        assertEquals("Unexpected database '" + databaseName + "' entry number", expectedCountNumber, count);
+    }
+    /** Returns the record count of the named database as reported by Database.count(). */
+    protected long getDatabaseCount(String databaseName)
+    {
+        DatabaseCallable<Long> operation = new DatabaseCallable<Long>()
+        {
+            // only the source database is counted; targetDatabase and transaction are not used
+            @Override
+            public Long call(Database sourceDatabase, Database targetDatabase, Transaction transaction)
+            {
+                return Long.valueOf(sourceDatabase.count());
+
+            }
+        };
+        Long count = new DatabaseTemplate(_environment, databaseName, null).call(operation);
+        return count.longValue();
+    }
+
+}

Added: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java?rev=1306244&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java Wed Mar 28 10:30:03 2012
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import junit.framework.TestCase;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+/** Unit tests for DatabaseTemplate using Mockito mocks of the JE Environment and Database. */
+public class DatabaseTemplateTest extends TestCase
+{
+    private static final String SOURCE_DATABASE = "sourceDatabase";
+    private Environment _environment;
+    private Database _sourceDatabase;
+    /** Creates the mocks and stubs openDatabase(...) for SOURCE_DATABASE to return the mocked source database. */
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _environment = mock(Environment.class);
+        _sourceDatabase = mock(Database.class);
+        when(_environment.openDatabase(any(Transaction.class), same(SOURCE_DATABASE), isA(DatabaseConfig.class)))
+                .thenReturn(_sourceDatabase);
+    }
+    /** Checks that run() hands the source database, target database and transaction to the operation and closes both databases. */
+    public void testExecuteWithTwoDatabases()
+    {
+        String targetDatabaseName = "targetDatabase";
+        Database targetDatabase = mock(Database.class);
+
+        Transaction txn = mock(Transaction.class);
+        // the target database is only returned when it is opened with this exact transaction instance
+        when(_environment.openDatabase(same(txn), same(targetDatabaseName), isA(DatabaseConfig.class)))
+                .thenReturn(targetDatabase);
+
+        DatabaseTemplate databaseTemplate = new DatabaseTemplate(_environment, SOURCE_DATABASE, targetDatabaseName, txn);
+
+        DatabaseRunnable databaseOperation = mock(DatabaseRunnable.class);
+        databaseTemplate.run(databaseOperation);
+
+        verify(databaseOperation).run(_sourceDatabase, targetDatabase, txn);
+        verify(_sourceDatabase).close();
+        verify(targetDatabase).close();
+    }
+    /** Checks that, with no target database name and no transaction, the operation receives nulls and the source database is closed. */
+    public void testExecuteWithOneDatabases()
+    {
+        DatabaseTemplate databaseTemplate = new DatabaseTemplate(_environment, SOURCE_DATABASE, null, null);
+
+        DatabaseRunnable databaseOperation = mock(DatabaseRunnable.class);
+        databaseTemplate.run(databaseOperation);
+
+        verify(databaseOperation).run(_sourceDatabase, (Database)null, (Transaction)null);
+        verify(_sourceDatabase).close();
+    }
+
+}

Added: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java?rev=1306244&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java Wed Mar 28 10:30:03 2012
@@ -0,0 +1,299 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingRecord;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingTuple;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.MessageContentKey;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.MessageContentKeyBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueEntryKeyBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueRecord;
+
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Transaction;
+
+public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
+{
+    // Queue names the assertions look for; the first two reuse the names declared by BDBStoreUpgradeTestPreparer.
+    private static final String NON_DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
+    private static final String DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.QUEUE_NAME;
+    private static final String DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR = "clientid:mySelectorDurSubName";
+    private static final String DURABLE_SUBSCRIPTION_QUEUE = "clientid:myDurSubName";
+    // Names of the "_v5" databases that the assertions in this class read.
+    private static final String EXCHANGE_DB_NAME = "exchangeDb_v5";
+    private static final String MESSAGE_META_DATA_DB_NAME = "messageMetaDataDb_v5";
+    private static final String MESSAGE_CONTENT_DB_NAME = "messageContentDb_v5";
+    private static final String DELIVERY_DB_NAME = "deliveryDb_v5";
+    private static final String BINDING_DB_NAME = "queueBindingsDb_v5";
+
+    /**
+     * Supplies the store directory name for this test case: {@code "bdbstore-v4"}.
+     * NOTE(review): presumably resolved by AbstractUpgradeTestCase to a prepared
+     * version 4 store - confirm against the base class.
+     */
+    @Override
+    protected String getStoreDirectoryName()
+    {
+        return "bdbstore-v4";
+    }
+
+    /**
+     * Performs the 4 to 5 upgrade with a handler built from the YES response and
+     * verifies that every queue in QUEUE_NAMES is present afterwards, together
+     * with its messages and bindings.
+     * NOTE(review): judging by the expectations here, YES appears to mean "keep
+     * the non-durable queue" - confirm against UpgradeFrom4To5.
+     *
+     * @throws Exception if the upgrade or any of the database reads fails
+     */
+    public void testPerformUpgradeWithHandlerAnsweringYes() throws Exception
+    {
+        UpgradeFrom4To5 upgrade = new UpgradeFrom4To5();
+        upgrade.performUpgrade(LOG_SUBJECT, _environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES));
+
+        assertQueues(new HashSet<String>(Arrays.asList(QUEUE_NAMES)));
+
+        assertDatabaseRecordCount(DELIVERY_DB_NAME, TOTAL_MESSAGE_NUMBER);
+        assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, TOTAL_MESSAGE_NUMBER);
+        assertDatabaseRecordCount(EXCHANGE_DB_NAME, TOTAL_EXCHANGES);
+
+        // QUEUE_NAMES and QUEUE_SIZES are parallel arrays: entry i of each describes the same queue
+        for (int i = 0; i < QUEUE_SIZES.length; i++)
+        {
+            assertQueueMessages(QUEUE_NAMES[i], QUEUE_SIZES[i]);
+        }
+
+        final List<BindingRecord> queueBindings = loadBindings();
+
+        assertEquals("Unexpected list size", TOTAL_BINDINGS, queueBindings.size());
+        assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", BDBStoreUpgradeTestPreparer.TOPIC_NAME, "");
+        assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic",
+                BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'");
+        assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null);
+        assertBindingRecord(queueBindings, NON_DURABLE_QUEUE, "amq.direct", NON_DURABLE_QUEUE, null);
+        assertContent();
+    }
+
+    /**
+     * Performs the 4 to 5 upgrade with a handler built from the NO response and
+     * verifies that only the two durable subscription queues and the durable
+     * queue remain, with 12 delivery and metadata records and two bindings fewer
+     * than TOTAL_BINDINGS.
+     * NOTE(review): judging by the expectations here, NO appears to mean "drop
+     * the non-durable queue" - confirm against UpgradeFrom4To5.
+     *
+     * @throws Exception if the upgrade or any of the database reads fails
+     */
+    public void testPerformUpgradeWithHandlerAnsweringNo() throws Exception
+    {
+        UpgradeFrom4To5 upgrade = new UpgradeFrom4To5();
+        upgrade.performUpgrade(LOG_SUBJECT, _environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO));
+        assertQueues(new HashSet<String>(Arrays.asList(DURABLE_SUBSCRIPTION_QUEUE, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, DURABLE_QUEUE)));
+
+        assertDatabaseRecordCount(DELIVERY_DB_NAME, 12);
+        assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, 12);
+        assertDatabaseRecordCount(EXCHANGE_DB_NAME, TOTAL_EXCHANGES);
+
+        // 1 + 1 + 10 matches the 12 delivery/metadata records asserted above
+        assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE, 1);
+        assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, 1);
+        assertQueueMessages(DURABLE_QUEUE, 10);
+
+        final List<BindingRecord> queueBindings = loadBindings();
+
+        assertEquals("Unexpected list size", TOTAL_BINDINGS - 2, queueBindings.size());
+        assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", BDBStoreUpgradeTestPreparer.TOPIC_NAME,
+                "");
+        assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic",
+                BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'");
+        assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null);
+        assertContent();
+    }
+
+    /**
+     * Walks every entry of the bindings database and decodes its key into a
+     * binding record.
+     *
+     * @return a list holding a fresh copy of each decoded binding record
+     */
+    private List<BindingRecord> loadBindings()
+    {
+        final List<BindingRecord> loaded = new ArrayList<BindingRecord>();
+        final BindingTuple tupleBinding = new BindingTuple();
+
+        CursorOperation collector = new CursorOperation()
+        {
+            @Override
+            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+                    DatabaseEntry key, DatabaseEntry value)
+            {
+                // the binding is decoded from the key; the value is not consulted
+                final BindingRecord decoded = tupleBinding.entryToObject(key);
+                loaded.add(new BindingRecord(decoded.getExchangeName(), decoded.getQueueName(),
+                        decoded.getRoutingKey(), decoded.getArguments()));
+            }
+        };
+        new DatabaseTemplate(_environment, BINDING_DB_NAME, null).run(collector);
+
+        return loaded;
+    }
+
+    /**
+     * Finds the first binding for the given queue and exchange and checks its
+     * routing key and, when a selector is expected, its JMS selector argument.
+     *
+     * @param queueBindings bindings to search
+     * @param queueName name of the bound queue
+     * @param exchangeName name of the exchange the queue is bound to
+     * @param routingKey expected routing key
+     * @param selectorKey expected selector value, or null to skip the selector check
+     */
+    private void assertBindingRecord(List<BindingRecord> queueBindings, String queueName, String exchangeName,
+            String routingKey, String selectorKey)
+    {
+        BindingRecord match = null;
+        for (BindingRecord candidate : queueBindings)
+        {
+            final boolean sameQueue = candidate.getQueueName().asString().equals(queueName);
+            // the exchange is only inspected once the queue name has matched
+            if (sameQueue && candidate.getExchangeName().asString().equals(exchangeName))
+            {
+                match = candidate;
+                break;
+            }
+        }
+
+        assertNotNull("Binding is not found for queue " + queueName + " and exchange " + exchangeName, match);
+        assertEquals("Unexpected routing key", routingKey, match.getRoutingKey().asString());
+
+        if (selectorKey == null)
+        {
+            return;
+        }
+        assertEquals("Unexpected selector key for " + queueName, selectorKey,
+                match.getArguments().get(AMQPFilterTypes.JMS_SELECTOR.getValue()));
+    }
+
+    /**
+     * Checks that the delivery, metadata and content databases each hold the
+     * expected number of messages for the given queue.
+     *
+     * @param queueName queue to check
+     * @param expectedQueueSize number of messages expected on the queue
+     */
+    private void assertQueueMessages(final String queueName, final int expectedQueueSize)
+    {
+        // the ids gathered from the delivery db drive the metadata and content checks
+        final Set<Long> deliveredIds = assertDeliveriesForQueue(queueName, expectedQueueSize);
+        assertMetadataForQueue(queueName, expectedQueueSize, deliveredIds);
+        assertContentForQueue(queueName, expectedQueueSize, deliveredIds);
+    }
+
+    /**
+     * Scans the delivery database and checks that it holds the expected number
+     * of entries for the given queue.
+     *
+     * @param queueName queue whose delivery entries are counted
+     * @param expectedQueueSize number of delivery entries expected for the queue
+     * @return the message ids referenced by the queue's delivery entries
+     */
+    private Set<Long> assertDeliveriesForQueue(final String queueName, final int expectedQueueSize)
+    {
+        final QueueEntryKeyBinding queueEntryKeyBinding = new QueueEntryKeyBinding();
+        final AtomicInteger deliveryCounter = new AtomicInteger();
+        final Set<Long> messagesForQueue = new HashSet<Long>();
+
+        CursorOperation deliveryDatabaseOperation = new CursorOperation()
+        {
+            @Override
+            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+                    DatabaseEntry key, DatabaseEntry value)
+            {
+                // queue name and message id are both decoded from the entry key
+                QueueEntryKey entryKey = queueEntryKeyBinding.entryToObject(key);
+                String thisQueueName = entryKey.getQueueName().asString();
+                if (thisQueueName.equals(queueName))
+                {
+                    deliveryCounter.incrementAndGet();
+                    messagesForQueue.add(entryKey.getMessageId());
+                }
+            }
+        };
+        new DatabaseTemplate(_environment, DELIVERY_DB_NAME, null).run(deliveryDatabaseOperation);
+
+        assertEquals("Unexpected number of entries in delivery db for queue " + queueName, expectedQueueSize,
+                deliveryCounter.get());
+
+        return messagesForQueue;
+    }
+
+    /**
+     * Scans the message metadata database and checks how many of its entries
+     * are keyed by one of the given message ids.
+     *
+     * @param queueName queue name, used only in the failure message
+     * @param expectedQueueSize number of matching metadata entries expected
+     * @param messageIdsForQueue message ids belonging to the queue
+     */
+    private void assertMetadataForQueue(final String queueName, final int expectedQueueSize,
+            final Set<Long> messageIdsForQueue)
+    {
+        final AtomicInteger metadataCounter = new AtomicInteger();
+        CursorOperation databaseOperation = new CursorOperation()
+        {
+
+            @Override
+            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+                    DatabaseEntry key, DatabaseEntry value)
+            {
+                // the metadata key is read as a long message id
+                Long messageId = LongBinding.entryToLong(key);
+
+                boolean messageIsForTheRightQueue = messageIdsForQueue.contains(messageId);
+                if (messageIsForTheRightQueue)
+                {
+                    metadataCounter.incrementAndGet();
+                }
+            }
+        };
+        new DatabaseTemplate(_environment, MESSAGE_META_DATA_DB_NAME, null).run(databaseOperation);
+
+        assertEquals("Unexpected number of entries in metadata db for queue " + queueName, expectedQueueSize,
+                metadataCounter.get());
+    }
+
+    /**
+     * Scans the message content database and checks how many messages from the
+     * given id set have content stored.
+     *
+     * @param queueName queue name, used only in the failure message
+     * @param expectedQueueSize number of messages expected to have content
+     * @param messageIdsForQueue message ids belonging to the queue
+     */
+    private void assertContentForQueue(String queueName, int expectedQueueSize, final Set<Long> messageIdsForQueue)
+    {
+        final AtomicInteger contentCounter = new AtomicInteger();
+        final MessageContentKeyBinding keyBinding = new MessageContentKeyBinding();
+        CursorOperation cursorOperation = new CursorOperation()
+        {
+            private long _prevMsgId = -1;
+
+            @Override
+            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+                    DatabaseEntry key, DatabaseEntry value)
+            {
+                MessageContentKey contentKey = keyBinding.entryToObject(key);
+                long msgId = contentKey.getMessageId();
+
+                // consecutive entries with the same message id are counted once;
+                // assumes the cursor visits all entries of one message back to back - TODO confirm
+                if (_prevMsgId != msgId && messageIdsForQueue.contains(msgId))
+                {
+                    contentCounter.incrementAndGet();
+                }
+
+                _prevMsgId = msgId;
+            }
+        };
+        new DatabaseTemplate(_environment, MESSAGE_CONTENT_DB_NAME, null).run(cursorOperation);
+
+        assertEquals("Unexpected number of entries in content db for queue " + queueName, expectedQueueSize,
+                contentCounter.get());
+    }
+
+    /**
+     * Reads every record of the "queueDb_v5" database and checks that the set
+     * of queue names found equals the expected set.
+     *
+     * @param expectedQueueNames queue names the database must contain, no more and no fewer
+     */
+    private void assertQueues(Set<String> expectedQueueNames)
+    {
+        final Set<String> foundNames = new HashSet<String>();
+        // the record binding is given an empty list of durable subscription names
+        List<AMQShortString> noDurableSubNames = new ArrayList<AMQShortString>();
+        final UpgradeFrom4To5.QueueRecordBinding recordBinding = new UpgradeFrom4To5.QueueRecordBinding(noDurableSubNames);
+
+        CursorOperation nameCollector = new CursorOperation()
+        {
+            @Override
+            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+                    DatabaseEntry key, DatabaseEntry value)
+            {
+                final QueueRecord decoded = recordBinding.entryToObject(value);
+                foundNames.add(decoded.getNameShortString().asString());
+            }
+        };
+        new DatabaseTemplate(_environment, "queueDb_v5", null).run(nameCollector);
+
+        assertEquals("Unexpected queue names", expectedQueueNames, foundNames);
+    }
+
+    /**
+     * Walks the message content database and checks that every entry has a
+     * positive id at the start of its key and a value that decodes to a
+     * non-null buffer.
+     */
+    private void assertContent()
+    {
+        CursorOperation contentChecker = new CursorOperation()
+        {
+            private final UpgradeFrom4To5.ContentBinding _contentBinding = new UpgradeFrom4To5.ContentBinding();
+
+            @Override
+            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key,
+                    DatabaseEntry value)
+            {
+                final long messageId = LongBinding.entryToLong(key);
+                assertTrue("Unexpected id", messageId > 0);
+
+                final ByteBuffer decoded = _contentBinding.entryToObject(value);
+                assertNotNull("Unexpected content", decoded);
+            }
+        };
+        new DatabaseTemplate(_environment, MESSAGE_CONTENT_DB_NAME, null).run(contentChecker);
+    }
+}

Added: qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java?rev=1306244&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java Wed Mar 28 10:30:03 2012
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewDataBinding;
+
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Transaction;
+
+public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
+{
+    private static final Logger _logger = Logger.getLogger(UpgradeFrom5To6Test.class);
+
+    /**
+     * Supplies the store directory name for this test case: {@code "bdbstore-v5"}.
+     * NOTE(review): presumably resolved by AbstractUpgradeTestCase to a prepared
+     * version 5 store - confirm against the base class.
+     */
+    @Override
+    protected String getStoreDirectoryName()
+    {
+        return "bdbstore-v5";
+    }
+
+    /**
+     * Performs the 5 to 6 upgrade with the default interaction handler and
+     * checks the resulting record counts and message content.
+     *
+     * @throws Exception if the upgrade fails
+     */
+    public void testPerformUpgrade() throws Exception
+    {
+        new UpgradeFrom5To6().performUpgrade(LOG_SUBJECT, _environment, UpgradeInteractionHandler.DEFAULT_HANDLER);
+
+        assertDatabaseRecordCounts();
+        assertContent();
+    }
+
+    /**
+     * Corrupts one content chunk, upgrades with a handler built from the YES
+     * response and checks that the record counts are the same as for an
+     * uncorrupted store, i.e. the incomplete message is kept.
+     *
+     * @throws Exception if the upgrade fails
+     */
+    public void testPerformUpgradeWithMissingMessageChunkKeepsIncompleteMessage() throws Exception
+    {
+        corruptDatabase();
+
+        final UpgradeInteractionHandler keepMessageInteractionHandler = new StaticAnswerHandler(UpgradeInteractionResponse.YES);
+        new UpgradeFrom5To6().performUpgrade(LOG_SUBJECT, _environment, keepMessageInteractionHandler);
+
+        assertDatabaseRecordCounts();
+    }
+
+    /**
+     * Corrupts one content chunk, upgrades with a handler built from the NO
+     * response and checks that 11 metadata and 11 content records remain, i.e.
+     * the incomplete message is discarded.
+     *
+     * @throws Exception if the upgrade fails
+     */
+    public void testPerformUpgradeWithMissingMessageChunkDiscardsIncompleteMessage() throws Exception
+    {
+        corruptDatabase();
+
+        new UpgradeFrom5To6().performUpgrade(LOG_SUBJECT, _environment,
+                new StaticAnswerHandler(UpgradeInteractionResponse.NO));
+
+        final int remainingMessages = 11;
+        assertDatabaseRecordCount("MESSAGE_METADATA", remainingMessages);
+        assertDatabaseRecordCount("MESSAGE_CONTENT", remainingMessages);
+    }
+
+    /**
+     * Deliberately corrupts the "messageContentDb_v5" database: a visited entry
+     * is deleted and re-inserted under a key whose chunk offset is increased by
+     * two. This lets the tests exercise the upgrade logic that either preserves
+     * or discards incomplete messages.
+     * <p>
+     * The change is made inside its own transaction, which is committed on
+     * success and aborted if the corruption step throws, so that a failure here
+     * does not leave an open transaction behind.
+     */
+    private void corruptDatabase()
+    {
+        CursorOperation cursorOperation = new CursorOperation()
+        {
+
+            @Override
+            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+                    DatabaseEntry key, DatabaseEntry value)
+            {
+                CompoundKeyBinding binding = new CompoundKeyBinding();
+                CompoundKey originalCompoundKey = binding.entryToObject(key);
+                int corruptedOffset = originalCompoundKey.getOffset() + 2;
+                CompoundKey corruptedCompoundKey = new CompoundKey(originalCompoundKey.getMessageId(), corruptedOffset);
+                DatabaseEntry newKey = new DatabaseEntry();
+                binding.objectToEntry(corruptedCompoundKey, newKey);
+
+                _logger.info("Deliberately corrupted message id " + originalCompoundKey.getMessageId()
+                        + ", changed offset from " + originalCompoundKey.getOffset() + " to "
+                        + corruptedCompoundKey.getOffset());
+
+                // replace the entry: drop the original key and store the same value under the corrupted key
+                deleteCurrent();
+                sourceDatabase.put(transaction, newKey, value);
+
+                // NOTE(review): presumably stops the cursor so that only this one entry is corrupted - confirm in CursorOperation
+                abort();
+            }
+        };
+
+        Transaction transaction = _environment.beginTransaction(null, null);
+        try
+        {
+            new DatabaseTemplate(_environment, "messageContentDb_v5", transaction).run(cursorOperation);
+        }
+        catch (RuntimeException e)
+        {
+            // do not leave the transaction open when the corruption step fails
+            transaction.abort();
+            throw e;
+        }
+        transaction.commit();
+    }
+
+    /**
+     * Checks the record count of each upgraded database against the fixed
+     * numbers expected for the test store.
+     */
+    private void assertDatabaseRecordCounts()
+    {
+        // parallel arrays: expectedCounts[i] is the record count expected in databaseNames[i]
+        final String[] databaseNames =
+            { "EXCHANGES", "QUEUES", "QUEUE_BINDINGS", "DELIVERIES", "MESSAGE_METADATA", "MESSAGE_CONTENT" };
+        final int[] expectedCounts = { 5, 3, 6, 12, 12, 12 };
+
+        for (int i = 0; i < databaseNames.length; i++)
+        {
+            assertDatabaseRecordCount(databaseNames[i], expectedCounts[i]);
+        }
+    }
+
+    /**
+     * Walks the "MESSAGE_CONTENT" database and checks that every entry has a
+     * positive id as its key and a value that decodes to a non-null byte array.
+     */
+    private void assertContent()
+    {
+        CursorOperation contentChecker = new CursorOperation()
+        {
+            private final NewDataBinding _contentBinding = new NewDataBinding();
+
+            @Override
+            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key,
+                    DatabaseEntry value)
+            {
+                final long messageId = LongBinding.entryToLong(key);
+                assertTrue("Unexpected id", messageId > 0);
+
+                final byte[] decoded = _contentBinding.entryToObject(value);
+                assertNotNull("Unexpected content", decoded);
+            }
+        };
+        new DatabaseTemplate(_environment, "MESSAGE_CONTENT", null).run(contentChecker);
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org