You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/04/05 00:10:25 UTC

svn commit: r1584926 [1/3] - in /qpid/branches/java-broker-config-store-changes/qpid/java: ./ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ bdbstore/src/main/java/o...

Author: rgodfrey
Date: Fri Apr  4 22:10:24 2014
New Revision: 1584926

URL: http://svn.apache.org/r1584926
Log:
Merge from trunk

Added:
    qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
      - copied unchanged from r1584910, qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java
      - copied unchanged from r1584910, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Xid.java
      - copied unchanged from r1584910, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Xid.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/
      - copied from r1584910, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
      - copied unchanged from r1584910, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java
      - copied unchanged from r1584910, qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java
      - copied unchanged from r1584910, qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-plugins/memory-store/src/test/
      - copied from r1584910, qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/
    qpid/branches/java-broker-config-store-changes/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
      - copied unchanged from r1584910, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
Removed:
    qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java
    qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
    qpid/branches/java-broker-config-store-changes/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
Modified:
    qpid/branches/java-broker-config-store-changes/qpid/java/   (props changed)
    qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
    qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java
    qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-plugins/memory-store/pom.xml
    qpid/branches/java-broker-config-store-changes/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
    qpid/branches/java-broker-config-store-changes/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
    qpid/branches/java-broker-config-store-changes/qpid/java/test-profiles/CPPExcludes   (contents, props changed)
    qpid/branches/java-broker-config-store-changes/qpid/java/test-profiles/JavaBDBExcludes   (contents, props changed)
    qpid/branches/java-broker-config-store-changes/qpid/java/test-profiles/JavaTransientExcludes   (contents, props changed)

Propchange: qpid/branches/java-broker-config-store-changes/qpid/java/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java:r1584339-1584910

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java Fri Apr  4 22:10:24 2014
@@ -29,15 +29,15 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.DurableConfigurationRecoverer;
+import org.apache.qpid.server.store.ConfiguredObjectRecordRecoveverAndUpgrader;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
 import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
-import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider;
+import org.apache.qpid.server.virtualhost.MessageStoreRecoverer;
 import org.apache.qpid.server.virtualhost.State;
-import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
 import com.sleepycat.je.rep.StateChangeEvent;
@@ -98,17 +98,12 @@ public class BDBHAVirtualHost extends Ab
         {
             _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
 
-            DefaultUpgraderProvider upgraderProvider = new DefaultUpgraderProvider(this);
-
-            DurableConfigurationRecoverer configRecoverer =
-                    new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
-                            upgraderProvider, getEventLogger());
-            _messageStore.recoverConfigurationStore(configRecoverer);
+            ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
+            _messageStore.visitConfiguredObjectRecords(upgraderRecoverer);
 
             initialiseModel();
 
-            VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this, getMessageStoreLogSubject());
-            _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
+            new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
 
             attainActivation();
         }

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Fri Apr  4 22:10:24 2014
@@ -27,8 +27,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
@@ -38,29 +36,23 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.Event;
 import org.apache.qpid.server.store.EventListener;
 import org.apache.qpid.server.store.EventManager;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMemoryMessage;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.Xid;
 import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
 import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
 import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
 import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
 import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
 import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
 import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding;
@@ -70,6 +62,10 @@ import org.apache.qpid.server.store.berk
 import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
 import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
 import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.util.FileUtils;
 
@@ -129,7 +125,6 @@ public class BDBMessageStore implements 
     private long _persistentSizeHighThreshold;
 
     private final EventManager _eventManager = new EventManager();
-    private final String _type;
 
     private final EnvironmentFacadeFactory _environmentFacadeFactory;
 
@@ -143,7 +138,6 @@ public class BDBMessageStore implements 
 
     public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory)
     {
-        _type = environmentFacadeFactory.getType();
         _environmentFacadeFactory = environmentFacadeFactory;
     }
 
@@ -160,18 +154,19 @@ public class BDBMessageStore implements 
         {
             if (_environmentFacade == null)
             {
-                String[] databaseNames = null;
+                EnvironmentFacadeTask[] initialisationTasks = null;
                 if (MapValueConverter.getBooleanAttribute(IS_MESSAGE_STORE_TOO, storeSettings, false))
                 {
-                    databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
+                    String[] databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
                     System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length);
                     System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length);
+                    initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(databaseNames), new DiskSpaceTask(), new MaxMessageIdTask() };
                 }
                 else
                 {
-                    databaseNames = CONFIGURATION_STORE_DATABASE_NAMES;
+                    initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(CONFIGURATION_STORE_DATABASE_NAMES)};
                 }
-                _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, new UpgradeTask(parent), new OpenDatabasesTask(databaseNames));
+                _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, initialisationTasks);
             }
             else
             {
@@ -181,11 +176,88 @@ public class BDBMessageStore implements 
     }
 
     @Override
-    public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+    public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
     {
         checkConfigurationStoreOpen();
 
-        recoverConfig(recoveryHandler);
+        try
+        {
+            int configVersion = getConfigVersion();
+
+            handler.begin(configVersion);
+            doVisitAllConfiguredObjectRecords(handler);
+
+            int newConfigVersion = handler.end();
+            if(newConfigVersion != configVersion)
+            {
+                updateConfigVersion(newConfigVersion);
+            }
+        }
+        catch (DatabaseException e)
+        {
+            throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e);
+        }
+
+    }
+
+    private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
+    {
+        Map<UUID, BDBConfiguredObjectRecord> configuredObjects = new HashMap<UUID, BDBConfiguredObjectRecord>();
+        Cursor objectsCursor = null;
+        Cursor hierarchyCursor = null;
+        try
+        {
+            objectsCursor = getConfiguredObjectsDb().openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+
+
+            while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
+
+                BDBConfiguredObjectRecord configuredObject =
+                        (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
+                configuredObjects.put(configuredObject.getId(), configuredObject);
+            }
+
+            // set parents
+            hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null);
+            while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
+                UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
+                BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
+                if(child != null)
+                {
+                    ConfiguredObjectRecord parent = configuredObjects.get(parentId);
+                    if(parent != null)
+                    {
+                        child.addParent(hk.getParentType(), parent);
+                    }
+                    else if(hk.getParentType().equals("Exchange"))
+                    {
+                        // TODO - remove this hack for the pre-defined exchanges
+                        child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
+                    }
+                }
+            }
+        }
+        finally
+        {
+            closeCursorSafely(objectsCursor);
+            closeCursorSafely(hierarchyCursor);
+        }
+
+        for (ConfiguredObjectRecord record : configuredObjects.values())
+        {
+            boolean shoudlContinue = handler.handle(record);
+            if (!shoudlContinue)
+            {
+                break;
+            }
+        }
+
     }
 
     @Override
@@ -209,7 +281,8 @@ public class BDBMessageStore implements 
 
             if (_environmentFacade == null)
             {
-                _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask());
+                _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings,
+                        new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask(), new MaxMessageIdTask());
             }
 
             _committer = _environmentFacade.createCommitter(parent.getName());
@@ -218,21 +291,6 @@ public class BDBMessageStore implements 
     }
 
     @Override
-    public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException
-    {
-        checkMessageStoreOpen();
-
-        if(messageRecoveryHandler != null)
-        {
-            recoverMessages(messageRecoveryHandler);
-        }
-        if(transactionLogRecoveryHandler != null)
-        {
-            recoverQueueEntries(transactionLogRecoveryHandler);
-        }
-    }
-
-    @Override
     public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException
     {
         checkMessageStoreOpen();
@@ -314,27 +372,6 @@ public class BDBMessageStore implements 
         }
     }
 
-    private void recoverConfig(ConfigurationRecoveryHandler recoveryHandler) throws StoreException
-    {
-        try
-        {
-            final int configVersion = getConfigVersion();
-            recoveryHandler.beginConfigurationRecovery(this, configVersion);
-            loadConfiguredObjects(recoveryHandler);
-
-            final int newConfigVersion = recoveryHandler.completeConfigurationRecovery();
-            if(newConfigVersion != configVersion)
-            {
-                updateConfigVersion(newConfigVersion);
-            }
-        }
-        catch (DatabaseException e)
-        {
-            throw _environmentFacade.handleDatabaseException("Error recovering persistent state: " + e.getMessage(), e);
-        }
-
-    }
-
     @SuppressWarnings("resource")
     private void updateConfigVersion(int newConfigVersion) throws StoreException
     {
@@ -399,62 +436,6 @@ public class BDBMessageStore implements 
         }
     }
 
-    private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException, StoreException
-    {
-        Cursor objectsCursor = null;
-        Cursor hierarchyCursor = null;
-        try
-        {
-            objectsCursor = getConfiguredObjectsDb().openCursor(null, null);
-            DatabaseEntry key = new DatabaseEntry();
-            DatabaseEntry value = new DatabaseEntry();
-
-            Map<UUID, BDBConfiguredObjectRecord> configuredObjects =
-                    new HashMap<UUID, BDBConfiguredObjectRecord>();
-
-            while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
-
-                BDBConfiguredObjectRecord configuredObject =
-                        (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
-                configuredObjects.put(configuredObject.getId(), configuredObject);
-            }
-
-            // set parents
-            hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null);
-            while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
-                UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
-                BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
-                if(child != null)
-                {
-                    ConfiguredObjectRecord parent = configuredObjects.get(parentId);
-                    if(parent != null)
-                    {
-                        child.addParent(hk.getParentType(), parent);
-                    }
-                    else if(hk.getParentType().equals("Exchange"))
-                    {
-                        // TODO - remove this hack for the pre-defined exchanges
-                        child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
-                    }
-                }
-            }
-
-            for (ConfiguredObjectRecord record : configuredObjects.values())
-            {
-                crh.configuredObject(record);
-            }
-        }
-        finally
-        {
-            closeCursorSafely(objectsCursor);
-            closeCursorSafely(hierarchyCursor);
-        }
-    }
-
     private void closeCursorSafely(Cursor cursor) throws StoreException
     {
         if (cursor != null)
@@ -470,124 +451,6 @@ public class BDBMessageStore implements 
         }
     }
 
-    private void recoverMessages(MessageStoreRecoveryHandler msrh) throws StoreException
-    {
-        StoredMessageRecoveryHandler mrh = msrh.begin();
-
-        Cursor cursor = null;
-        try
-        {
-            cursor = getMessageMetaDataDb().openCursor(null, null);
-            DatabaseEntry key = new DatabaseEntry();
-            DatabaseEntry value = new DatabaseEntry();
-            MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
-
-            long maxId = 0;
-
-            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                long messageId = LongBinding.entryToLong(key);
-                StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-
-                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
-
-                mrh.message(message);
-
-                maxId = Math.max(maxId, messageId);
-            }
-
-            _messageId.set(maxId);
-            mrh.completeMessageRecovery();
-        }
-        catch (DatabaseException e)
-        {
-            throw _environmentFacade.handleDatabaseException("Cannot recover messages", e);
-        }
-        finally
-        {
-            closeCursorSafely(cursor);
-        }
-    }
-
-    private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
-    throws StoreException
-    {
-        QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
-
-        ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
-
-        Cursor cursor = null;
-        try
-        {
-            cursor = getDeliveryDb().openCursor(null, null);
-            DatabaseEntry key = new DatabaseEntry();
-            QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
-
-            DatabaseEntry value = new DatabaseEntry();
-            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                QueueEntryKey qek = keyBinding.entryToObject(key);
-
-                entries.add(qek);
-            }
-
-            try
-            {
-                cursor.close();
-            }
-            finally
-            {
-                cursor = null;
-            }
-
-            for(QueueEntryKey entry : entries)
-            {
-                UUID queueId = entry.getQueueId();
-                long messageId = entry.getMessageId();
-                qerh.queueEntry(queueId, messageId);
-            }
-        }
-        catch (DatabaseException e)
-        {
-            throw _environmentFacade.handleDatabaseException("Cannot recover queue entries", e);
-        }
-        finally
-        {
-            closeCursorSafely(cursor);
-        }
-
-        TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery();
-
-        cursor = null;
-        try
-        {
-            cursor = getXidDb().openCursor(null, null);
-            DatabaseEntry key = new DatabaseEntry();
-            XidBinding keyBinding = XidBinding.getInstance();
-            PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
-            DatabaseEntry value = new DatabaseEntry();
-
-            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                Xid xid = keyBinding.entryToObject(key);
-                PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
-                dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
-                                preparedTransaction.getEnqueues(),preparedTransaction.getDequeues());
-            }
-
-        }
-        catch (DatabaseException e)
-        {
-            throw _environmentFacade.handleDatabaseException("Cannot recover transactions", e);
-        }
-        finally
-        {
-            closeCursorSafely(cursor);
-        }
-
-
-        dtxrh.completeDtxRecordRecovery();
-    }
 
     void removeMessage(long messageId, boolean sync) throws StoreException
     {
@@ -738,6 +601,12 @@ public class BDBMessageStore implements 
     public void create(ConfiguredObjectRecord configuredObject) throws StoreException
     {
         checkConfigurationStoreOpen();
+
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Create " + configuredObject);
+        }
+
         com.sleepycat.je.Transaction txn = null;
         try
         {
@@ -831,7 +700,7 @@ public class BDBMessageStore implements 
     {
         if (LOGGER.isDebugEnabled())
         {
-            LOGGER.debug("Updating " + record.getType() + ", id: " + record.getId());
+            LOGGER.debug("Updating, creating " + createIfNecessary + " : "  + record);
         }
 
         DatabaseEntry key = new DatabaseEntry();
@@ -889,8 +758,7 @@ public class BDBMessageStore implements 
             if (LOGGER.isDebugEnabled())
             {
                 LOGGER.debug("Enqueuing message " + messageId + " on queue "
-                        + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
-                        + " in transaction " + tx);
+                        + queue.getName() + " with id " + queue.getId() + " in transaction " + tx);
             }
             getDeliveryDb().put(tx, key, value);
         }
@@ -898,8 +766,7 @@ public class BDBMessageStore implements 
         {
             LOGGER.error("Failed to enqueue: " + e.getMessage(), e);
             throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue "
-                    + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
-                    + " to database", e);
+                    + queue.getName() + " with id " + queue.getId() + " to database", e);
         }
     }
 
@@ -924,7 +791,7 @@ public class BDBMessageStore implements 
         if (LOGGER.isDebugEnabled())
         {
             LOGGER.debug("Dequeue message id " + messageId + " from queue "
-                    + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+                    + queue.getName() + " with id " + id);
         }
 
         try
@@ -934,19 +801,18 @@ public class BDBMessageStore implements 
             if (status == OperationStatus.NOTFOUND)
             {
                 throw new StoreException("Unable to find message with id " + messageId + " on queue "
-                        + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+                        + queue.getName() + " with id "  + id);
             }
             else if (status != OperationStatus.SUCCESS)
             {
                 throw new StoreException("Unable to remove message with id " + messageId + " on queue"
-                        + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+                        + queue.getName() + " with id " + id);
             }
 
             if (LOGGER.isDebugEnabled())
             {
                 LOGGER.debug("Removed message " + messageId + " on queue "
-                        + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id
-                        + " from delivery db");
+                        + queue.getName() + " with id " + id);
 
             }
         }
@@ -1072,57 +938,6 @@ public class BDBMessageStore implements 
     }
 
     /**
-     * Primarily for testing purposes.
-     *
-     * @param queueId
-     *
-     * @return a list of message ids for messages enqueued for a particular queue
-     */
-    List<Long> getEnqueuedMessages(UUID queueId) throws StoreException
-    {
-        Cursor cursor = null;
-        try
-        {
-            cursor = getDeliveryDb().openCursor(null, null);
-
-            DatabaseEntry key = new DatabaseEntry();
-
-            QueueEntryKey dd = new QueueEntryKey(queueId, 0);
-
-            QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
-            keyBinding.objectToEntry(dd, key);
-
-            DatabaseEntry value = new DatabaseEntry();
-
-            LinkedList<Long> messageIds = new LinkedList<Long>();
-
-            OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
-            dd = keyBinding.entryToObject(key);
-
-            while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId))
-            {
-
-                messageIds.add(dd.getMessageId());
-                status = cursor.getNext(key, value, LockMode.DEFAULT);
-                if (status == OperationStatus.SUCCESS)
-                {
-                    dd = keyBinding.entryToObject(key);
-                }
-            }
-
-            return messageIds;
-        }
-        catch (DatabaseException e)
-        {
-            throw new StoreException("Database error: " + e.getMessage(), e);
-        }
-        finally
-        {
-            closeCursorSafely(cursor);
-        }
-    }
-
-    /**
      * Return a valid, currently unused message id.
      *
      * @return A fresh message id.
@@ -1792,12 +1607,6 @@ public class BDBMessageStore implements 
         }
     }
 
-    @Override
-    public String getStoreType()
-    {
-        return _type;
-    }
-
     private Database getConfiguredObjectsDb()
     {
         return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME);
@@ -1901,4 +1710,147 @@ public class BDBMessageStore implements 
         }
 
     }
+
+    public class MaxMessageIdTask implements EnvironmentFacadeTask, MessageHandler
+    {
+        private long _maxId;
+
+        @Override
+        public void execute(EnvironmentFacade facade)
+        {
+            visitMessagesInternal(this, facade);
+            _messageId.set(_maxId);
+        }
+
+        @Override
+        public boolean handle(StoredMessage<?> storedMessage)
+        {
+            long id = storedMessage.getMessageNumber();
+            if (_maxId<id)
+            {
+                _maxId = id;
+            }
+            return true;
+        }
+
+    }
+
+    @Override
+    public void visitMessages(MessageHandler handler) throws StoreException
+    {
+        checkMessageStoreOpen();
+        visitMessagesInternal(handler, _environmentFacade);
+    }
+
+    private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade)
+    {
+        Cursor cursor = null;
+        try
+        {
+            cursor = environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME).openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+            MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
+
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                long messageId = LongBinding.entryToLong(key);
+                StorableMessageMetaData metaData = valueBinding.entryToObject(value);
+                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+
+                if (!handler.handle(message))
+                {
+                    break;
+                }
+            }
+        }
+        catch (DatabaseException e)
+        {
+            throw environmentFacade.handleDatabaseException("Cannot recover messages", e);
+        }
+        finally
+        {
+            if (cursor != null)
+            {
+                try
+                {
+                    cursor.close();
+                }
+                catch(DatabaseException e)
+                {
+                    throw environmentFacade.handleDatabaseException("Cannot close cursor", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+    {
+        checkMessageStoreOpen();
+
+        Cursor cursor = null;
+        try
+        {
+            cursor = getDeliveryDb().openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+
+            DatabaseEntry value = new DatabaseEntry();
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                QueueEntryKey entry = keyBinding.entryToObject(key);
+                UUID queueId = entry.getQueueId();
+                long messageId = entry.getMessageId();
+                if (!handler.handle(queueId, messageId))
+                {
+                    break;
+                }
+            }
+        }
+        catch (DatabaseException e)
+        {
+            throw _environmentFacade.handleDatabaseException("Cannot visit message instances", e);
+        }
+        finally
+        {
+            closeCursorSafely(cursor);
+        }
+    }
+
+    @Override
+    public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+    {
+        checkMessageStoreOpen();
+
+        Cursor cursor = null;
+        try
+        {
+            cursor = getXidDb().openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            XidBinding keyBinding = XidBinding.getInstance();
+            PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
+            DatabaseEntry value = new DatabaseEntry();
+
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                Xid xid = keyBinding.entryToObject(key);
+                PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
+                if (!handler.handle(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
+                                preparedTransaction.getEnqueues(),preparedTransaction.getDequeues()))
+                {
+                    break;
+                }
+            }
+
+        }
+        catch (DatabaseException e)
+        {
+            throw _environmentFacade.handleDatabaseException("Cannot recover distributed transactions", e);
+        }
+        finally
+        {
+            closeCursorSafely(cursor);
+        }
+    }
 }

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java Fri Apr  4 22:10:24 2014
@@ -25,7 +25,7 @@ import com.sleepycat.bind.tuple.TupleBin
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
 
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+import org.apache.qpid.server.store.Xid;
 
 public class XidBinding extends TupleBinding<Xid>
 {

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java Fri Apr  4 22:10:24 2014
@@ -48,7 +48,7 @@ import org.apache.qpid.server.model.Life
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+import org.apache.qpid.server.store.Xid;
 import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
 import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey;
 import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding;

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java Fri Apr  4 22:10:24 2014
@@ -35,12 +35,12 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Model;
 import org.apache.qpid.server.model.SystemContext;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader;
 import org.apache.qpid.server.store.NonNullUpgrader;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 
 public class BrokerStoreUpgrader
 {
@@ -583,17 +583,17 @@ public class BrokerStoreUpgrader
 
     public Broker upgrade(DurableConfigurationStore store)
     {
-        final BrokerStoreRecoveryHandler recoveryHandler = new BrokerStoreRecoveryHandler(_systemContext);
+        final BrokerStoreRecoveryHandler recoveryHandler = new BrokerStoreRecoveryHandler(_systemContext, store);
         store.openConfigurationStore(_systemContext, Collections.<String,Object>emptyMap());
-        store.recoverConfigurationStore(recoveryHandler);
+        store.visitConfiguredObjectRecords(recoveryHandler);
 
         return recoveryHandler.getBroker();
     }
 
 
-    private static class BrokerStoreRecoveryHandler implements ConfigurationRecoveryHandler
+    private static class BrokerStoreRecoveryHandler implements ConfiguredObjectRecordHandler
     {
-        private static Logger LOGGER = Logger.getLogger(ConfigurationRecoveryHandler.class);
+        private static Logger LOGGER = Logger.getLogger(BrokerStoreRecoveryHandler.class);
 
         private DurableConfigurationStoreUpgrader _upgrader;
         private DurableConfigurationStore _store;
@@ -601,27 +601,28 @@ public class BrokerStoreUpgrader
         private int _version;
         private final SystemContext _systemContext;
 
-        private BrokerStoreRecoveryHandler(final SystemContext systemContext)
+        private BrokerStoreRecoveryHandler(final SystemContext systemContext, DurableConfigurationStore store)
         {
             _systemContext = systemContext;
+            _store = store;
         }
 
 
         @Override
-        public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+        public void begin(final int configVersion)
         {
-            _store = store;
             _version = configVersion;
         }
 
         @Override
-        public void configuredObject(final ConfiguredObjectRecord object)
+        public boolean handle(final ConfiguredObjectRecord object)
         {
             _records.put(object.getId(), object);
+            return true;
         }
 
         @Override
-        public int completeConfigurationRecovery()
+        public int end()
         {
             String version = getCurrentVersion();
 

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java Fri Apr  4 22:10:24 2014
@@ -20,14 +20,6 @@
  */
 package org.apache.qpid.server.configuration.store;
 
-import org.apache.qpid.server.configuration.ConfigurationEntry;
-import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.StoreException;
-
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -35,6 +27,14 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.qpid.server.configuration.ConfigurationEntry;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+
 public class JsonConfigurationEntryStore extends MemoryConfigurationEntryStore
 {
     public static final String STORE_TYPE = "json";
@@ -124,30 +124,31 @@ public class JsonConfigurationEntryStore
         else
         {
             final Collection<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>();
-            final ConfigurationRecoveryHandler replayHandler = new ConfigurationRecoveryHandler()
+            final ConfiguredObjectRecordHandler replayHandler = new ConfiguredObjectRecordHandler()
             {
                 private int _configVersion;
                 @Override
-                public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+                public void begin(final int configVersion)
                 {
                     _configVersion = configVersion;
                 }
 
                 @Override
-                public void configuredObject(ConfiguredObjectRecord record)
+                public boolean handle(ConfiguredObjectRecord record)
                 {
                     records.add(record);
+                    return true;
                 }
 
                 @Override
-                public int completeConfigurationRecovery()
+                public int end()
                 {
                     return _configVersion;
                 }
             };
 
             initialStore.openConfigurationStore(_parentObject, Collections.<String,Object>emptyMap());
-            initialStore.recoverConfigurationStore(replayHandler);
+            initialStore.visitConfiguredObjectRecords(replayHandler);
 
             update(true, records.toArray(new ConfiguredObjectRecord[records.size()]));
         }

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java Fri Apr  4 22:10:24 2014
@@ -38,11 +38,11 @@ import org.apache.qpid.server.model.Port
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.server.util.MapValueConverter;
 
 public class ManagementModeStoreHandler implements DurableConfigurationStore
@@ -80,20 +80,21 @@ public class ManagementModeStoreHandler 
 
 
         _records = new HashMap<UUID, ConfiguredObjectRecord>();
-        final ConfigurationRecoveryHandler localRecoveryHandler = new ConfigurationRecoveryHandler()
+        final ConfiguredObjectRecordHandler localRecoveryHandler = new ConfiguredObjectRecordHandler()
         {
             private int _version;
             private boolean _quiesceRmiPort = _options.getManagementModeRmiPortOverride() > 0;
             private boolean _quiesceJmxPort = _options.getManagementModeJmxPortOverride() > 0;
             private boolean _quiesceHttpPort = _options.getManagementModeHttpPortOverride() > 0;
+
             @Override
-            public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+            public void begin(final int configVersion)
             {
                 _version = configVersion;
             }
 
             @Override
-            public void configuredObject(final ConfiguredObjectRecord object)
+            public boolean handle(final ConfiguredObjectRecord object)
             {
                 String entryType = object.getType();
                 Map<String, Object> attributes = object.getAttributes();
@@ -153,11 +154,12 @@ public class ManagementModeStoreHandler 
                 {
                     _records.put(object.getId(), object);
                 }
+                return true;
             }
 
 
             @Override
-            public int completeConfigurationRecovery()
+            public int end()
             {
                 return _version;
             }
@@ -166,7 +168,7 @@ public class ManagementModeStoreHandler 
 
 
 
-        _store.recoverConfigurationStore(localRecoveryHandler);
+        _store.visitConfiguredObjectRecords(localRecoveryHandler);
 
         _cliEntries = createPortsFromCommandLineOptions(_options);
 
@@ -179,17 +181,20 @@ public class ManagementModeStoreHandler 
     }
 
     @Override
-    public void recoverConfigurationStore(final ConfigurationRecoveryHandler recoveryHandler) throws StoreException
+    public void visitConfiguredObjectRecords(final ConfiguredObjectRecordHandler recoveryHandler) throws StoreException
     {
 
 
-        recoveryHandler.beginConfigurationRecovery(this,0);
+        recoveryHandler.begin(0);
 
         for(ConfiguredObjectRecord record : _records.values())
         {
-            recoveryHandler.configuredObject(record);
+            if(!recoveryHandler.handle(record))
+            {
+                break;
+            }
         }
-        recoveryHandler.completeConfigurationRecovery();
+        recoveryHandler.end();
     }
 
 
@@ -357,16 +362,16 @@ public class ManagementModeStoreHandler 
         final int managementModeJmxPortOverride = options.getManagementModeJmxPortOverride();
         final int managementModeHttpPortOverride = options.getManagementModeHttpPortOverride();
 
-        _store.recoverConfigurationStore(new ConfigurationRecoveryHandler()
+        _store.visitConfiguredObjectRecords(new ConfiguredObjectRecordHandler()
         {
             @Override
-            public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+            public void begin(final int configVersion)
             {
 
             }
 
             @Override
-            public void configuredObject(final ConfiguredObjectRecord entry)
+            public boolean handle(final ConfiguredObjectRecord entry)
             {
                 String entryType = entry.getType();
                 Map<String, Object> attributes = entry.getAttributes();
@@ -417,11 +422,12 @@ public class ManagementModeStoreHandler 
                     // save original state
                     quiescedEntries.put(entry.getId(), attributes.get(ATTRIBUTE_STATE));
                 }
+                return true;
             }
 
 
             @Override
-            public int completeConfigurationRecovery()
+            public int end()
             {
                 return 0;
             }

Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java Fri Apr  4 22:10:24 2014
@@ -58,10 +58,9 @@ import org.apache.qpid.server.model.Conf
 import org.apache.qpid.server.model.Model;
 import org.apache.qpid.server.model.SystemContext;
 import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.util.Strings;
 import org.apache.qpid.util.Strings.ChainedResolver;
 
@@ -128,30 +127,31 @@ public class MemoryConfigurationEntrySto
                 _storeLocation = initialStore.getStoreLocation();
             }
             final Collection<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>();
-            final ConfigurationRecoveryHandler replayHandler = new ConfigurationRecoveryHandler()
+            final ConfiguredObjectRecordHandler replayHandler = new ConfiguredObjectRecordHandler()
             {
                 private int _configVersion;
                 @Override
-                public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+                public void begin(final int configVersion)
                 {
                     _configVersion = configVersion;
                 }
 
                 @Override
-                public void configuredObject(ConfiguredObjectRecord record)
+                public boolean handle(ConfiguredObjectRecord record)
                 {
                     records.add(record);
+                    return true;
                 }
 
                 @Override
-                public int completeConfigurationRecovery()
+                public int end()
                 {
                     return _configVersion;
                 }
             };
 
             initialStore.openConfigurationStore(parentObject, Collections.<String,Object>emptyMap());
-            initialStore.recoverConfigurationStore(replayHandler);
+            initialStore.visitConfiguredObjectRecords(replayHandler);
 
             update(true, records.toArray(new ConfiguredObjectRecord[records.size()]));
 
@@ -365,10 +365,10 @@ public class MemoryConfigurationEntrySto
     }
 
     @Override
-    public void recoverConfigurationStore(final ConfigurationRecoveryHandler recoveryHandler) throws StoreException
+    public void visitConfiguredObjectRecords(final ConfiguredObjectRecordHandler recoveryHandler) throws StoreException
     {
 
-        recoveryHandler.beginConfigurationRecovery(this,0);
+        recoveryHandler.begin(0);
 
         final Map<UUID,Map<String,UUID>> parentMap = new HashMap<UUID, Map<String, UUID>>();
 
@@ -435,9 +435,12 @@ public class MemoryConfigurationEntrySto
         }
         for(ConfiguredObjectRecord record : records.values())
         {
-            recoveryHandler.configuredObject(record);
+            if(!recoveryHandler.handle(record))
+            {
+                break;
+            }
         }
-        recoveryHandler.completeConfigurationRecovery();
+        recoveryHandler.end();
 
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org