You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/04/22 14:32:23 UTC

svn commit: r1589112 [2/2] - in /qpid/trunk/qpid/java/broker-core/src: main/java/org/apache/qpid/server/ main/java/org/apache/qpid/server/configuration/startup/ main/java/org/apache/qpid/server/model/ main/java/org/apache/qpid/server/registry/ main/jav...

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1589112&r1=1589111&r2=1589112&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Tue Apr 22 12:32:23 2014
@@ -20,9 +20,6 @@
  */
 package org.apache.qpid.server.store;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -31,13 +28,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.UUID;
 
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.startup.StoreUpgraderPhase;
-import org.apache.qpid.server.configuration.startup.UpgraderPhaseFactory;
 import org.apache.qpid.server.filter.FilterSupport;
 import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.BrokerModel;
-import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObjectFactory;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
@@ -45,14 +37,11 @@ import org.apache.qpid.server.model.UUID
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public class VirtualHostStoreUpgraderAndRecoverer
 {
-    private final ConfiguredObjectFactory _objectFactory;
     private final VirtualHostNode<?> _virtualHostNode;
-    private Map<String, UpgraderPhaseFactory> _upgraders = new HashMap<String, UpgraderPhaseFactory>();
+    private Map<String, StoreUpgraderPhase> _upgraders = new HashMap<String, StoreUpgraderPhase>();
 
     @SuppressWarnings("serial")
     private static final Map<String, String> DEFAULT_EXCHANGES = Collections.unmodifiableMap(new HashMap<String, String>()
@@ -68,12 +57,11 @@ public class VirtualHostStoreUpgraderAnd
     public VirtualHostStoreUpgraderAndRecoverer(VirtualHostNode<?> virtualHostNode, ConfiguredObjectFactory objectFactory)
     {
         _virtualHostNode = virtualHostNode;
-        _objectFactory = objectFactory;
-        register(new UpgraderFactory_0_0());
-        register(new UpgraderFactory_0_1());
-        register(new UpgraderFactory_0_2());
-        register(new UpgraderFactory_0_3());
-        register(new UpgraderFactory_0_4());
+        register(new Upgrader_0_0_to_0_1());
+        register(new Upgrader_0_1_to_0_2());
+        register(new Upgrader_0_2_to_0_3());
+        register(new Upgrader_0_3_to_0_4());
+        register(new Upgrader_0_4_to_0_5());
 
         Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>();
         for (String exchangeName : DEFAULT_EXCHANGES.keySet())
@@ -84,9 +72,9 @@ public class VirtualHostStoreUpgraderAnd
         _defaultExchangeIds = Collections.unmodifiableMap(defaultExchangeIds);
     }
 
-    private void register(UpgraderPhaseFactory factory)
+    private void register(StoreUpgraderPhase upgrader)
     {
-        _upgraders.put(factory.getFromVersion(), factory);
+        _upgraders.put(upgrader.getFromVersion(), upgrader);
     }
 
     /*
@@ -94,100 +82,91 @@ public class VirtualHostStoreUpgraderAnd
      * such bindings would have been ignored, starting from the point at which the config version changed, these
      * arguments would actually cause selectors to be enforced, thus changing which messages would reach a queue.
      */
-    private class UpgraderFactory_0_0  extends UpgraderPhaseFactory
+    private class Upgrader_0_0_to_0_1  extends StoreUpgraderPhase
     {
         private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>();
 
-        public UpgraderFactory_0_0()
+        public Upgrader_0_0_to_0_1()
         {
-            super("0.0", "0.1");
+            super("modelVersion", "0.0", "0.1");
         }
 
-
         @Override
-        public StoreUpgraderPhase newInstance()
+        public void configuredObject(final ConfiguredObjectRecord record)
         {
-            return new StoreUpgraderPhase("modelVersion", getToVersion())
-            {
+            _records.put(record.getId(), record);
+        }
 
-                @Override
-                public void configuredObject(final ConfiguredObjectRecord record)
-                {
-                    _records.put(record.getId(), record);
-                }
+        private void removeSelectorArguments(Map<String, Object> binding)
+        {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
 
-                private void removeSelectorArguments(Map<String, Object> binding)
-                {
-                    @SuppressWarnings("unchecked")
-                    Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
+            FilterSupport.removeFilters(arguments);
+            binding.put(Binding.ARGUMENTS, arguments);
+        }
 
-                    FilterSupport.removeFilters(arguments);
-                    binding.put(Binding.ARGUMENTS, arguments);
-                }
+        private boolean isTopicExchange(ConfiguredObjectRecord entry)
+        {
+            ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange");
+            if (exchangeRecord == null)
+            {
+                return false;
+            }
+            UUID exchangeId = exchangeRecord.getId();
 
-                private boolean isTopicExchange(ConfiguredObjectRecord entry)
+            if(_records.containsKey(exchangeId))
+            {
+                return "topic".equals(_records.get(exchangeId)
+                        .getAttributes()
+                        .get(org.apache.qpid.server.model.Exchange.TYPE));
+            }
+            else
+            {
+                if (_defaultExchangeIds.get("amq.topic").equals(exchangeId))
                 {
-                    ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange");
-                    if (exchangeRecord == null)
-                    {
-                        return false;
-                    }
-                    UUID exchangeId = exchangeRecord.getId();
+                    return true;
+                }
 
-                    if(_records.containsKey(exchangeId))
-                    {
-                        return "topic".equals(_records.get(exchangeId)
-                                .getAttributes()
-                                .get(org.apache.qpid.server.model.Exchange.TYPE));
-                    }
-                    else
-                    {
-                        if (_defaultExchangeIds.get("amq.topic").equals(exchangeId))
-                        {
-                            return true;
-                        }
+                return false;
+            }
 
-                        return false;
-                    }
+        }
 
-                }
+        private boolean hasSelectorArguments(Map<String, Object> binding)
+        {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
+            return (arguments != null) && FilterSupport.argumentsContainFilter(arguments);
+        }
 
-                private boolean hasSelectorArguments(Map<String, Object> binding)
+        @Override
+        public void complete()
+        {
+            for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _records.entrySet())
+            {
+                ConfiguredObjectRecord record = entry.getValue();
+                String type = record.getType();
+                Map<String, Object> attributes = record.getAttributes();
+                UUID id = record.getId();
+                if ("org.apache.qpid.server.model.VirtualHost".equals(type))
                 {
-                    @SuppressWarnings("unchecked")
-                    Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
-                    return (arguments != null) && FilterSupport.argumentsContainFilter(arguments);
+                    record = upgradeRootRecord(record);
                 }
-
-                @Override
-                public void complete()
+                else if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record))
                 {
-                    for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _records.entrySet())
-                    {
-                        ConfiguredObjectRecord record = entry.getValue();
-                        String type = record.getType();
-                        Map<String, Object> attributes = record.getAttributes();
-                        UUID id = record.getId();
-                        if ("org.apache.qpid.server.model.VirtualHost".equals(type))
-                        {
-                            record = upgradeRootRecord(record);
-                        }
-                        else if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record))
-                        {
-                            attributes = new LinkedHashMap<String, Object>(attributes);
-                            removeSelectorArguments(attributes);
-
-                            record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents());
-                            getUpdateMap().put(id, record);
-                            entry.setValue(record);
+                    attributes = new LinkedHashMap<String, Object>(attributes);
+                    removeSelectorArguments(attributes);
 
-                        }
-                        getNextUpgrader().configuredObject(record);
-                    }
+                    record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents());
+                    getUpdateMap().put(id, record);
+                    entry.setValue(record);
 
-                    getNextUpgrader().complete();
                 }
-            };
+                getNextUpgrader().configuredObject(record);
+            }
+
+            getNextUpgrader().complete();
         }
 
     }
@@ -196,76 +175,68 @@ public class VirtualHostStoreUpgraderAnd
      * Change the type string from org.apache.qpid.server.model.Foo to Foo (in line with the practice in the broker
      * configuration store).  Also remove bindings which reference nonexistent queues or exchanges.
      */
-    private class UpgraderFactory_0_1 extends UpgraderPhaseFactory
+    private class Upgrader_0_1_to_0_2 extends StoreUpgraderPhase
     {
-        protected UpgraderFactory_0_1()
+        public Upgrader_0_1_to_0_2()
         {
-            super("0.1", "0.2");
+            super("modelVersion", "0.1", "0.2");
         }
 
         @Override
-        public StoreUpgraderPhase newInstance()
+        public void configuredObject(final ConfiguredObjectRecord record)
         {
-            return new StoreUpgraderPhase("modelVersion", getToVersion())
+            String type = record.getType().substring(1 + record.getType().lastIndexOf('.'));
+            ConfiguredObjectRecord newRecord = new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents());
+            getUpdateMap().put(record.getId(), newRecord);
+
+            if ("VirtualHost".equals(type))
             {
+                newRecord = upgradeRootRecord(newRecord);
+            }
+        }
 
-                @Override
-                public void configuredObject(final ConfiguredObjectRecord record)
+        @Override
+        public void complete()
+        {
+            for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();)
+            {
+                Map.Entry<UUID, ConfiguredObjectRecord> entry = iterator.next();
+                final ConfiguredObjectRecord record = entry.getValue();
+                final ConfiguredObjectRecord exchangeParent = record.getParents().get(Exchange.class.getSimpleName());
+                final ConfiguredObjectRecord queueParent = record.getParents().get(Queue.class.getSimpleName());
+                if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent.getId())
+                                                   || queueParent == null || unknownQueue(queueParent.getId())))
                 {
-                    String type = record.getType().substring(1 + record.getType().lastIndexOf('.'));
-                    ConfiguredObjectRecord newRecord = new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents());
-                    getUpdateMap().put(record.getId(), newRecord);
-
-                    if ("VirtualHost".equals(type))
-                    {
-                        newRecord = upgradeRootRecord(newRecord);
-                    }
+                    getDeleteMap().put(entry.getKey(), entry.getValue());
+                    iterator.remove();
                 }
-
-                @Override
-                public void complete()
+                else
                 {
-                    for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();)
-                    {
-                        Map.Entry<UUID, ConfiguredObjectRecord> entry = iterator.next();
-                        final ConfiguredObjectRecord record = entry.getValue();
-                        final ConfiguredObjectRecord exchangeParent = record.getParents().get(Exchange.class.getSimpleName());
-                        final ConfiguredObjectRecord queueParent = record.getParents().get(Queue.class.getSimpleName());
-                        if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent.getId())
-                                                           || queueParent == null || unknownQueue(queueParent.getId())))
-                        {
-                            getDeleteMap().put(entry.getKey(), entry.getValue());
-                            iterator.remove();
-                        }
-                        else
-                        {
-                            getNextUpgrader().configuredObject(record);
-                        }
-                    }
-                    getNextUpgrader().complete();
+                    getNextUpgrader().configuredObject(record);
                 }
+            }
+            getNextUpgrader().complete();
+        }
 
-                private boolean unknownExchange(final UUID exchangeId)
-                {
-                    if (_defaultExchangeIds.containsValue(exchangeId))
-                    {
-                        return false;
-                    }
-                    ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId);
-                    return !(localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName()));
-                }
+        private boolean unknownExchange(final UUID exchangeId)
+        {
+            if (_defaultExchangeIds.containsValue(exchangeId))
+            {
+                return false;
+            }
+            ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId);
+            return !(localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName()));
+        }
 
-                private boolean unknownQueue(final UUID queueId)
-                {
-                    ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId);
-                    return !(localRecord != null  && localRecord.getType().equals(Queue.class.getSimpleName()));
-                }
+        private boolean unknownQueue(final UUID queueId)
+        {
+            ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId);
+            return !(localRecord != null  && localRecord.getType().equals(Queue.class.getSimpleName()));
+        }
 
-                private boolean isBinding(final String type)
-                {
-                    return Binding.class.getSimpleName().equals(type);
-                }
-            };
+        private boolean isBinding(final String type)
+        {
+            return Binding.class.getSimpleName().equals(type);
         }
     }
 
@@ -274,52 +245,46 @@ public class VirtualHostStoreUpgraderAnd
      * Convert the storage of queue attributes to remove the separate "ARGUMENT" attribute, and flatten the
      * attributes into the map using the model attribute names rather than the wire attribute names
      */
-    private class UpgraderFactory_0_2 extends UpgraderPhaseFactory
+    private class Upgrader_0_2_to_0_3 extends StoreUpgraderPhase
     {
-        protected UpgraderFactory_0_2()
+        private static final String ARGUMENTS = "arguments";
+
+        public Upgrader_0_2_to_0_3()
         {
-            super("0.2", "0.3");
+            super("modelVersion", "0.2", "0.3");
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        public StoreUpgraderPhase newInstance()
+        public void configuredObject(ConfiguredObjectRecord record)
         {
-            return new StoreUpgraderPhase("modelVersion", getToVersion())
+            if("VirtualHost".equals(record.getType()))
             {
-                private static final String ARGUMENTS = "arguments";
-
-                @SuppressWarnings("unchecked")
-                @Override
-                public void configuredObject(ConfiguredObjectRecord record)
+                record = upgradeRootRecord(record);
+            }
+            else if("Queue".equals(record.getType()))
+            {
+                Map<String, Object> newAttributes = new LinkedHashMap<String, Object>();
+                if(record.getAttributes().get(ARGUMENTS) instanceof Map)
                 {
-                    if("VirtualHost".equals(record.getType()))
-                    {
-                        record = upgradeRootRecord(record);
-                    }
-                    else if("Queue".equals(record.getType()))
-                    {
-                        Map<String, Object> newAttributes = new LinkedHashMap<String, Object>();
-                        if(record.getAttributes().get(ARGUMENTS) instanceof Map)
-                        {
-                            newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) record.getAttributes()
-                                    .get(ARGUMENTS)));
-                        }
-                        newAttributes.putAll(record.getAttributes());
+                    newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) record.getAttributes()
+                            .get(ARGUMENTS)));
+                }
+                newAttributes.putAll(record.getAttributes());
 
-                        record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents());
-                        getUpdateMap().put(record.getId(), record);
-                    }
+                record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents());
+                getUpdateMap().put(record.getId(), record);
+            }
 
-                    getNextUpgrader().configuredObject(record);
-                }
+            getNextUpgrader().configuredObject(record);
+        }
 
-                @Override
-                public void complete()
-                {
-                    getNextUpgrader().complete();
-                }
-            };
+        @Override
+        public void complete()
+        {
+            getNextUpgrader().complete();
         }
+
     }
 
     /*
@@ -327,387 +292,125 @@ public class VirtualHostStoreUpgraderAnd
      * where exclusive was false it will now be "NONE", and where true it will now be "CONTAINER"
      * ensure OWNER is null unless the exclusivity policy is CONTAINER
      */
-    private class UpgraderFactory_0_3 extends UpgraderPhaseFactory
+    private class Upgrader_0_3_to_0_4 extends StoreUpgraderPhase
     {
-        protected UpgraderFactory_0_3()
+        private static final String EXCLUSIVE = "exclusive";
+
+        public Upgrader_0_3_to_0_4()
         {
-            super("0.3", "0.4");
+            super("modelVersion", "0.3", "0.4");
         }
 
+
         @Override
-        public StoreUpgraderPhase newInstance()
+        public void configuredObject(ConfiguredObjectRecord record)
         {
-            return new StoreUpgraderPhase("modelVersion", getToVersion())
+            if("VirtualHost".equals(record.getType()))
             {
-                private static final String EXCLUSIVE = "exclusive";
-
-                @Override
-                public void configuredObject(ConfiguredObjectRecord record)
+                record = upgradeRootRecord(record);
+            }
+            else if(Queue.class.getSimpleName().equals(record.getType()))
+            {
+                Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(record.getAttributes());
+                if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean)
                 {
-                    if("VirtualHost".equals(record.getType()))
+                    boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE);
+                    newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE");
+                    if(!isExclusive && record.getAttributes().containsKey("owner"))
                     {
-                        record = upgradeRootRecord(record);
+                        newAttributes.remove("owner");
                     }
-                    else if(Queue.class.getSimpleName().equals(record.getType()))
-                    {
-                        Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(record.getAttributes());
-                        if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean)
-                        {
-                            boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE);
-                            newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE");
-                            if(!isExclusive && record.getAttributes().containsKey("owner"))
-                            {
-                                newAttributes.remove("owner");
-                            }
-                        }
-                        else
-                        {
-                            newAttributes.remove("owner");
-                        }
-                        if(!record.getAttributes().containsKey("durable"))
-                        {
-                            newAttributes.put("durable","true");
-                        }
-
-                        record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents());
-                        getUpdateMap().put(record.getId(), record);
-                    }
-
-                    getNextUpgrader().configuredObject(record);
                 }
-
-                @Override
-                public void complete()
+                else
                 {
-                    getNextUpgrader().complete();
+                    newAttributes.remove("owner");
                 }
-            };
-        }
-    }
-
-    private class UpgraderFactory_0_4 extends UpgraderPhaseFactory
-    {
-        protected UpgraderFactory_0_4()
-        {
-            super("0.4", "2.0");
-        }
-
-        @Override
-        public StoreUpgraderPhase newInstance()
-        {
-            return new StoreUpgraderPhase("modelVersion", getToVersion())
-            {
-                private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
-                private static final String EXCHANGE_NAME = "name";
-                private static final String EXCHANGE_TYPE = "type";
-                private static final String EXCHANGE_DURABLE = "durable";
-                private ConfiguredObjectRecord _virtualHostRecord;
-
-                @Override
-                public void configuredObject(ConfiguredObjectRecord record)
+                if(!record.getAttributes().containsKey("durable"))
                 {
-                    if("VirtualHost".equals(record.getType()))
-                    {
-                        record = upgradeRootRecord(record);
-                        Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(record.getAttributes());
-                        virtualHostAttributes.put("name", _virtualHostNode.getName());
-                        virtualHostAttributes.put("modelVersion", getToVersion());
-                        record = new ConfiguredObjectRecordImpl(record.getId(), "VirtualHost", virtualHostAttributes, Collections.<String, ConfiguredObjectRecord>emptyMap());
-                        _virtualHostRecord = record;
-                    }
-                    else if("Exchange".equals(record.getType()))
-                    {
-                        Map<String, Object> attributes = record.getAttributes();
-                        String name = (String)attributes.get(EXCHANGE_NAME);
-                        _missingAmqpExchanges.remove(name);
-                    }
-                    getNextUpgrader().configuredObject(record);
+                    newAttributes.put("durable","true");
                 }
 
-                @Override
-                public void complete()
-                {
-                    for (Entry<String, String> entry : _missingAmqpExchanges.entrySet())
-                    {
-                        String name = entry.getKey();
-                        String type = entry.getValue();
-                        UUID id = _defaultExchangeIds.get(name);
-
-                        Map<String, Object> attributes = new HashMap<String, Object>();
-                        attributes.put(EXCHANGE_NAME, name);
-                        attributes.put(EXCHANGE_TYPE, type);
-                        attributes.put(EXCHANGE_DURABLE, true);
-
-                        ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord));
-                        getUpdateMap().put(id, record);
-
-                        getNextUpgrader().configuredObject(record);
-
-                    }
+                record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents());
+                getUpdateMap().put(record.getId(), record);
+            }
 
-                    getNextUpgrader().complete();
-                }
-            };
+            getNextUpgrader().configuredObject(record);
         }
 
-    }
+        @Override
+        public void complete()
+        {
+            getNextUpgrader().complete();
+        }
 
-    public void perform(DurableConfigurationStore durableConfigurationStore)
-    {
-        UpgradeAndRecoveryHandler vhrh = new UpgradeAndRecoveryHandler(_virtualHostNode, _objectFactory, durableConfigurationStore, _upgraders);
-        durableConfigurationStore.visitConfiguredObjectRecords(vhrh);
     }
 
-    //TODO: generalize this class
-    private static class  UpgradeAndRecoveryHandler implements ConfiguredObjectRecordHandler
+    private class Upgrader_0_4_to_0_5 extends StoreUpgraderPhase
     {
-        private static Logger LOGGER = Logger.getLogger(UpgradeAndRecoveryHandler.class);
-
-        private final Map<UUID, ConfiguredObjectRecord> _records = new LinkedHashMap<UUID, ConfiguredObjectRecord>();
-        private Map<String, UpgraderPhaseFactory> _upgraders;
-
-        private final VirtualHostNode<?> _parent;
-        private final ConfiguredObjectFactory _configuredObjectFactory;
-        private final DurableConfigurationStore _store;
-
-        public UpgradeAndRecoveryHandler(VirtualHostNode<?> parent, ConfiguredObjectFactory configuredObjectFactory, DurableConfigurationStore durableConfigurationStore, Map<String, UpgraderPhaseFactory> upgraders)
-        {
-            super();
-            _parent = parent;
-            _configuredObjectFactory = configuredObjectFactory;
-            _upgraders = upgraders;
-            _store = durableConfigurationStore;
-        }
+        private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
+        private static final String EXCHANGE_NAME = "name";
+        private static final String EXCHANGE_TYPE = "type";
+        private static final String EXCHANGE_DURABLE = "durable";
+        private ConfiguredObjectRecord _virtualHostRecord;
 
-        @Override
-        public void begin()
+        public Upgrader_0_4_to_0_5()
         {
+            super("modelVersion", "0.4", "2.0");
         }
 
         @Override
-        public boolean handle(final ConfiguredObjectRecord record)
-        {
-            _records.put(record.getId(), record);
-            return true;
-        }
-
-        @Override
-        public void end()
+        public void configuredObject(ConfiguredObjectRecord record)
         {
-            String version = getCurrentVersion();
-
-            if (LOGGER.isInfoEnabled())
-            {
-                LOGGER.info("Store has model version " + version + ". Number of record(s) " + _records.size());
-            }
-
-            DurableConfigurationStoreUpgrader upgrader = buildUpgraderChain(version);
-
-            for(ConfiguredObjectRecord record : _records.values())
-            {
-                upgrader.configuredObject(record);
-            }
-
-            upgrader.complete();
-
-            Map<UUID, ConfiguredObjectRecord> deletedRecords = upgrader.getDeletedRecords();
-            Map<UUID, ConfiguredObjectRecord> updatedRecords = upgrader.getUpdatedRecords();
-
-            if (LOGGER.isDebugEnabled())
+            if("VirtualHost".equals(record.getType()))
             {
-                LOGGER.debug("VirtualHost store upgrade: " + deletedRecords.size() + " record(s) deleted");
-                LOGGER.debug("VirtualHost store upgrade: " + updatedRecords.size() + " record(s) updated");
-                LOGGER.debug("VirtualHost store upgrade: " + _records.size() + " total record(s)");
+                record = upgradeRootRecord(record);
+                Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(record.getAttributes());
+                virtualHostAttributes.put("name", _virtualHostNode.getName());
+                virtualHostAttributes.put("modelVersion", getToVersion());
+                record = new ConfiguredObjectRecordImpl(record.getId(), "VirtualHost", virtualHostAttributes, Collections.<String, ConfiguredObjectRecord>emptyMap());
+                _virtualHostRecord = record;
             }
-
-            _store.update(true, updatedRecords.values().toArray(new ConfiguredObjectRecord[updatedRecords.size()]));
-            _store.remove(deletedRecords.values().toArray(new ConfiguredObjectRecord[deletedRecords.size()]));
-
-            _records.keySet().removeAll(deletedRecords.keySet());
-            _records.putAll(updatedRecords);
-
-            ConfiguredObjectRecord virtualHostRecord = null;
-            for (ConfiguredObjectRecord record : _records.values())
+            else if("Exchange".equals(record.getType()))
             {
-                LOGGER.debug("Found type " +  record.getType());
-                if ("VirtualHost".equals(record.getType()))
-                {
-                    virtualHostRecord = record;
-                    break;
-                }
-            }
-
-            if (virtualHostRecord != null)
-            {
-                String parentCategory = _parent.getCategoryClass().getSimpleName();
-                ConfiguredObjectRecord parentRecord = new ConfiguredObjectRecordImpl(_parent.getId(), parentCategory, Collections.<String, Object>emptyMap());
-                Map<String, ConfiguredObjectRecord> rootParents = Collections.<String, ConfiguredObjectRecord>singletonMap(parentCategory, parentRecord);
-                _records.put(virtualHostRecord.getId(), new ConfiguredObjectRecordImpl(virtualHostRecord.getId(), VirtualHost.class.getSimpleName(), virtualHostRecord.getAttributes(), rootParents));
-                Collection<ConfiguredObjectRecord> records = _records.values();
-                resolveObjects(_configuredObjectFactory, _parent, records.toArray(new ConfiguredObjectRecord[records.size()]));
+                Map<String, Object> attributes = record.getAttributes();
+                String name = (String)attributes.get(EXCHANGE_NAME);
+                _missingAmqpExchanges.remove(name);
             }
+            getNextUpgrader().configuredObject(record);
         }
 
-        private DurableConfigurationStoreUpgrader buildUpgraderChain(String version)
-        {
-            DurableConfigurationStoreUpgrader head = null;
-            while(!BrokerModel.MODEL_VERSION.equals(version))
-            {
-                if (LOGGER.isDebugEnabled())
-                {
-                    LOGGER.debug("Adding virtual host store upgrader from model version: " + version);
-                }
-                final UpgraderPhaseFactory upgraderPhaseFactory = _upgraders.get(version);
-                StoreUpgraderPhase upgrader = upgraderPhaseFactory.newInstance();
-                if(head == null)
-                {
-                    head = upgrader;
-                }
-                else
-                {
-                    head.setNextUpgrader(upgrader);
-                }
-                version = upgraderPhaseFactory.getToVersion();
-            }
-
-            if(head == null)
-            {
-                head = new NullUpgrader();
-            }
-            else
-            {
-                head.setNextUpgrader(new NullUpgrader());
-            }
-
-            return head;
-        }
-
-        private String getCurrentVersion()
+        @Override
+        public void complete()
         {
-            for(ConfiguredObjectRecord record : _records.values())
+            for (Entry<String, String> entry : _missingAmqpExchanges.entrySet())
             {
-                if(record.getType().equals("VirtualHost"))
-                {
-                    return (String) record.getAttributes().get(VirtualHost.MODEL_VERSION);
-                }
-            }
-            return BrokerModel.MODEL_VERSION;
-        }
+                String name = entry.getKey();
+                String type = entry.getValue();
+                UUID id = _defaultExchangeIds.get(name);
 
+                Map<String, Object> attributes = new HashMap<String, Object>();
+                attributes.put(EXCHANGE_NAME, name);
+                attributes.put(EXCHANGE_TYPE, type);
+                attributes.put(EXCHANGE_DURABLE, true);
 
-        public void resolveObjects(ConfiguredObjectFactory factory, ConfiguredObject<?> root, ConfiguredObjectRecord... records)
-        {
-            Map<UUID, ConfiguredObject<?>> resolvedObjects = new HashMap<UUID, ConfiguredObject<?>>();
-            resolvedObjects.put(root.getId(), root);
-
-            Collection<ConfiguredObjectRecord> recordsWithUnresolvedParents = new ArrayList<ConfiguredObjectRecord>(Arrays.asList(records));
-            Collection<UnresolvedConfiguredObject<? extends ConfiguredObject>> recordsWithUnresolvedDependencies =
-                    new ArrayList<UnresolvedConfiguredObject<? extends ConfiguredObject>>();
+                ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord));
+                getUpdateMap().put(id, record);
 
-            boolean updatesMade;
+                getNextUpgrader().configuredObject(record);
 
-            do
-            {
-                updatesMade = false;
-                Iterator<ConfiguredObjectRecord> iter = recordsWithUnresolvedParents.iterator();
-                while (iter.hasNext())
-                {
-                    ConfiguredObjectRecord record = iter.next();
-                    Collection<ConfiguredObject<?>> parents = new ArrayList<ConfiguredObject<?>>();
-                    boolean foundParents = true;
-                    for (ConfiguredObjectRecord parent : record.getParents().values())
-                    {
-                        if (!resolvedObjects.containsKey(parent.getId()))
-                        {
-                            foundParents = false;
-                            break;
-                        }
-                        else
-                        {
-                            parents.add(resolvedObjects.get(parent.getId()));
-                        }
-                    }
-                    if (foundParents)
-                    {
-                        iter.remove();
-                        UnresolvedConfiguredObject<? extends ConfiguredObject> recovered =
-                                factory.recover(record, parents.toArray(new ConfiguredObject<?>[parents.size()]));
-                        Collection<ConfiguredObjectDependency<?>> dependencies =
-                                recovered.getUnresolvedDependencies();
-                        if (dependencies.isEmpty())
-                        {
-                            updatesMade = true;
-                            ConfiguredObject<?> resolved = recovered.resolve();
-                            resolvedObjects.put(resolved.getId(), resolved);
-                        }
-                        else
-                        {
-                            recordsWithUnresolvedDependencies.add(recovered);
-                        }
-                    }
-
-                }
-
-                Iterator<UnresolvedConfiguredObject<? extends ConfiguredObject>> unresolvedIter =
-                        recordsWithUnresolvedDependencies.iterator();
-
-                while(unresolvedIter.hasNext())
-                {
-                    UnresolvedConfiguredObject<? extends ConfiguredObject> unresolvedObject = unresolvedIter.next();
-                    Collection<ConfiguredObjectDependency<?>> dependencies =
-                            new ArrayList<ConfiguredObjectDependency<?>>(unresolvedObject.getUnresolvedDependencies());
+            }
 
-                    for(ConfiguredObjectDependency dependency : dependencies)
-                    {
-                        if(dependency instanceof ConfiguredObjectIdDependency)
-                        {
-                            UUID id = ((ConfiguredObjectIdDependency)dependency).getId();
-                            if(resolvedObjects.containsKey(id))
-                            {
-                                dependency.resolve(resolvedObjects.get(id));
-                            }
-                        }
-                        else if(dependency instanceof ConfiguredObjectNameDependency)
-                        {
-                            ConfiguredObject<?> dependentObject = null;
-                            for(ConfiguredObject<?> parent : unresolvedObject.getParents())
-                            {
-                                dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(), ((ConfiguredObjectNameDependency)dependency).getName());
-                                if(dependentObject != null)
-                                {
-                                    break;
-                                }
-                            }
-                            if(dependentObject != null)
-                            {
-                                dependency.resolve(dependentObject);
-                            }
-                        }
-                        else
-                        {
-                            throw new ServerScopedRuntimeException("Unknown dependency type " + dependency.getClass().getSimpleName());
-                        }
-                    }
-                    if(unresolvedObject.getUnresolvedDependencies().isEmpty())
-                    {
-                        updatesMade = true;
-                        unresolvedIter.remove();
-                        ConfiguredObject<?> resolved = unresolvedObject.resolve();
-                        resolvedObjects.put(resolved.getId(), resolved);
-                    }
-                }
+            getNextUpgrader().complete();
+        }
 
-            } while(updatesMade && !(recordsWithUnresolvedDependencies.isEmpty() && recordsWithUnresolvedParents.isEmpty()));
+    }
 
-            if(!recordsWithUnresolvedDependencies.isEmpty())
-            {
-                throw new IllegalArgumentException("Cannot resolve some objects: " + recordsWithUnresolvedDependencies);
-            }
-            if(!recordsWithUnresolvedParents.isEmpty())
-            {
-                throw new IllegalArgumentException("Cannot resolve object because their parents cannot be found" + recordsWithUnresolvedParents);
-            }
-        }
+    public void perform(DurableConfigurationStore durableConfigurationStore)
+    {
+        String virtualHostCategory = VirtualHost.class.getSimpleName();
+        GenericStoreUpgrader upgraderHandler = new GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, durableConfigurationStore, _upgraders);
+        upgraderHandler.upgrade();
 
+        new GenericRecoverer(_virtualHostNode, virtualHostCategory).recover(upgraderHandler.getRecords());
     }
 }

Copied: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java (from r1588966, qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java?p2=qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java&p1=qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java&r1=1588966&r2=1589112&rev=1589112&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java Tue Apr 22 12:32:23 2014
@@ -18,11 +18,12 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.configuration.startup;
+package org.apache.qpid.server.store;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -31,9 +32,7 @@ import java.util.UUID;
 import junit.framework.TestCase;
 
 import org.apache.qpid.server.BrokerOptions;
-import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.configuration.RecovererProvider;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogRecorder;
@@ -49,6 +48,7 @@ import org.apache.qpid.server.model.Syst
 import org.apache.qpid.server.model.SystemContextImpl;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
+import org.apache.qpid.server.store.GenericRecoverer;
 import org.apache.qpid.server.store.UnresolvedConfiguredObject;
 
 public class BrokerRecovererTest extends TestCase
@@ -56,9 +56,9 @@ public class BrokerRecovererTest extends
     private ConfiguredObjectRecord _brokerEntry = mock(ConfiguredObjectRecord.class);
 
     private UUID _brokerId = UUID.randomUUID();
-    private AuthenticationProvider _authenticationProvider1;
+    private AuthenticationProvider<?> _authenticationProvider1;
     private UUID _authenticationProvider1Id = UUID.randomUUID();
-    private SystemContext _systemContext;
+    private SystemContext<?> _systemContext;
     private ConfiguredObjectFactory _configuredObjectFactory;
     private TaskExecutor _taskExecutor;
 
@@ -91,8 +91,14 @@ public class BrokerRecovererTest extends
     @Override
     protected void tearDown() throws Exception
     {
-        super.tearDown();
-        _taskExecutor.stop();
+        try
+        {
+            super.tearDown();
+        }
+        finally
+        {
+            _taskExecutor.stop();
+        }
     }
 
     public void testCreateBrokerAttributes()
@@ -115,8 +121,8 @@ public class BrokerRecovererTest extends
 
         when(_brokerEntry.getAttributes()).thenReturn(entryAttributes);
 
-        _systemContext.resolveObjects(_brokerEntry);
-        Broker broker = _systemContext.getBroker();
+        resolveObjects(_brokerEntry);
+        Broker<?> broker = _systemContext.getBroker();
 
         assertNotNull(broker);
 
@@ -171,7 +177,7 @@ public class BrokerRecovererTest extends
         UUID authProviderId = UUID.randomUUID();
         UUID portId = UUID.randomUUID();
 
-        _systemContext.resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"), createPortRecord(
+        resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"), createPortRecord(
                 portId,
                 5672,
                 "authProvider"));
@@ -188,7 +194,7 @@ public class BrokerRecovererTest extends
     {
         UUID authProviderId = UUID.randomUUID();
 
-        _systemContext.resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"));
+        resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"));
         Broker<?> broker = _systemContext.getBroker();
 
 
@@ -206,7 +212,7 @@ public class BrokerRecovererTest extends
         UUID authProvider2Id = UUID.randomUUID();
         UUID port2Id = UUID.randomUUID();
 
-        _systemContext.resolveObjects(_brokerEntry,
+        resolveObjects(_brokerEntry,
                                       createAuthProviderRecord(authProviderId, "authProvider"),
                                       createPortRecord(portId, 5672, "authProvider"),
                                       createAuthProviderRecord(authProvider2Id, "authProvider2"),
@@ -228,7 +234,7 @@ public class BrokerRecovererTest extends
 
         UUID authProviderId = UUID.randomUUID();
 
-        _systemContext.resolveObjects(_brokerEntry, createGroupProviderRecord(authProviderId, "groupProvider"));
+        resolveObjects(_brokerEntry, createGroupProviderRecord(authProviderId, "groupProvider"));
         Broker<?> broker = _systemContext.getBroker();
 
 
@@ -253,7 +259,7 @@ public class BrokerRecovererTest extends
 
             try
             {
-                _systemContext.resolveObjects(_brokerEntry);
+                resolveObjects(_brokerEntry);
                 Broker<?> broker = _systemContext.getBroker();
                 broker.open();
                 fail("The broker creation should fail due to unsupported model version");
@@ -323,33 +329,10 @@ public class BrokerRecovererTest extends
         return String.valueOf(attributeValue);
     }
 
-    private  RecovererProvider createRecoveryProvider(final ConfiguredObjectRecord[] entries, final ConfiguredObject[] objectsToRecoverer)
+    private void resolveObjects(ConfiguredObjectRecord... records)
     {
-        RecovererProvider recovererProvider = new RecovererProvider()
-        {
-            @Override
-            public ConfiguredObjectRecoverer<? extends ConfiguredObject> getRecoverer(String type)
-            {
-                @SuppressWarnings({ "unchecked", "rawtypes" })
-                final ConfiguredObjectRecoverer<?  extends ConfiguredObject> recoverer = new ConfiguredObjectRecoverer()
-                {
-                    public ConfiguredObject create(RecovererProvider recovererProvider, ConfiguredObjectRecord entry, ConfiguredObject... parents)
-                    {
-                        for (int i = 0; i < entries.length; i++)
-                        {
-                            ConfiguredObjectRecord e = entries[i];
-                            if (entry == e)
-                            {
-                                return objectsToRecoverer[i];
-                            }
-                        }
-                        return null;
-                    }
-                };
-
-                return recoverer;
-            }
-        };
-        return recovererProvider;
+        GenericRecoverer recoverer = new GenericRecoverer(_systemContext, Broker.class.getSimpleName());
+        recoverer.recover(Arrays.asList(records));
     }
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org