You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/04/05 00:10:25 UTC

svn commit: r1584926 [2/3] - in /qpid/branches/java-broker-config-store-changes/qpid/java: ./ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ bdbstore/src/main/java/o...

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Fri Apr  4 22:10:24 2014
@@ -49,7 +49,10 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonParseException;
@@ -221,19 +224,125 @@ abstract public class AbstractJDBCMessag
     }
 
     @Override
-    public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+    public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
     {
         checkConfigurationStoreOpen();
 
         try
         {
-            recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
-            loadConfiguredObjects(recoveryHandler);
-            setConfigVersion(recoveryHandler.completeConfigurationRecovery());
+            int configVersion = getConfigVersion();
+
+            handler.begin(configVersion);
+            doVisitAllConfiguredObjectRecords(handler);
+
+            int newConfigVersion = handler.end();
+            if(newConfigVersion != configVersion)
+            {
+                setConfigVersion(newConfigVersion);
+            }
         }
         catch (SQLException e)
         {
-            throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+            throw new StoreException("Cannot visit configured object records", e);
+        }
+
+    }
+
+    private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException
+    {
+        Connection conn = newAutoCommitConnection();
+        Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
+        final ObjectMapper objectMapper = new ObjectMapper();
+        try
+        {
+            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+            try
+            {
+                ResultSet rs = stmt.executeQuery();
+                try
+                {
+                    while (rs.next())
+                    {
+                        String id = rs.getString(1);
+                        String objectType = rs.getString(2);
+                        String attributes = getBlobAsString(rs, 3);
+                        final ConfiguredObjectRecordImpl configuredObjectRecord =
+                                new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
+                                                               objectMapper.readValue(attributes, Map.class));
+                        configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
+
+                    }
+                }
+                catch (JsonMappingException e)
+                {
+                    throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+                }
+                catch (JsonParseException e)
+                {
+                    throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+                }
+                catch (IOException e)
+                {
+                    throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+            stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
+            try
+            {
+                ResultSet rs = stmt.executeQuery();
+                try
+                {
+                    while (rs.next())
+                    {
+                        UUID childId = UUID.fromString(rs.getString(1));
+                        String parentType = rs.getString(2);
+                        UUID parentId = UUID.fromString(rs.getString(3));
+
+                        ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
+                        ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
+
+                        if(child != null && parent != null)
+                        {
+                            child.addParent(parentType, parent);
+                        }
+                        else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange"))
+                        {
+                            // TODO - remove this hack for amq. exchanges
+                            child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap()));
+                        }
+                    }
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+        }
+        finally
+        {
+            conn.close();
+        }
+
+        for(ConfiguredObjectRecord record : configuredObjects.values())
+        {
+            boolean shoudlContinue = handler.handle(record);
+            if (!shoudlContinue)
+            {
+                break;
+            }
         }
     }
 
@@ -282,44 +391,25 @@ abstract public class AbstractJDBCMessag
             {
                 createOrOpenMessageStoreDatabase();
                 upgradeIfNecessary(parent);
-            }
-            catch (SQLException e)
-            {
-                throw new StoreException("Unable to activate message store ", e);
-            }
-        }
-    }
 
-    @Override
-    public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
-    {
-        checkMessageStoreOpen();
-
-        if(messageRecoveryHandler != null)
-        {
-            try
-            {
-                recoverMessages(messageRecoveryHandler);
-            }
-            catch (SQLException e)
-            {
-                throw new StoreException("Error encountered when restoring message data from " +
-                                                       "persistent store ", e);
-            }
-        }
-        if(transactionLogRecoveryHandler != null)
-        {
-            try
-            {
-                TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(transactionLogRecoveryHandler);
-                recoverXids(dtxrh);
+                visitMessages(new MessageHandler()
+                {
+                    @Override
+                    public boolean handle(StoredMessage<?> storedMessage)
+                    {
+                        long id = storedMessage.getMessageNumber();
+                        if (_messageId.get() < id)
+                        {
+                            _messageId.set(id);
+                        }
+                        return true;
+                    }
+                });
             }
             catch (SQLException e)
             {
-                throw new StoreException("Error encountered when restoring distributed transaction " +
-                                                       "data from persistent store ", e);
+                throw new StoreException("Unable to activate message store ", e);
             }
-
         }
     }
 
@@ -1043,11 +1133,9 @@ abstract public class AbstractJDBCMessag
                 getLogger().debug("Enqueuing message "
                                    + messageId
                                    + " on queue "
-                                   + (queue instanceof AMQQueue
-                                      ? ((AMQQueue) queue).getName()
-                                      : "")
-                                   + queue.getId()
-                                   + "[Connection"
+                                   + queue.getName()
+                                   + " with id " + queue.getId()
+                                   + " [Connection"
                                    + conn
                                    + "]");
             }
@@ -1068,7 +1156,7 @@ abstract public class AbstractJDBCMessag
         catch (SQLException e)
         {
             getLogger().error("Failed to enqueue: " + e.getMessage(), e);
-            throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId()
+            throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId()
                 + " to database", e);
         }
 
@@ -1093,15 +1181,13 @@ abstract public class AbstractJDBCMessag
 
                 if(results != 1)
                 {
-                    throw new StoreException("Unable to find message with id " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+                    throw new StoreException("Unable to find message with id " + messageId + " on queue " + queue.getName()
                            + " with id " + queue.getId());
                 }
 
                 if (getLogger().isDebugEnabled())
                 {
-                    getLogger().debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue
-                                                                                          ? ((AMQQueue) queue).getName()
-                                                                                          : "")
+                    getLogger().debug("Dequeuing message " + messageId + " on queue " + queue.getName()
                                        + " with id " + queue.getId());
                 }
             }
@@ -1114,7 +1200,7 @@ abstract public class AbstractJDBCMessag
         catch (SQLException e)
         {
             getLogger().error("Failed to dequeue: " + e.getMessage(), e);
-            throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+            throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + queue.getName()
                     + " with id " + queue.getId() + " from database", e);
         }
 
@@ -1363,131 +1449,6 @@ abstract public class AbstractJDBCMessag
 
     }
 
-    private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
-    {
-        Connection conn = newAutoCommitConnection();
-        try
-        {
-            MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin();
-
-            Statement stmt = conn.createStatement();
-            try
-            {
-                ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
-                try
-                {
-
-                    long maxId = 0;
-
-                    while(rs.next())
-                    {
-
-                        long messageId = rs.getLong(1);
-                        if(messageId > maxId)
-                        {
-                            maxId = messageId;
-                        }
-
-                        byte[] dataAsBytes = getBlobAsBytes(rs, 2);
-
-                        ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
-                        buf.position(1);
-                        buf = buf.slice();
-                        MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
-                        StorableMessageMetaData metaData = type.createMetaData(buf);
-                        StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
-                        messageHandler.message(message);
-                    }
-
-                    _messageId.set(maxId);
-
-                    messageHandler.completeMessageRecovery();
-                }
-                finally
-                {
-                    rs.close();
-                }
-            }
-            finally
-            {
-                stmt.close();
-            }
-        }
-        finally
-        {
-            conn.close();
-        }
-    }
-
-
-    private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
-    {
-        Connection conn = newAutoCommitConnection();
-        try
-        {
-            TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this);
-
-            Statement stmt = conn.createStatement();
-            try
-            {
-                ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
-                try
-                {
-                    while(rs.next())
-                    {
-
-                        String id = rs.getString(1);
-                        long messageId = rs.getLong(2);
-                        queueEntryHandler.queueEntry(UUID.fromString(id), messageId);
-                    }
-                }
-                finally
-                {
-                    rs.close();
-                }
-            }
-            finally
-            {
-                stmt.close();
-            }
-
-            return queueEntryHandler.completeQueueEntryRecovery();
-        }
-        finally
-        {
-            conn.close();
-        }
-    }
-
-    private static final class Xid
-    {
-
-        private final long _format;
-        private final byte[] _globalId;
-        private final byte[] _branchId;
-
-        public Xid(long format, byte[] globalId, byte[] branchId)
-        {
-            _format = format;
-            _globalId = globalId;
-            _branchId = branchId;
-        }
-
-        public long getFormat()
-        {
-            return _format;
-        }
-
-        public byte[] getGlobalId()
-        {
-            return _globalId;
-        }
-
-        public byte[] getBranchId()
-        {
-            return _branchId;
-        }
-    }
 
     private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage
     {
@@ -1550,93 +1511,6 @@ abstract public class AbstractJDBCMessag
         }
     }
 
-    private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
-    {
-        Connection conn = newAutoCommitConnection();
-        try
-        {
-            List<Xid> xids = new ArrayList<Xid>();
-
-            Statement stmt = conn.createStatement();
-            try
-            {
-                ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
-                try
-                {
-                    while(rs.next())
-                    {
-
-                        long format = rs.getLong(1);
-                        byte[] globalId = rs.getBytes(2);
-                        byte[] branchId = rs.getBytes(3);
-                        xids.add(new Xid(format, globalId, branchId));
-                    }
-                }
-                finally
-                {
-                    rs.close();
-                }
-            }
-            finally
-            {
-                stmt.close();
-            }
-
-
-
-            for(Xid xid : xids)
-            {
-                List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
-                List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
-
-                PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
-
-                try
-                {
-                    pstmt.setLong(1, xid.getFormat());
-                    pstmt.setBytes(2, xid.getGlobalId());
-                    pstmt.setBytes(3, xid.getBranchId());
-
-                    ResultSet rs = pstmt.executeQuery();
-                    try
-                    {
-                        while(rs.next())
-                        {
-
-                            String actionType = rs.getString(1);
-                            UUID queueId = UUID.fromString(rs.getString(2));
-                            long messageId = rs.getLong(3);
-
-                            RecordImpl record = new RecordImpl(queueId, messageId);
-                            List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
-                            records.add(record);
-                        }
-                    }
-                    finally
-                    {
-                        rs.close();
-                    }
-                }
-                finally
-                {
-                    pstmt.close();
-                }
-
-                dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
-                                enqueues.toArray(new RecordImpl[enqueues.size()]),
-                                dequeues.toArray(new RecordImpl[dequeues.size()]));
-            }
-
-
-            dtxrh.completeDtxRecordRecovery();
-        }
-        finally
-        {
-            conn.close();
-        }
-
-    }
-
     private StorableMessageMetaData getMetaData(long messageId) throws SQLException
     {
 
@@ -2357,43 +2231,81 @@ abstract public class AbstractJDBCMessag
         }
     }
 
-    private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException,
-                                                                                            StoreException
+    @Override
+    public void visitMessages(MessageHandler handler) throws StoreException
     {
-        Connection conn = newAutoCommitConnection();
-        Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
-        final ObjectMapper objectMapper = new ObjectMapper();
+        checkMessageStoreOpen();
+
+        Connection conn = null;
         try
         {
-            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+            conn = newAutoCommitConnection();
+            Statement stmt = conn.createStatement();
             try
             {
-                ResultSet rs = stmt.executeQuery();
+                ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
                 try
                 {
                     while (rs.next())
                     {
-                        String id = rs.getString(1);
-                        String objectType = rs.getString(2);
-                        String attributes = getBlobAsString(rs, 3);
-                        final ConfiguredObjectRecordImpl configuredObjectRecord =
-                                new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
-                                                               objectMapper.readValue(attributes, Map.class));
-                        configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
-
+                        long messageId = rs.getLong(1);
+                        byte[] dataAsBytes = getBlobAsBytes(rs, 2);
+                        ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+                        buf.position(1);
+                        buf = buf.slice();
+                        MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+                        StorableMessageMetaData metaData = type.createMetaData(buf);
+                        StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
+                        if (!handler.handle(message))
+                        {
+                            break;
+                        }
                     }
                 }
-                catch (JsonMappingException e)
-                {
-                    throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
-                }
-                catch (JsonParseException e)
+                finally
                 {
-                    throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+                    rs.close();
                 }
-                catch (IOException e)
+            }
+            finally
+            {
+                stmt.close();
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new StoreException("Error encountered when visiting messages", e);
+        }
+        finally
+        {
+            closeConnection(conn);
+        }
+    }
+
+    @Override
+    public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+    {
+        checkMessageStoreOpen();
+
+        Connection conn = null;
+        try
+        {
+            conn = newAutoCommitConnection();
+            Statement stmt = conn.createStatement();
+            try
+            {
+                ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+                try
                 {
-                    throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+                    while(rs.next())
+                    {
+                        String id = rs.getString(1);
+                        long messageId = rs.getLong(2);
+                        if (!handler.handle(UUID.fromString(id), messageId))
+                        {
+                            break;
+                        }
+                    }
                 }
                 finally
                 {
@@ -2404,31 +2316,41 @@ abstract public class AbstractJDBCMessag
             {
                 stmt.close();
             }
-            stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
+        }
+        catch(SQLException e)
+        {
+            throw new StoreException("Error encountered when visiting message instances", e);
+        }
+        finally
+        {
+            closeConnection(conn);
+        }
+    }
+
+    @Override
+    public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+    {
+        checkMessageStoreOpen();
+
+        Connection conn = null;
+        try
+        {
+            conn = newAutoCommitConnection();
+            List<Xid> xids = new ArrayList<Xid>();
+
+            Statement stmt = conn.createStatement();
             try
             {
-                ResultSet rs = stmt.executeQuery();
+                ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
                 try
                 {
-                    while (rs.next())
+                    while(rs.next())
                     {
-                        UUID childId = UUID.fromString(rs.getString(1));
-                        String parentType = rs.getString(2);
-                        UUID parentId = UUID.fromString(rs.getString(3));
-
-                        ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
-                        ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
-
-                        if(child != null && parent != null)
-                        {
-                            child.addParent(parentType, parent);
-                        }
-                        else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange"))
-                        {
-                            // TODO - remove this hack for amq. exchanges
-                            child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap()));
-                        }
 
+                        long format = rs.getLong(1);
+                        byte[] globalId = rs.getBytes(2);
+                        byte[] branchId = rs.getBytes(3);
+                        xids.add(new Xid(format, globalId, branchId));
                     }
                 }
                 finally
@@ -2441,18 +2363,67 @@ abstract public class AbstractJDBCMessag
                 stmt.close();
             }
 
+
+
+            for(Xid xid : xids)
+            {
+                List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
+                List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
+
+                PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
+
+                try
+                {
+                    pstmt.setLong(1, xid.getFormat());
+                    pstmt.setBytes(2, xid.getGlobalId());
+                    pstmt.setBytes(3, xid.getBranchId());
+
+                    ResultSet rs = pstmt.executeQuery();
+                    try
+                    {
+                        while(rs.next())
+                        {
+
+                            String actionType = rs.getString(1);
+                            UUID queueId = UUID.fromString(rs.getString(2));
+                            long messageId = rs.getLong(3);
+
+                            RecordImpl record = new RecordImpl(queueId, messageId);
+                            List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
+                            records.add(record);
+                        }
+                    }
+                    finally
+                    {
+                        rs.close();
+                    }
+                }
+                finally
+                {
+                    pstmt.close();
+                }
+
+                if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
+                                enqueues.toArray(new RecordImpl[enqueues.size()]),
+                                dequeues.toArray(new RecordImpl[dequeues.size()])))
+                {
+                    break;
+                }
+            }
+
         }
-        finally
+        catch (SQLException e)
         {
-            conn.close();
-        }
+            throw new StoreException("Error encountered when visiting distributed transactions", e);
 
-        for(ConfiguredObjectRecord record : configuredObjects.values())
+        }
+        finally
         {
-            recoveryHandler.configuredObject(record);
+            closeConnection(conn);
         }
     }
 
+
     protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
 
     protected abstract void storedSizeChange(int storeSizeIncrease);

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java Fri Apr  4 22:10:24 2014
@@ -20,17 +20,36 @@
  */
 package org.apache.qpid.server.store;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 
 /** A simple message store that stores the messages in a thread-safe structure in memory. */
-abstract public class AbstractMemoryMessageStore extends NullMessageStore
+abstract class AbstractMemoryMessageStore implements MessageStore, DurableConfigurationStore
 {
-    private final AtomicLong _messageId = new AtomicLong(1);
-
-    private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
+    private final class MemoryMessageStoreTransaction implements Transaction
     {
+        private Map<UUID, Set<Long>> _localEnqueueMap = new HashMap<UUID, Set<Long>>();
+        private Map<UUID, Set<Long>> _localDequeueMap = new HashMap<UUID, Set<Long>>();
+
+        private Map<Xid, DistributedTransactionRecords> _localDistributedTransactionsRecords = new HashMap<Xid, DistributedTransactionRecords>();
+        private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>();
+
         @Override
         public StoreFuture commitTranAsync()
         {
@@ -40,50 +59,145 @@ abstract public class AbstractMemoryMess
         @Override
         public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
         {
+            Set<Long> messageIds = _localEnqueueMap.get(queue.getId());
+            if (messageIds == null)
+            {
+                messageIds = new HashSet<Long>();
+                _localEnqueueMap.put(queue.getId(), messageIds);
+            }
+            messageIds.add(message.getMessageNumber());
         }
 
         @Override
-        public void dequeueMessage(TransactionLogResource  queue, EnqueueableMessage message)
+        public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
         {
+            Set<Long> messageIds = _localDequeueMap.get(queue.getId());
+            if (messageIds == null)
+            {
+                messageIds = new HashSet<Long>();
+                _localDequeueMap.put(queue.getId(), messageIds);
+            }
+            messageIds.add(message.getMessageNumber());
         }
 
         @Override
         public void commitTran()
         {
+            commitTransactionInternal(this);
+            _localEnqueueMap.clear();
+            _localDequeueMap.clear();
         }
 
         @Override
         public void abortTran()
         {
+            _localEnqueueMap.clear();
+            _localDequeueMap.clear();
         }
 
         @Override
         public void removeXid(long format, byte[] globalId, byte[] branchId)
         {
+            _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId));
         }
 
         @Override
         public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
         {
+            _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues));
         }
-    };
+    }
+
+    private final AtomicLong _messageId = new AtomicLong(1);
 
-    private final EventManager _eventManager = new EventManager();
+    private final ConcurrentHashMap<UUID, ConfiguredObjectRecord> _configuredObjectRecords = new ConcurrentHashMap<UUID, ConfiguredObjectRecord>();
 
+    protected ConcurrentHashMap<Long, StoredMemoryMessage> _messages = new ConcurrentHashMap<Long, StoredMemoryMessage>();
 
+    private Object _transactionLock = new Object();
+    private Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>();
+    private Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>();
+
+    /**
+     * Creates an in-memory stored message with the next message number.
+     * <p>
+     * For persistent meta data the returned message registers itself in _messages when it is
+     * flushed to the store and unregisters itself when it is removed; a message with
+     * non-persistent meta data is never registered.
+     *
+     * @param metaData meta data of the message to store
+     * @return the newly created stored message
+     */
+    @SuppressWarnings("unchecked")
     @Override
-    public StoredMessage addMessage(StorableMessageMetaData metaData)
+    public StoredMessage<StorableMessageMetaData> addMessage(final StorableMessageMetaData metaData)
+    {
+        final long messageNumber = _messageId.getAndIncrement();
+
+        if (!metaData.isPersistent())
+        {
+            return new StoredMemoryMessage(messageNumber, metaData);
+        }
+
+        return new StoredMemoryMessage(messageNumber, metaData)
+        {
+            @Override
+            public StoreFuture flushToStore()
+            {
+                // putIfAbsent: repeated flushes keep the first registration
+                _messages.putIfAbsent(getMessageNumber(), this);
+                return super.flushToStore();
+            }
+
+            @Override
+            public void remove()
+            {
+                _messages.remove(getMessageNumber());
+                super.remove();
+            }
+        };
+    }
+
+    /**
+     * Merges the work recorded by the given transaction into the store-wide state while holding
+     * _transactionLock. In order: enqueued message ids are added to _messageInstances, dequeued
+     * ids are removed from it (a queue's entry is dropped once it has no ids left), recorded XIDs
+     * are put into _distributedTransactions and removed XIDs are deleted from it.
+     *
+     * @param transaction the transaction whose local state is applied; it is only read here
+     */
+    private void commitTransactionInternal(MemoryMessageStoreTransaction transaction)
     {
-        final long id = _messageId.getAndIncrement();
-        StoredMemoryMessage message = new StoredMemoryMessage(id, metaData);
+        synchronized (_transactionLock)
+        {
+            for (Map.Entry<UUID, Set<Long>> enqueued : transaction._localEnqueueMap.entrySet())
+            {
+                final UUID queueId = enqueued.getKey();
+                Set<Long> instances = _messageInstances.get(queueId);
+                if (instances == null)
+                {
+                    instances = new HashSet<Long>();
+                    _messageInstances.put(queueId, instances);
+                }
+                instances.addAll(enqueued.getValue());
+            }
+
+            for (Map.Entry<UUID, Set<Long>> dequeued : transaction._localDequeueMap.entrySet())
+            {
+                final UUID queueId = dequeued.getKey();
+                final Set<Long> instances = _messageInstances.get(queueId);
+                if (instances == null)
+                {
+                    // nothing is enqueued on this queue, so there is nothing to take away
+                    continue;
+                }
+                instances.removeAll(dequeued.getValue());
+                if (instances.isEmpty())
+                {
+                    _messageInstances.remove(queueId);
+                }
+            }
+
+            _distributedTransactions.putAll(transaction._localDistributedTransactionsRecords);
+
+            for (Xid removedXid : transaction._localDistributedTransactionsRemoves)
+            {
+                _distributedTransactions.remove(removedXid);
+            }
+        }
 
-        return message;
     }
 
+    /**
+     * Creates a separate MemoryMessageStoreTransaction for every call (previously a single shared
+     * no-op transaction instance was returned).
+     */
     @Override
     public Transaction newTransaction()
     {
-        return IN_MEMORY_TRANSACTION;
+        return new MemoryMessageStoreTransaction();
     }
 
     @Override
@@ -95,7 +209,164 @@ abstract public class AbstractMemoryMess
     @Override
     public void addEventListener(EventListener eventListener, Event... events)
     {
-        _eventManager.addEventListener(eventListener, events);
+        // Empty after this change: the listener is not retained anywhere, so this store never
+        // notifies it. NOTE(review): confirm that no caller relies on events from this store.
     }
 
+    /**
+     * Adds the record to the in-memory configuration, keyed by its id.
+     *
+     * @param record the record to add
+     * @throws StoreException if a record with the same id is already held
+     */
+    @Override
+    public void create(ConfiguredObjectRecord record)
+    {
+        final UUID id = record.getId();
+        final ConfiguredObjectRecord existing = _configuredObjectRecords.putIfAbsent(id, record);
+        if (existing != null)
+        {
+            throw new StoreException("Record with id " + id + " is already present");
+        }
+    }
+
+    /**
+     * Stores new versions of the given records, in the order given.
+     * <p>
+     * When createIfNecessary is true each record is stored unconditionally, so a record that is not
+     * yet held is created. Otherwise a record only replaces an existing entry with the same id and a
+     * missing entry is an error; records processed before the failing one stay updated.
+     *
+     * @param createIfNecessary whether records that are not yet held should be created
+     * @param records the records to store
+     * @throws StoreException if createIfNecessary is false and a record with the given id is not held
+     */
+    @Override
+    public void update(boolean createIfNecessary, ConfiguredObjectRecord... records)
+    {
+        for (ConfiguredObjectRecord record : records)
+        {
+            if (createIfNecessary)
+            {
+                // ConcurrentHashMap.replace() does nothing for an absent key, which silently
+                // dropped new records; put() both creates and overwrites.
+                _configuredObjectRecords.put(record.getId(), record);
+            }
+            else if (_configuredObjectRecords.replace(record.getId(), record) == null)
+            {
+                throw new StoreException("Record with id " + record.getId() + " does not exist");
+            }
+        }
+    }
+
+    /**
+     * Removes the given records from the in-memory configuration.
+     *
+     * @param objects the records to remove
+     * @return the ids of those records that were actually held (and are now removed), in argument order
+     */
+    @Override
+    public UUID[] remove(final ConfiguredObjectRecord... objects)
+    {
+        final List<UUID> removedIds = new ArrayList<UUID>();
+        for (ConfiguredObjectRecord candidate : objects)
+        {
+            final UUID id = candidate.getId();
+            final boolean wasHeld = _configuredObjectRecords.remove(id) != null;
+            if (wasHeld)
+            {
+                removedIds.add(id);
+            }
+        }
+        final UUID[] result = new UUID[removedIds.size()];
+        return removedIds.toArray(result);
+    }
+
+    /** Closes the configuration store by discarding every configured object record held in memory. */
+    @Override
+    public void closeConfigurationStore()
+    {
+        _configuredObjectRecords.clear();
+    }
+
+    /** Does nothing: neither the parent nor the store settings are used by this implementation. */
+    @Override
+    public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+    {
+    }
+
+    /**
+     * Presents the held configured object records to the handler: begin() is called with
+     * VirtualHost.CURRENT_CONFIG_VERSION, then handle() once per record until it returns false or
+     * the records are exhausted, and finally end(), whose return value is ignored here.
+     *
+     * @param handler the handler to invoke
+     */
+    @Override
+    public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
+    {
+        handler.begin(VirtualHost.CURRENT_CONFIG_VERSION);
+        for (ConfiguredObjectRecord current : _configuredObjectRecords.values())
+        {
+            final boolean keepGoing = handler.handle(current);
+            if (!keepGoing)
+            {
+                break;
+            }
+        }
+        handler.end();
+    }
+
+    /** Does nothing: neither the parent nor the message store settings are used by this implementation. */
+    @Override
+    public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
+    {
+    }
+
+    /**
+     * Discards all in-memory message state: the stored messages and, under _transactionLock, the
+     * message instances and the distributed transaction records.
+     */
+    @Override
+    public void closeMessageStore()
+    {
+        _messages.clear();
+        // the two plain HashMaps below are only ever touched while holding _transactionLock
+        synchronized (_transactionLock)
+        {
+            _messageInstances.clear();
+            _distributedTransactions.clear();
+        }
+    }
+
+    /** Always returns null; this implementation reports no store location. */
+    @Override
+    public String getStoreLocation()
+    {
+        return null;
+    }
+
+    /** Does nothing in this implementation. */
+    @Override
+    public void onDelete()
+    {
+    }
+
+    /**
+     * Presents each message registered in _messages to the handler, stopping as soon as the
+     * handler returns false.
+     *
+     * @param handler the handler to invoke per stored message
+     */
+    @Override
+    public void visitMessages(MessageHandler handler) throws StoreException
+    {
+        for (StoredMemoryMessage stored : _messages.values())
+        {
+            final boolean keepGoing = handler.handle(stored);
+            if (!keepGoing)
+            {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Presents every (resource id, message id) pair held in _messageInstances to the handler while
+     * holding _transactionLock, stopping as soon as the handler returns false.
+     *
+     * @param handler the handler to invoke per message instance
+     */
+    @Override
+    public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+    {
+        synchronized (_transactionLock)
+        {
+            for (Map.Entry<UUID, Set<Long>> perResource : _messageInstances.entrySet())
+            {
+                final UUID resourceId = perResource.getKey();
+                for (Long messageNumber : perResource.getValue())
+                {
+                    final boolean keepGoing = handler.handle(resourceId, messageNumber);
+                    if (!keepGoing)
+                    {
+                        // ends the whole visit, not just the current resource
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Presents every distributed transaction held in _distributedTransactions to the handler while
+     * holding _transactionLock, stopping as soon as the handler returns false.
+     *
+     * @param handler the handler to invoke with the Xid parts and the enqueue/dequeue records
+     */
+    @Override
+    public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+    {
+        synchronized (_transactionLock)
+        {
+            for (Map.Entry<Xid, DistributedTransactionRecords> dtx : _distributedTransactions.entrySet())
+            {
+                final Xid id = dtx.getKey();
+                final DistributedTransactionRecords work = dtx.getValue();
+                final boolean keepGoing = handler.handle(id.getFormat(), id.getGlobalId(), id.getBranchId(), work.getEnqueues(), work.getDequeues());
+                if (!keepGoing)
+                {
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
+     * Holder pairing the enqueue and dequeue records of one distributed transaction.
+     * The arrays are stored and returned as given, without copying.
+     */
+    private static final class DistributedTransactionRecords
+    {
+        // assigned once in the constructor and never changed afterwards
+        private final Record[] _enqueues;
+        private final Record[] _dequeues;
+
+        /**
+         * @param enqueues the enqueue records of the transaction
+         * @param dequeues the dequeue records of the transaction
+         */
+        public DistributedTransactionRecords(Record[] enqueues, Record[] dequeues)
+        {
+            _enqueues = enqueues;
+            _dequeues = dequeues;
+        }
+
+        /** @return the enqueue records passed to the constructor */
+        public Record[] getEnqueues()
+        {
+            return _enqueues;
+        }
+
+        /** @return the dequeue records passed to the constructor */
+        public Record[] getDequeues()
+        {
+            return _dequeues;
+        }
+    }
 }

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Fri Apr  4 22:10:24 2014
@@ -20,11 +20,12 @@
  */
 package org.apache.qpid.server.store;
 
-import org.apache.qpid.server.model.ConfiguredObject;
-
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+
 public interface DurableConfigurationStore
 {
     String STORE_TYPE                    = "storeType";
@@ -47,12 +48,6 @@ public interface DurableConfigurationSto
     void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) throws StoreException;
 
     /**
-     * Recovers configuration from the store using given recovery handler
-     * @param recoveryHandler recovery handler
-     */
-    void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) throws StoreException;
-
-    /**
      * Makes the specified object persistent.
      *
      * @param object The object to persist.
@@ -85,4 +80,11 @@ public interface DurableConfigurationSto
 
     void closeConfigurationStore() throws StoreException;
 
+    /**
+     * Visits the configured object records held by this store with the given handler.
+     * <p>
+     * NOTE(review): the implementations changed alongside this interface call
+     * {@code handler.begin(version)}, then {@code handler.handle(record)} per record until it
+     * returns {@code false}, then {@code handler.end()} - confirm this is the intended contract
+     * for every implementation.
+     *
+     * @param handler a handler to invoke on each configured object record
+     * @throws StoreException presumably when the records cannot be read from the store - TODO confirm
+     */
+    void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException;
 }

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java Fri Apr  4 22:10:24 2014
@@ -31,6 +31,7 @@ import java.util.*;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Model;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonProcessingException;
@@ -97,22 +98,27 @@ public class JsonFileConfigStore impleme
     }
 
+    /**
+     * Presents the records of this store to the handler: begin() is called with the current
+     * _configVersion, then handle() per record until it returns false, then end(). The version
+     * returned by end() becomes the new _configVersion and, if it differs from the old one, the
+     * store is saved.
+     */
     @Override
-    public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+    public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
     {
-        recoveryHandler.beginConfigurationRecovery(this,_configVersion);
+        handler.begin(_configVersion);
+        // iterate over a copy of the values; presumably so that the handler may change the store
+        // while it is being visited - TODO confirm
         List<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>(_objectsById.values());
         for(ConfiguredObjectRecord record : records)
         {
-            recoveryHandler.configuredObject(record);
+            boolean shouldContinue = handler.handle(record);
+            if (!shouldContinue)
+            {
+                break;
+            }
         }
         int oldConfigVersion = _configVersion;
-        _configVersion = recoveryHandler.completeConfigurationRecovery();
+        _configVersion = handler.end();
         if(oldConfigVersion != _configVersion)
         {
             save();
         }
     }
 
+
     private void setup(final Map<String, Object> configurationStoreSettings)
     {
         Object storePathAttr = configurationStoreSettings.get(DurableConfigurationStore.STORE_PATH);

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java Fri Apr  4 22:10:24 2014
@@ -23,6 +23,9 @@ package org.apache.qpid.server.store;
 import java.util.Map;
 
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 
 /**
  * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
@@ -43,13 +46,6 @@ public interface MessageStore
      */
     void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings);
 
-    /**
-     * Called after opening to recover messages and transactions with given recovery handlers
-     * @param messageRecoveryHandler
-     * @param transactionLogRecoveryHandler
-     */
-    void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler);
-
     public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
 
 
@@ -71,8 +67,10 @@ public interface MessageStore
 
     String getStoreLocation();
 
-    // TODO dead method - remove??
-    String getStoreType();
-
     void onDelete();
+
+    /**
+     * Visits the messages held by this store with the given handler.
+     * NOTE(review): the in-memory implementation stops as soon as the handler returns false -
+     * confirm this is required of every implementation.
+     *
+     * @param handler the handler to invoke per stored message
+     */
+    void visitMessages(MessageHandler handler) throws StoreException;
+    /**
+     * Visits the message instances (pairs of resource id and message id) held by this store with
+     * the given handler.
+     * NOTE(review): the in-memory implementation stops as soon as the handler returns false -
+     * confirm this is required of every implementation.
+     *
+     * @param handler the handler to invoke per message instance
+     */
+    void visitMessageInstances(MessageInstanceHandler handler) throws StoreException;
+    /**
+     * Visits the distributed transactions held by this store with the given handler.
+     * NOTE(review): the in-memory implementation stops as soon as the handler returns false -
+     * confirm this is required of every implementation.
+     *
+     * @param handler the handler to invoke per distributed transaction
+     */
+    void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException;
+
 }

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java Fri Apr  4 22:10:24 2014
@@ -23,6 +23,10 @@ import java.util.Map;
 import java.util.UUID;
 
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 
 public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
 {
@@ -33,11 +37,6 @@ public abstract class NullMessageStore i
     }
 
     @Override
-    public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
-    {
-    }
-
-    @Override
     public void update(boolean createIfNecessary, ConfiguredObjectRecord... records)
     {
     }
@@ -92,11 +91,6 @@ public abstract class NullMessageStore i
     }
 
     @Override
-    public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
-    {
-    }
-
-    @Override
     public void addEventListener(EventListener eventListener, Event... events)
     {
     }
@@ -112,4 +106,24 @@ public abstract class NullMessageStore i
     {
     }
 
+    /** Does nothing: the handler is never invoked, not even its begin() or end(). */
+    @Override
+    public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
+    {
+    }
+
+    /** Does nothing: the handler is never invoked. */
+    @Override
+    public void visitMessages(MessageHandler handler) throws StoreException
+    {
+    }
+
+    /** Does nothing: the handler is never invoked. */
+    @Override
+    public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+    {
+    }
+
+    /** Does nothing: the handler is never invoked. */
+    @Override
+    public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+    {
+    }
+
 }

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java Fri Apr  4 22:10:24 2014
@@ -78,6 +78,10 @@ public class DefaultUpgraderProvider imp
 
     public DurableConfigurationStoreUpgrader getUpgrader(final int configVersion, DurableConfigurationRecoverer recoverer)
     {
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Getting upgrader for configVersion:  " + configVersion);
+        }
         DurableConfigurationStoreUpgrader currentUpgrader = null;
         switch(configVersion)
         {

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java Fri Apr  4 22:10:24 2014
@@ -29,11 +29,11 @@ import org.apache.qpid.server.logging.su
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.plugin.MessageStoreFactory;
 import org.apache.qpid.server.stats.StatisticsGatherer;
-
-import org.apache.qpid.server.store.DurableConfigurationRecoverer;
+import org.apache.qpid.server.store.ConfiguredObjectRecordRecoveverAndUpgrader;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.DurableConfigurationStoreCreator;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 
 public class StandardVirtualHost extends AbstractVirtualHost
 {
@@ -107,18 +107,22 @@ public class StandardVirtualHost extends
         if (_configurationStoreLogSubject != null)
         {
             getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.STORE_LOCATION(configurationStoreSettings.toString()));
+            getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_START());
         }
 
-        DurableConfigurationRecoverer configRecoverer = new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
-                new DefaultUpgraderProvider(this), getEventLogger());
+        ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
+
+        _durableConfigurationStore.visitConfiguredObjectRecords(upgraderRecoverer);
 
-        _durableConfigurationStore.recoverConfigurationStore(configRecoverer);
+        if (_configurationStoreLogSubject != null)
+        {
+            getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
+        }
 
         // If store does not have entries for standard exchanges (amq.*), the following will create them.
         initialiseModel();
 
-        VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getMessageStoreLogSubject());
-        _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
+        new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
 
         attainActivation();
     }

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java Fri Apr  4 22:10:24 2014
@@ -50,10 +50,10 @@ import org.apache.qpid.server.model.Prot
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.SystemContext;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
 import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class ManagementModeStoreHandlerTest extends QpidTestCase
@@ -89,20 +89,22 @@ public class ManagementModeStoreHandlerT
         when(_portEntry.getParents()).thenReturn(Collections.singletonMap(Broker.class.getSimpleName(), _root));
         when(_portEntry.getType()).thenReturn(Port.class.getSimpleName());
 
-        final ArgumentCaptor<ConfigurationRecoveryHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfigurationRecoveryHandler.class);
+        final ArgumentCaptor<ConfiguredObjectRecordHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfiguredObjectRecordHandler.class);
         doAnswer(
                 new Answer()
                 {
                     @Override
                     public Object answer(final InvocationOnMock invocation) throws Throwable
                     {
-                        ConfigurationRecoveryHandler recoverer = recovererArgumentCaptor.getValue();
-                        recoverer.configuredObject(_root);
-                        recoverer.configuredObject(_portEntry);
+                        ConfiguredObjectRecordHandler recoverer = recovererArgumentCaptor.getValue();
+                        if(recoverer.handle(_root))
+                        {
+                            recoverer.handle(_portEntry);
+                        }
                         return null;
                     }
                 }
-                ).when(_store).recoverConfigurationStore(recovererArgumentCaptor.capture());
+                ).when(_store).visitConfiguredObjectRecords(recovererArgumentCaptor.capture());
         _options = new BrokerOptions();
         _handler = new ManagementModeStoreHandler(_store, _options);
 
@@ -112,21 +114,21 @@ public class ManagementModeStoreHandlerT
+    /** Visits _handler with a BrokerFinder and returns the record that finder captured. */
     private ConfiguredObjectRecord getRootEntry()
     {
         BrokerFinder brokerFinder = new BrokerFinder();
-        _handler.recoverConfigurationStore(brokerFinder);
+        _handler.visitConfiguredObjectRecords(brokerFinder);
         return brokerFinder.getBrokerRecord();
     }
 
+    /** Visits _handler with a RecordFinder for the given id and returns the record that finder captured. */
     private ConfiguredObjectRecord getEntry(UUID id)
     {
         RecordFinder recordFinder = new RecordFinder(id);
-        _handler.recoverConfigurationStore(recordFinder);
+        _handler.visitConfiguredObjectRecords(recordFinder);
         return recordFinder.getFoundRecord();
     }
 
+    /** Visits _handler with a ChildFinder for the given record and returns the child ids that finder collected. */
     private Collection<UUID> getChildrenIds(ConfiguredObjectRecord record)
     {
         ChildFinder childFinder = new ChildFinder(record);
-        _handler.recoverConfigurationStore(childFinder);
+        _handler.visitConfiguredObjectRecords(childFinder);
         return childFinder.getChildIds();
     }
 
@@ -288,21 +290,25 @@ public class ManagementModeStoreHandlerT
         attributes.put(VirtualHost.TYPE, "STANDARD");
 
         final ConfiguredObjectRecord virtualHost = new ConfiguredObjectRecordImpl(virtualHostId, VirtualHost.class.getSimpleName(), attributes, Collections.singletonMap(Broker.class.getSimpleName(), _root));
-        final ArgumentCaptor<ConfigurationRecoveryHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfigurationRecoveryHandler.class);
+        final ArgumentCaptor<ConfiguredObjectRecordHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfiguredObjectRecordHandler.class);
         doAnswer(
                 new Answer()
                 {
                     @Override
                     public Object answer(final InvocationOnMock invocation) throws Throwable
                     {
-                        ConfigurationRecoveryHandler recoverer = recovererArgumentCaptor.getValue();
-                        recoverer.configuredObject(_root);
-                        recoverer.configuredObject(_portEntry);
-                        recoverer.configuredObject(virtualHost);
+                        ConfiguredObjectRecordHandler recoverer = recovererArgumentCaptor.getValue();
+                        if(recoverer.handle(_root))
+                        {
+                            if(recoverer.handle(_portEntry))
+                            {
+                                recoverer.handle(virtualHost);
+                            }
+                        }
                         return null;
                     }
                 }
-                ).when(_store).recoverConfigurationStore(recovererArgumentCaptor.capture());
+                ).when(_store).visitConfiguredObjectRecords(recovererArgumentCaptor.capture());
 
         State expectedState = mmQuiesceVhosts ? State.QUIESCED : null;
         if(mmQuiesceVhosts)
@@ -457,28 +463,32 @@ public class ManagementModeStoreHandlerT
     }
 
 
-    private class BrokerFinder implements ConfigurationRecoveryHandler
+    private class BrokerFinder implements ConfiguredObjectRecordHandler
     {
         private ConfiguredObjectRecord _brokerRecord;
+        private int _version;
+
+        /** Remembers the config version so that end() can hand it back. */
         @Override
-        public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+        public void begin(final int configVersion)
         {
-
+            _version = configVersion;
         }
 
+        /**
+         * Captures the record if its type is the simple name of Broker.
+         *
+         * @return false once the broker record has been captured, true otherwise
+         */
         @Override
-        public void configuredObject(final ConfiguredObjectRecord object)
+        public boolean handle(final ConfiguredObjectRecord object)
         {
             if(object.getType().equals(Broker.class.getSimpleName()))
             {
                 _brokerRecord = object;
+                return false;
             }
+            return true;
         }
 
+        /** Returns the version received in begin() (previously a constant 0 was returned). */
         @Override
-        public int completeConfigurationRecovery()
+        public int end()
         {
-            return 0;
+            return _version;
         }
 
         public ConfiguredObjectRecord getBrokerRecord()
@@ -487,10 +497,11 @@ public class ManagementModeStoreHandlerT
         }
     }
 
-    private class RecordFinder implements ConfigurationRecoveryHandler
+    private class RecordFinder implements ConfiguredObjectRecordHandler
     {
         private final UUID _id;
         private ConfiguredObjectRecord _foundRecord;
+        private int _version;
 
         private RecordFinder(final UUID id)
         {
@@ -498,24 +509,26 @@ public class ManagementModeStoreHandlerT
         }
 
         @Override
-        public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+        public void begin(final int configVersion)
         {
-
+            _version = configVersion;
         }
 
         @Override
-        public void configuredObject(final ConfiguredObjectRecord object)
+        public boolean handle(final ConfiguredObjectRecord object)
         {
             if(object.getId().equals(_id))
             {
                 _foundRecord = object;
+                return false;
             }
+            return true;
         }
 
         @Override
-        public int completeConfigurationRecovery()
+        public int end()
         {
-            return 0;
+            return _version;
         }
 
         public ConfiguredObjectRecord getFoundRecord()
@@ -524,10 +537,11 @@ public class ManagementModeStoreHandlerT
         }
     }
 
-    private class ChildFinder implements ConfigurationRecoveryHandler
+    private class ChildFinder implements ConfiguredObjectRecordHandler
     {
         private final Collection<UUID> _childIds = new HashSet<UUID>();
         private final ConfiguredObjectRecord _parent;
+        private int _version;
 
         private ChildFinder(final ConfiguredObjectRecord parent)
         {
@@ -535,13 +549,13 @@ public class ManagementModeStoreHandlerT
         }
 
         @Override
-        public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+        public void begin(final int configVersion)
         {
-
+            _version = configVersion;
         }
 
         @Override
-        public void configuredObject(final ConfiguredObjectRecord object)
+        public boolean handle(final ConfiguredObjectRecord object)
         {
 
             if(object.getParents() != null)
@@ -555,12 +569,13 @@ public class ManagementModeStoreHandlerT
                 }
 
             }
+            return true;
         }
 
         @Override
-        public int completeConfigurationRecovery()
+        public int end()
         {
-            return 0;
+            return _version;
         }
 
         public Collection<UUID> getChildIds()

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java Fri Apr  4 22:10:24 2014
@@ -21,9 +21,7 @@
 package org.apache.qpid.server.store;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -49,6 +47,7 @@ import org.apache.qpid.server.model.UUID
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.util.FileUtils;
 import org.mockito.ArgumentCaptor;
@@ -71,9 +70,8 @@ public abstract class AbstractDurableCon
     private String _storePath;
     private String _storeName;
 
-    private ConfigurationRecoveryHandler _recoveryHandler;
+    private ConfiguredObjectRecordHandler _handler;
 
-    private ExchangeImpl _exchange = mock(ExchangeImpl.class);
     private static final String ROUTING_KEY = "routingKey";
     private static final String QUEUE_NAME = "queueName";
     private Map<String,Object> _bindingArgs;
@@ -96,16 +94,8 @@ public abstract class AbstractDurableCon
         FileUtils.delete(new File(_storePath), true);
         setTestSystemProperty("QPID_WORK", TMP_FOLDER);
 
-        _recoveryHandler = mock(ConfigurationRecoveryHandler.class);
-        when(_exchange.getName()).thenReturn(EXCHANGE_NAME);
-        when(_exchange.getId()).thenReturn(_exchangeId);
-        when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
-        when(_exchange.getEventLogger()).thenReturn(new EventLogger());
-
-        ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class);
-        when(exchangeRecord.getId()).thenReturn(_exchangeId);
-        when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName());
-        when(_exchange.asObjectRecord()).thenReturn(exchangeRecord);
+        _handler = mock(ConfiguredObjectRecordHandler.class);
+        when(_handler.handle(any(ConfiguredObjectRecord.class))).thenReturn(true);
 
         _bindingArgs = new HashMap<String, Object>();
         String argKey = AMQPFilterTypes.JMS_SELECTOR.toString();
@@ -134,7 +124,7 @@ public abstract class AbstractDurableCon
         DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
 
         reopenStore();
-        verify(_recoveryHandler).configuredObject(matchesRecord(_exchangeId, EXCHANGE,
+        verify(_handler).handle(matchesRecord(_exchangeId, EXCHANGE,
                 map( org.apache.qpid.server.model.Exchange.NAME, getName(),
                         org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type",
                         org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name())));
@@ -168,14 +158,16 @@ public abstract class AbstractDurableCon
         DurableConfigurationStoreHelper.removeExchange(_configStore, exchange);
 
         reopenStore();
-        verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
+        verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
     }
 
     public void testBindQueue() throws Exception
     {
+        ExchangeImpl<?> exchange = createTestExchange();
         AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
         BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
-                _exchange, _bindingArgs);
+                exchange, _bindingArgs);
+        DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
         DurableConfigurationStoreHelper.createQueue(_configStore, queue);
         DurableConfigurationStoreHelper.createBinding(_configStore, binding);
 
@@ -187,10 +179,10 @@ public abstract class AbstractDurableCon
 
         Map<String,UUID> parents = new HashMap<String, UUID>();
 
-        parents.put(Exchange.class.getSimpleName(), _exchange.getId());
+        parents.put(Exchange.class.getSimpleName(), exchange.getId());
         parents.put(Queue.class.getSimpleName(), queue.getId());
 
-        verify(_recoveryHandler).configuredObject(matchesRecord(binding.getId(), BINDING, map, parents));
+        verify(_handler).handle(matchesRecord(binding.getId(), BINDING, map, parents));
     }
 
 
@@ -260,15 +252,18 @@ public abstract class AbstractDurableCon
 
     public void testUnbindQueue() throws Exception
     {
+        ExchangeImpl<?> exchange = createTestExchange();
+        DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
+
         AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
         BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
-                _exchange, _bindingArgs);
+                exchange, _bindingArgs);
         DurableConfigurationStoreHelper.createBinding(_configStore, binding);
 
         DurableConfigurationStoreHelper.removeBinding(_configStore, binding);
         reopenStore();
 
-        verify(_recoveryHandler, never()).configuredObject(matchesRecord(ANY_UUID, BINDING,
+        verify(_handler, never()).handle(matchesRecord(ANY_UUID, BINDING,
                                                                          ANY_MAP));
     }
 
@@ -282,7 +277,7 @@ public abstract class AbstractDurableCon
         queueAttributes.put(Queue.NAME, getName());
         queueAttributes.put(Queue.OWNER, getName()+"Owner");
         queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
-        verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+        verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
     }
 
     public void testCreateQueueAMQQueueFieldTable() throws Exception
@@ -304,7 +299,7 @@ public abstract class AbstractDurableCon
         queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
         queueAttributes.putAll(attributes);
 
-        verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+        verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
     }
 
     public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
@@ -322,7 +317,7 @@ public abstract class AbstractDurableCon
         queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
         queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
 
-        verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+        verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
     }
 
     private ExchangeImpl createTestAlternateExchange()
@@ -355,7 +350,7 @@ public abstract class AbstractDurableCon
         queueAttributes.put(Queue.NAME, getName());
         queueAttributes.putAll(attributes);
 
-        verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+        verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
 
     }
 
@@ -382,7 +377,7 @@ public abstract class AbstractDurableCon
         queueAttributes.putAll(attributes);
         queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
 
-        verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+        verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
     }
 
     public void testRemoveQueue() throws Exception
@@ -397,7 +392,7 @@ public abstract class AbstractDurableCon
         // remove queue
         DurableConfigurationStoreHelper.removeQueue(_configStore,queue);
         reopenStore();
-        verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
+        verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
     }
 
     private AMQQueue createTestQueue(String queueName,
@@ -463,11 +458,9 @@ public abstract class AbstractDurableCon
     {
         ExchangeImpl exchange = mock(ExchangeImpl.class);
         Map<String,Object> actualAttributes = new HashMap<String, Object>();
-        actualAttributes.put("id", _exchangeId);
         actualAttributes.put("name", getName());
         actualAttributes.put("type", getName() + "Type");
         actualAttributes.put("lifetimePolicy", LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
-        when(exchange.getActualAttributes()).thenReturn(actualAttributes);
         when(exchange.getName()).thenReturn(getName());
         when(exchange.getTypeName()).thenReturn(getName() + "Type");
         when(exchange.isAutoDelete()).thenReturn(true);
@@ -475,11 +468,10 @@ public abstract class AbstractDurableCon
         ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class);
         when(exchangeRecord.getId()).thenReturn(_exchangeId);
         when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName());
-        Map<String,Object> actualAttributesExceptId = new HashMap<String, Object>(actualAttributes);
-        actualAttributesExceptId.remove("id");
-        when(exchangeRecord.getAttributes()).thenReturn(actualAttributesExceptId);
+        when(exchangeRecord.getAttributes()).thenReturn(actualAttributes);
         when(exchange.asObjectRecord()).thenReturn(exchangeRecord);
-
+        when(exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
+        when(exchange.getEventLogger()).thenReturn(new EventLogger());
         return exchange;
     }
 
@@ -491,7 +483,7 @@ public abstract class AbstractDurableCon
         ConfiguredObject<?> parent = mock(ConfiguredObject.class);
         when(parent.getName()).thenReturn("testName");
         _configStore.openConfigurationStore(parent, _configurationStoreSettings);
-        _configStore.recoverConfigurationStore(_recoveryHandler);
+        _configStore.visitConfiguredObjectRecords(_handler);
     }
 
     protected abstract DurableConfigurationStore createConfigStore() throws Exception;

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java Fri Apr  4 22:10:24 2014
@@ -27,10 +27,4 @@ public class JsonFileConfigStoreConfigur
     {
         return new JsonFileConfigStore();
     }
-
-    @Override
-    public void testBindQueue() throws Exception
-    {
-        // TODO: Temporarily disable the test as it is already fixed on trunk
-    }
 }

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java Fri Apr  4 22:10:24 2014
@@ -28,6 +28,7 @@ import java.util.UUID;
 
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.test.utils.TestFileUtils;
@@ -43,15 +44,15 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
 
 public class JsonFileConfigStoreTest extends QpidTestCase
 {
-    private final ConfigurationRecoveryHandler _recoveryHandler = mock(ConfigurationRecoveryHandler.class);
-
     private JsonFileConfigStore _store;
     private HashMap<String, Object> _configurationStoreSettings;
     private ConfiguredObject<?> _virtualHost;
     private File _storeLocation;
+    private ConfiguredObjectRecordHandler _handler;
 
 
     private static final UUID ANY_UUID = UUID.randomUUID();
@@ -69,6 +70,9 @@ public class JsonFileConfigStoreTest ext
         _configurationStoreSettings.put(JsonFileConfigStore.STORE_TYPE, JsonFileConfigStore.TYPE);
         _configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, _storeLocation.getAbsolutePath());
         _store = new JsonFileConfigStore();
+
+        _handler = mock(ConfiguredObjectRecordHandler.class);
+        when(_handler.handle(any(ConfiguredObjectRecord.class))).thenReturn(true);
     }
 
     @Override
@@ -113,35 +117,35 @@ public class JsonFileConfigStoreTest ext
         }
     }
 
-    public void testStartFromNoStore() throws Exception
+    public void testVisitEmptyStore()
     {
         _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
-        _store.recoverConfigurationStore(_recoveryHandler);
-        InOrder inorder = inOrder(_recoveryHandler);
-        inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0));
-        inorder.verify(_recoveryHandler,never()).configuredObject(any(ConfiguredObjectRecord.class));
-        inorder.verify(_recoveryHandler).completeConfigurationRecovery();
+        _store.visitConfiguredObjectRecords(_handler);
+        InOrder inorder = inOrder(_handler);
+        inorder.verify(_handler).begin(eq(0));
+        inorder.verify(_handler,never()).handle(any(ConfiguredObjectRecord.class));
+        inorder.verify(_handler).end();
         _store.closeConfigurationStore();
     }
 
     public void testUpdatedConfigVersionIsRetained() throws Exception
     {
         final int NEW_CONFIG_VERSION = 42;
-        when(_recoveryHandler.completeConfigurationRecovery()).thenReturn(NEW_CONFIG_VERSION);
+        when(_handler.end()).thenReturn(NEW_CONFIG_VERSION);
 
         _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
-        _store.recoverConfigurationStore(_recoveryHandler);
+        _store.visitConfiguredObjectRecords(_handler);
         _store.closeConfigurationStore();
 
         _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
-        _store.recoverConfigurationStore(_recoveryHandler);
-        InOrder inorder = inOrder(_recoveryHandler);
+        _store.visitConfiguredObjectRecords(_handler);
+        InOrder inorder = inOrder(_handler);
 
         // first time the config version should be the initial version - 0
-        inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0));
+        inorder.verify(_handler).begin(eq(0));
 
         // second time the config version should be the updated version
-        inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(NEW_CONFIG_VERSION));
+        inorder.verify(_handler).begin(eq(NEW_CONFIG_VERSION));
 
         _store.closeConfigurationStore();
     }
@@ -157,8 +161,9 @@ public class JsonFileConfigStoreTest ext
         _store.closeConfigurationStore();
 
         _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
-        _store.recoverConfigurationStore(_recoveryHandler);
-        verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr));
+
+        _store.visitConfiguredObjectRecords(_handler);
+        verify(_handler, times(1)).handle(matchesRecord(queueId, queueType, queueAttr));
         _store.closeConfigurationStore();
     }
 
@@ -179,8 +184,8 @@ public class JsonFileConfigStoreTest ext
         _store.closeConfigurationStore();
 
         _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
-        _store.recoverConfigurationStore(_recoveryHandler);
-        verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr));
+        _store.visitConfiguredObjectRecords(_handler);
+        verify(_handler, times(1)).handle(matchesRecord(queueId, queueType, queueAttr));
         _store.closeConfigurationStore();
     }
 
@@ -201,8 +206,8 @@ public class JsonFileConfigStoreTest ext
         _store.closeConfigurationStore();
 
         _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
-        _store.recoverConfigurationStore(_recoveryHandler);
-        verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
+        _store.visitConfiguredObjectRecords(_handler);
+        verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
         _store.closeConfigurationStore();
     }
 
@@ -311,12 +316,12 @@ public class JsonFileConfigStoreTest ext
         _store.update(true, bindingRecord, binding2Record);
         _store.closeConfigurationStore();
         _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
-        _store.recoverConfigurationStore(_recoveryHandler);
-        verify(_recoveryHandler).configuredObject(matchesRecord(queueId, "Queue", EMPTY_ATTR));
-        verify(_recoveryHandler).configuredObject(matchesRecord(queue2Id, "Queue", EMPTY_ATTR));
-        verify(_recoveryHandler).configuredObject(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR));
-        verify(_recoveryHandler).configuredObject(matchesRecord(bindingId, "Binding", EMPTY_ATTR));
-        verify(_recoveryHandler).configuredObject(matchesRecord(binding2Id, "Binding", EMPTY_ATTR));
+        _store.visitConfiguredObjectRecords(_handler);
+        verify(_handler).handle(matchesRecord(queueId, "Queue", EMPTY_ATTR));
+        verify(_handler).handle(matchesRecord(queue2Id, "Queue", EMPTY_ATTR));
+        verify(_handler).handle(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR));
+        verify(_handler).handle(matchesRecord(bindingId, "Binding", EMPTY_ATTR));
+        verify(_handler).handle(matchesRecord(binding2Id, "Binding", EMPTY_ATTR));
         _store.closeConfigurationStore();
 
     }

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java Fri Apr  4 22:10:24 2014
@@ -33,7 +33,6 @@ import java.util.UUID;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.util.FileUtils;
 
@@ -66,12 +65,9 @@ public abstract class MessageStoreQuotaE
 
         _store = createStore();
 
-        MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
-        when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
         ConfiguredObject<?> parent = mock(ConfiguredObject.class);
         when(parent.getName()).thenReturn("test");
         _store.openMessageStore(parent, storeSettings);
-        _store.recoverMessageStore(recoveryHandler, null);
 
         _transactionResource = UUID.randomUUID();
         _events = new ArrayList<Event>();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org