You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/08/27 09:31:40 UTC

qpid-broker-j git commit: QPID-8229: [Java Broker] Ensure changes made to existing durable children of virtualhost after virtualhost restart are persisted

Repository: qpid-broker-j
Updated Branches:
  refs/heads/6.1.x 0b3cefaee -> ca3dd59c3


QPID-8229: [Java Broker] Ensure changes made to existing durable children of virtualhost after virtualhost restart are persisted

(cherry picked from commit d27237d87672dff846603e5d04a849116ee47fac)

# Conflicts:
#	broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
#	broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ca3dd59c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ca3dd59c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ca3dd59c

Branch: refs/heads/6.1.x
Commit: ca3dd59c30112e048636380db54196143028e22a
Parents: 0b3cefa
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Aug 27 07:21:59 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Aug 27 07:21:59 2018 +0100

----------------------------------------------------------------------
 .../VirtualHostStoreUpgraderAndRecoverer.java   | 113 +++++++++++--------
 .../server/virtualhost/AbstractVirtualHost.java |  22 +---
 .../qpid/server/model/VirtualHostTest.java      |  51 +++++++++
 3 files changed, 120 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ca3dd59c/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
index d5cd49d..07d3869 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
@@ -591,12 +591,23 @@ public class VirtualHostStoreUpgraderAndRecoverer
         upgraderHandler.upgrade(records);
 
         List<ConfiguredObjectRecord> upgradedRecords = upgraderHandler.getRecords();
-        recover(durableConfigurationStore, upgradedRecords, isNew);
+        recover(_virtualHostNode, durableConfigurationStore, upgradedRecords, isNew);
         return isNew;
     }
 
     public void reloadAndRecover(final DurableConfigurationStore durableConfigurationStore)
     {
+        reloadAndRecoverInternal(_virtualHostNode, durableConfigurationStore);
+    }
+
+    public void reloadAndRecoverVirtualHost(final DurableConfigurationStore durableConfigurationStore)
+    {
+        reloadAndRecoverInternal(_virtualHostNode.getVirtualHost(), durableConfigurationStore);
+    }
+
+    private void reloadAndRecoverInternal(final ConfiguredObject<?> recoveryRoot,
+                                          final DurableConfigurationStore durableConfigurationStore)
+    {
         final List<ConfiguredObjectRecord> records = new ArrayList<>();
         durableConfigurationStore.reload(new ConfiguredObjectRecordHandler()
         {
@@ -606,13 +617,15 @@ public class VirtualHostStoreUpgraderAndRecoverer
                 records.add(record);
             }
         });
-        recover(durableConfigurationStore, records, false);
+        recover(recoveryRoot, durableConfigurationStore, records, false);
     }
 
-    private void recover(final DurableConfigurationStore durableConfigurationStore,
-                         final List<ConfiguredObjectRecord> records, final boolean isNew)
+    private void recover(final ConfiguredObject<?> recoveryRoot,
+                         final DurableConfigurationStore durableConfigurationStore,
+                         final List<ConfiguredObjectRecord> records,
+                         final boolean isNew)
     {
-        new GenericRecoverer(_virtualHostNode).recover(records, isNew);
+        new GenericRecoverer(recoveryRoot).recover(records, isNew);
 
         final StoreConfigurationChangeListener
                 configChangeListener = new StoreConfigurationChangeListener(durableConfigurationStore);
@@ -627,70 +640,74 @@ public class VirtualHostStoreUpgraderAndRecoverer
                 }
             });
         }
-        _virtualHostNode.addChangeListener(new ConfigurationChangeListener()
+
+        if (recoveryRoot instanceof VirtualHostNode)
         {
-            @Override
-            public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
+            _virtualHostNode.addChangeListener(new ConfigurationChangeListener()
             {
+                @Override
+                public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
+                {
 
-            }
+                }
 
-            @Override
-            public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
-            {
-                if(child instanceof VirtualHost)
+                @Override
+                public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
                 {
-                    applyRecursively(child, new Action<ConfiguredObject<?>>()
+                    if (child instanceof VirtualHost)
                     {
-                        @Override
-                        public void performAction(final ConfiguredObject<?> object)
+                        applyRecursively(child, new Action<ConfiguredObject<?>>()
                         {
-                            if(object.isDurable())
+                            @Override
+                            public void performAction(final ConfiguredObject<?> object)
                             {
-                                durableConfigurationStore.update(true, object.asObjectRecord());
-                                object.addChangeListener(configChangeListener);
+                                if (object.isDurable())
+                                {
+                                    durableConfigurationStore.update(true, object.asObjectRecord());
+                                    object.addChangeListener(configChangeListener);
+                                }
                             }
-                        }
-                    });
+                        });
 
+                    }
                 }
-            }
 
-            @Override
-            public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
-            {
-                if(child instanceof VirtualHost)
+                @Override
+                public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
                 {
-                    child.removeChangeListener(configChangeListener);
+                    if (child instanceof VirtualHost)
+                    {
+                        child.removeChangeListener(configChangeListener);
+                    }
                 }
-            }
 
-            @Override
-            public void attributeSet(final ConfiguredObject<?> object,
-                                     final String attributeName,
-                                     final Object oldAttributeValue,
-                                     final Object newAttributeValue)
-            {
+                @Override
+                public void attributeSet(final ConfiguredObject<?> object,
+                                         final String attributeName,
+                                         final Object oldAttributeValue,
+                                         final Object newAttributeValue)
+                {
 
-            }
+                }
 
-            @Override
-            public void bulkChangeStart(final ConfiguredObject<?> object)
-            {
+                @Override
+                public void bulkChangeStart(final ConfiguredObject<?> object)
+                {
 
-            }
+                }
 
-            @Override
-            public void bulkChangeEnd(final ConfiguredObject<?> object)
-            {
+                @Override
+                public void bulkChangeEnd(final ConfiguredObject<?> object)
+                {
 
-            }
-        });
-        if(isNew)
-        {
-            if(_virtualHostNode instanceof AbstractConfiguredObject)
+                }
+            });
+            if (isNew)
             {
-                ((AbstractConfiguredObject)_virtualHostNode).forceUpdateAllSecureAttributes();
+                if (_virtualHostNode instanceof AbstractConfiguredObject)
+                {
+                    ((AbstractConfiguredObject) _virtualHostNode).forceUpdateAllSecureAttributes();
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ca3dd59c/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index c005863..355d3b2 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -122,6 +122,7 @@ import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreProvider;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer;
 import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
 import org.apache.qpid.server.store.handler.MessageHandler;
@@ -2619,24 +2620,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     {
         resetStatistics();
 
-
-        final List<ConfiguredObjectRecord> records = new ArrayList<>();
-
-        // Transitioning to STOPPED will have closed all our children.  Now we are transition
-        // back to ACTIVE, we need to recover and re-open them.
-
-        getDurableConfigurationStore().reload(new ConfiguredObjectRecordHandler()
-        {
-
-            @Override
-            public void handle(final ConfiguredObjectRecord record)
-            {
-                records.add(record);
-            }
-
-        });
-
-        new GenericRecoverer(this).recover(records, false);
+        final VirtualHostStoreUpgraderAndRecoverer virtualHostStoreUpgraderAndRecoverer =
+                new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
+        virtualHostStoreUpgraderAndRecoverer.reloadAndRecoverVirtualHost(getDurableConfigurationStore());
 
         final List<ListenableFuture<Void>> childOpenFutures = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ca3dd59c/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
index 3760146..e9ceecf 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
@@ -34,6 +34,7 @@ import static org.mockito.Mockito.when;
 import java.security.AccessControlException;
 import java.security.Principal;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -248,6 +249,55 @@ public class VirtualHostTest extends QpidTestCase
     }
 
 
+    public void testModifyDurableChildAfterRestartingVirtualHost()
+    {
+        String virtualHostName = getName();
+
+        VirtualHost<?> virtualHost = createVirtualHost(virtualHostName);
+        final ConfiguredObjectRecord virtualHostCor = virtualHost.asObjectRecord();
+
+        // Give virtualhost a queue
+        Queue queue = virtualHost.createChild(Queue.class, Collections.<String, Object>singletonMap(Queue.NAME, "myQueue"));
+        final ConfiguredObjectRecord queueCor = queue.asObjectRecord();
+
+        final List<ConfiguredObjectRecord> allObjects = new ArrayList<>();
+        allObjects.add(virtualHostCor);
+        allObjects.add(queueCor);
+
+        virtualHost.stop();
+        assertEquals("Unexpected state", State.STOPPED, virtualHost.getState());
+
+        // Setup an answer that will return the configured object records
+        doAnswer(new Answer()
+        {
+            final Iterator<ConfiguredObjectRecord> corIterator = allObjects.iterator();
+
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable
+            {
+                ConfiguredObjectRecordHandler handler = (ConfiguredObjectRecordHandler) invocation.getArguments()[0];
+                while (corIterator.hasNext())
+                {
+                    handler.handle(corIterator.next());
+                }
+
+                return null;
+            }
+        }).when(_configStore).reload(any(ConfiguredObjectRecordHandler.class));
+
+        virtualHost.start();
+        assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
+        final Collection<Queue> queues = virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after restart", 1, queues.size());
+
+        final Queue recoveredQueue = queues.iterator().next();
+        recoveredQueue.setAttributes(Collections.singletonMap(ConfiguredObject.DESCRIPTION, "testDescription"));
+        final ConfiguredObjectRecord recoveredQueueCor = queue.asObjectRecord();
+
+        verify(_configStore).update(eq(false), matchesRecord(recoveredQueueCor.getId(), recoveredQueueCor.getType()));
+    }
+
+
     public void testStopVirtualHost_ClosesConnections()
     {
         String virtualHostName = getName();
@@ -550,6 +600,7 @@ public class VirtualHostTest extends QpidTestCase
         // Fire the child added event on the node
         _storeConfigurationChangeListener.childAdded(_virtualHostNode,host);
         _virtualHost = host;
+        when(_virtualHostNode.getVirtualHost()).thenReturn(_virtualHost);
         return host;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org