You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/07/31 14:02:11 UTC

svn commit: r1614866 - in /qpid/trunk/qpid/java: bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/store/ broker-core/src/ma...

Author: kwall
Date: Thu Jul 31 12:02:10 2014
New Revision: 1614866

URL: http://svn.apache.org/r1614866
Log:
QPID-5926: [Java Broker] When transitioning from STOPPED to ACTIVE the virtualhost re-recovers children beneath it

Modified:
    qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
    qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes

Modified: qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java Thu Jul 31 12:02:10 2014
@@ -29,11 +29,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import javax.servlet.http.HttpServletResponse;
+
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.systest.rest.Asserts;
 import org.apache.qpid.systest.rest.QpidRestTestCase;
 import org.apache.qpid.test.utils.TestBrokerConfiguration;
 import org.apache.qpid.util.FileUtils;
@@ -99,7 +102,7 @@ public class BDBHAVirtualHostRestTest ex
         assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY));
 
         Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "NO_SYNC");
-        getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200);
+        getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK);
 
         hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
         assertEquals("Unexpected synchronization policy after change", "NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY));
@@ -111,9 +114,36 @@ public class BDBHAVirtualHostRestTest ex
         assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY));
 
         Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC");
-        getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200);
+        getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK);
 
         hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
         assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY));
     }
+
+    public void testMutateState() throws Exception
+    {
+        waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
+        assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE");
+
+        Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
+        getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
+
+        waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "STOPPED");
+        assertActualAndDesireStates(_virtualhostUrl, "STOPPED", "STOPPED");
+
+        newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
+        getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
+
+        waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
+        assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE");
+    }
+
+    private void assertActualAndDesireStates(final String restUrl,
+                                             final String expectedDesiredState,
+                                             final String expectedActualState) throws IOException
+    {
+        Map<String, Object> virtualhost = getRestTestHelper().getJsonAsSingletonList(restUrl);
+        Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, virtualhost);
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Thu Jul 31 12:02:10 2014
@@ -574,7 +574,7 @@ public abstract class AbstractConfigured
         }
     }
 
-    private void applyToChildren(Action<ConfiguredObject<?>> action)
+    protected void applyToChildren(Action<ConfiguredObject<?>> action)
     {
         for (Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
         {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java Thu Jul 31 12:02:10 2014
@@ -477,7 +477,7 @@ public class BrokerStoreUpgraderAndRecov
     public Broker<?> perform(final DurableConfigurationStore store)
     {
         List<ConfiguredObjectRecord> upgradedRecords = upgrade(store);
-        new GenericRecoverer(_systemConfig, Broker.class.getSimpleName()).recover(upgradedRecords);
+        new GenericRecoverer(_systemConfig).recover(upgradedRecords);
 
         final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(store);
         applyRecursively(_systemConfig.getBroker(), new Action<ConfiguredObject<?>>()

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java Thu Jul 31 12:02:10 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.store;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -40,18 +41,16 @@ public class GenericRecoverer
 {
     private static final Logger LOGGER = Logger.getLogger(GenericRecoverer.class);
 
-    private final ConfiguredObject<?> _parentOfRoot;
-    private final String _rootCategory;
+    private final ConfiguredObject<?> _root;
 
-    public GenericRecoverer(ConfiguredObject<?> parentOfRoot, String rootCategory)
+    public GenericRecoverer(ConfiguredObject<?> root)
     {
-        _parentOfRoot = parentOfRoot;
-        _rootCategory = rootCategory;
+        _root = root;
     }
 
     public void recover(final List<ConfiguredObjectRecord> records)
     {
-        _parentOfRoot.getTaskExecutor().run(new VoidTask()
+        _root.getTaskExecutor().run(new VoidTask()
         {
             @Override
             public void execute()
@@ -62,44 +61,70 @@ public class GenericRecoverer
             @Override
             public String toString()
             {
-                return _rootCategory + " recovery";
+                return  "RecoveringChildrenOf_" + _root.getCategoryClass().getSimpleName();
             }
         });
-
     }
 
     private void performRecover(List<ConfiguredObjectRecord> records)
     {
-        ConfiguredObjectRecord rootRecord = null;
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Recovering the children of " + _root);
+        }
+
+        records = resolveDiscontinuity(records);
+        resolveObjects(_root, records);
+    }
+
+    private List<ConfiguredObjectRecord> resolveDiscontinuity(final List<ConfiguredObjectRecord> records)
+    {
+        Collection<Class<? extends ConfiguredObject>> childTypesOfRoot = _root.getModel().getChildTypes(_root.getCategoryClass());
+        List<ConfiguredObjectRecord> newRecords = new ArrayList<>(records.size());
+
         for (ConfiguredObjectRecord record : records)
         {
-            if (_rootCategory.equals(record.getType()))
+            if (record.getId().equals(_root.getId()))
             {
-                rootRecord = record;
-                break;
+                // If the parent is already in the records, we skip it, this supports partial recovery
+                // (required when restarting a virtualhost).  In the long term, when the objects take responsibility
+                // for the recovery of immediate descendants only, this will disappear.
+            }
+            else if ((record.getParents() == null || record.getParents().size() == 0))
+            {
+                if (containsCategory(childTypesOfRoot, record.getType()))
+                {
+                    String parentOfRootCategory = _root.getCategoryClass().getSimpleName();
+                    Map<String, UUID> rootParents = Collections.singletonMap(parentOfRootCategory, _root.getId());
+                    newRecords.add(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes(), rootParents));
+                }
+                else
+                {
+                    throw new IllegalArgumentException("Recovered configured object record " + record
+                                                       + " has no recorded parents and is not a valid child type"
+                                                       + " [" + Arrays.toString(childTypesOfRoot.toArray()) + "]"
+                                                       + " for the root " + _root);
+                }
+            }
+            else
+            {
+                newRecords.add(record);
             }
         }
 
-        if (LOGGER.isDebugEnabled())
-        {
-            LOGGER.debug("Root record " + rootRecord);
-        }
+        return newRecords;
+    }
 
-        if (rootRecord != null)
+    private boolean containsCategory(Collection<Class<? extends ConfiguredObject>> childCategories, String categorySimpleName)
+    {
+        for (Class<? extends ConfiguredObject> child : childCategories)
         {
-
-            if (rootRecord.getParents() == null || rootRecord.getParents().isEmpty())
+            if (child.getSimpleName().equals(categorySimpleName))
             {
-                records = new ArrayList<ConfiguredObjectRecord>(records);
-
-                String parentOfRootCategory = _parentOfRoot.getCategoryClass().getSimpleName();
-                Map<String, UUID> rootParents = Collections.singletonMap(parentOfRootCategory, _parentOfRoot.getId());
-                records.remove(rootRecord);
-                records.add(new ConfiguredObjectRecordImpl(rootRecord.getId(), _rootCategory, rootRecord.getAttributes(), rootParents));
+                return true;
             }
-
-            resolveObjects(_parentOfRoot, records);
         }
+        return false;
     }
 
     private void resolveObjects(ConfiguredObject<?> parentObject, List<ConfiguredObjectRecord> records)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Thu Jul 31 12:02:10 2014
@@ -406,6 +406,6 @@ public class VirtualHostStoreUpgraderAnd
         GenericStoreUpgrader upgraderHandler = new GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, durableConfigurationStore, _upgraders);
         upgraderHandler.upgrade();
 
-        new GenericRecoverer(_virtualHostNode, virtualHostCategory).recover(upgraderHandler.getRecords());
+        new GenericRecoverer(_virtualHostNode).recover(upgraderHandler.getRecords());
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Thu Jul 31 12:02:10 2014
@@ -75,6 +75,7 @@ import org.apache.qpid.server.store.Conf
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.Event;
 import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.GenericRecoverer;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreProvider;
 import org.apache.qpid.server.store.StoreException;
@@ -82,6 +83,7 @@ import org.apache.qpid.server.store.hand
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.MapValueConverter;
 
@@ -341,7 +343,6 @@ public abstract class AbstractVirtualHos
     public Collection<Connection> getConnections()
     {
         return getChildren(Connection.class);
-
     }
 
     @Override
@@ -1317,7 +1318,7 @@ public abstract class AbstractVirtualHos
         getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes()));
     }
 
-    @StateTransition( currentState = { State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
+    @StateTransition( currentState = { State.UNINITIALIZED }, desiredState = State.ACTIVE )
     private void onActivate()
     {
         _houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount());
@@ -1339,9 +1340,6 @@ public abstract class AbstractVirtualHos
         }
 
         MessageStoreRecoverer messageStoreRecoverer;
-
-
-
         if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY))
         {
             messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
@@ -1364,7 +1362,58 @@ public abstract class AbstractVirtualHos
             _state.set(finalState);
             reportIfError(_state.get());
         }
+    }
+
+    @StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
+    private void onRestart()
+    {
+        resetStatistics();
+
+        final List<ConfiguredObjectRecord> records = new ArrayList<>();
+
+        // Transitioning to STOPPED will have closed all our children.  Now we are transition
+        // back to ACTIVE, we need to recover and re-open them.
+
+        getDurableConfigurationStore().visitConfiguredObjectRecords(new ConfiguredObjectRecordHandler()
+        {
+            @Override
+            public void begin()
+            {
+            }
+
+            @Override
+            public boolean handle(final ConfiguredObjectRecord record)
+            {
+                records.add(record);
+                return true;
+            }
+
+            @Override
+            public void end()
+            {
+            }
+        });
+
+        new GenericRecoverer(this).recover(records);
+
+        Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
+        {
+            @Override
+            public Object run()
+            {
+                applyToChildren(new Action<ConfiguredObject<?>>()
+                {
+                    @Override
+                    public void performAction(final ConfiguredObject<?> object)
+                    {
+                        object.open();
+                    }
+                });
+                return null;
+            }
+        });
 
+        onActivate();
     }
 
     private class StoreUpdatingChangeListener implements ConfigurationChangeListener

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Thu Jul 31 12:02:10 2014
@@ -20,34 +20,40 @@
  */
 package org.apache.qpid.server.model;
 
+import static java.util.Arrays.asList;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import java.security.AccessControlException;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
 
 import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
+import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.connection.IConnectionRegistry.RegistryChangeListener;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.access.Operation;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -157,6 +163,60 @@ public class VirtualHostTest extends Qpi
         verify(_configStore, times(2)).update(eq(false), matchesRecord(virtualHost.getId(), virtualHost.getType()));
     }
 
+    public void testRestartingVirtualHostRecoversChildren()
+    {
+        String virtualHostName = getName();
+
+        VirtualHost<?,?,?> virtualHost = createVirtualHost(virtualHostName);
+        assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
+        final ConfiguredObjectRecord virtualHostCor = virtualHost.asObjectRecord();
+
+        // Give virtualhost a queue and an exchange
+        Queue queue = virtualHost.createChild(Queue.class, Collections.<String, Object>singletonMap(Queue.NAME, "myQueue"));
+        final ConfiguredObjectRecord queueCor = queue.asObjectRecord();
+
+        Map<String, Object> exchangeArgs = new HashMap<>();
+        exchangeArgs.put(Exchange.NAME, "myExchange");
+        exchangeArgs.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+        Exchange exchange = virtualHost.createChild(Exchange.class, exchangeArgs);
+        final ConfiguredObjectRecord exchangeCor = exchange.asObjectRecord();
+
+        assertEquals("Unexpected number of queues before stop", 1, virtualHost.getChildren(Queue.class).size());
+        assertEquals("Unexpected number of exchanges before stop", 5, virtualHost.getChildren(Exchange.class).size());
+
+        virtualHost.stop();
+        assertEquals("Unexpected state", State.STOPPED, virtualHost.getState());
+        assertEquals("Unexpected number of queues after stop", 0, virtualHost.getChildren(Queue.class).size());
+        assertEquals("Unexpected number of exchanges after stop", 0, virtualHost.getChildren(Exchange.class).size());
+
+        // Setup an answer that will return the configured object records
+        doAnswer(new Answer()
+        {
+            final Iterator<ConfiguredObjectRecord> corIterator = asList(queueCor, exchangeCor, virtualHostCor).iterator();
+
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable
+            {
+                ConfiguredObjectRecordHandler handler = (ConfiguredObjectRecordHandler) invocation.getArguments()[0];
+                boolean handlerContinue = true;
+                while(corIterator.hasNext() && handlerContinue)
+                {
+                    handlerContinue = handler.handle(corIterator.next());
+                }
+
+                return null;
+            }
+        }).when(_configStore).visitConfiguredObjectRecords(any(ConfiguredObjectRecordHandler.class));
+
+        virtualHost.start();
+        assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
+
+        assertEquals("Unexpected number of queues after restart", 1, virtualHost.getChildren(Queue.class).size());
+        assertEquals("Unexpected number of exchanges after restart", 5, virtualHost.getChildren(Exchange.class).size());
+    }
+
+
     public void testStopVirtualHost_ClosesConnections()
     {
         String virtualHostName = getName();

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java Thu Jul 31 12:02:10 2014
@@ -325,7 +325,7 @@ public class BrokerRecovererTest extends
 
     private void resolveObjects(ConfiguredObjectRecord... records)
     {
-        GenericRecoverer recoverer = new GenericRecoverer(_systemConfig, Broker.class.getSimpleName());
+        GenericRecoverer recoverer = new GenericRecoverer(_systemConfig);
         recoverer.recover(Arrays.asList(records));
     }
 

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java Thu Jul 31 12:02:10 2014
@@ -112,14 +112,15 @@ public class QpidRestTestCase extends Qp
     public Map<String, Object> waitForAttributeChanged(String url, String attributeName, Object newValue) throws Exception
     {
         List<Map<String, Object>> nodeAttributes = getRestTestHelper().getJsonAsList(url);
-        long limit = System.currentTimeMillis() + 5000;
+        int timeout = 5000;
+        long limit = System.currentTimeMillis() + timeout;
         while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName))))
         {
             Thread.sleep(100l);
             nodeAttributes = getRestTestHelper().getJsonAsList(url);
         }
         Map<String, Object> nodeData = nodeAttributes.get(0);
-        assertEquals("Unexpected attribute " + attributeName, newValue, nodeData.get(attributeName));
+        assertEquals("Attribute " + attributeName + " did not reach expected value within permitted timeout "  + timeout + "ms.", newValue, nodeData.get(attributeName));
         return nodeData;
     }
 }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java Thu Jul 31 12:02:10 2014
@@ -27,12 +27,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.Session;
 import javax.servlet.http.HttpServletResponse;
 
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
-
 import org.apache.qpid.server.virtualhost.ProvidedStoreVirtualHostImpl;
 import org.apache.qpid.server.virtualhostnode.JsonVirtualHostNode;
 import org.apache.qpid.client.AMQConnection;
@@ -163,22 +162,60 @@ public class VirtualHostRestTest extends
 
     public void testMutateState() throws Exception
     {
-        String hostToUpdate = TEST3_VIRTUALHOST;
-        String restHostUrl = "virtualhost/" + hostToUpdate + "/" + hostToUpdate;
+        String restHostUrl = "virtualhost/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST;
 
+        waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
         assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
 
         Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
         getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
 
+        waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
         assertActualAndDesireStates(restHostUrl, "STOPPED", "STOPPED");
 
         newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
         getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
 
+        waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+
         assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
     }
 
+    public void testMutateStateOfVirtualHostWithQueuesAndMessages() throws Exception
+    {
+        String testQueueName = getTestQueueName();
+        String restHostUrl = "virtualhost/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST;
+        String restQueueUrl = "queue/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST + "/" + testQueueName;
+
+        waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+        assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
+
+        Connection connection = getConnection();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Destination dest = session.createQueue(testQueueName);
+        session.createConsumer(dest).close();
+        session.createProducer(dest).send(session.createTextMessage("My test message"));
+        session.commit();
+        connection.close();
+
+        assertQueueDepth(restQueueUrl, "Unexpected number of messages before stopped", 1);
+
+        Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
+        getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
+
+        waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
+        assertActualAndDesireStates(restHostUrl, "STOPPED", "STOPPED");
+
+        newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
+        getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
+
+        waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+
+        assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
+
+        assertQueueDepth(restQueueUrl, "Unexpected number of messages after restart", 1);
+    }
+
     public void testRecoverVirtualHostInDesiredStateStoppedWithDescription() throws Exception
     {
         String hostToUpdate = TEST3_VIRTUALHOST;
@@ -502,15 +539,13 @@ public class VirtualHostRestTest extends
         assertEquals("Unexpected response code", 201, statusCode);
     }
 
-    private void createQueue(String queueName, String queueType, Map<String, Object> attributes) throws IOException,
-            JsonGenerationException, JsonMappingException
+    private void createQueue(String queueName, String queueType, Map<String, Object> attributes) throws Exception
     {
         int responseCode = tryCreateQueue(queueName, queueType, attributes);
         assertEquals("Unexpected response code", 201, responseCode);
     }
 
-    private int tryCreateQueue(String queueName, String queueType, Map<String, Object> attributes) throws IOException,
-            JsonGenerationException, JsonMappingException
+    private int tryCreateQueue(String queueName, String queueType, Map<String, Object> attributes) throws Exception
     {
         Map<String, Object> queueData = new HashMap<String, Object>();
         queueData.put(Queue.NAME, queueName);
@@ -580,11 +615,21 @@ public class VirtualHostRestTest extends
     }
 
     private void assertActualAndDesireStates(final String restUrl,
-                                                            final String expectedDesiredState,
-                                                            final String expectedActualState) throws IOException
+                                             final String expectedDesiredState,
+                                             final String expectedActualState) throws IOException
     {
         Map<String, Object> virtualhost = getRestTestHelper().getJsonAsSingletonList(restUrl);
         Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, virtualhost);
     }
 
+    private void assertQueueDepth(String restQueueUrl, String message, int expectedDepth) throws IOException
+    {
+        Map<String, Object> queueDetails = getRestTestHelper().getJsonAsSingletonList(restQueueUrl);
+        assertNotNull(queueDetails);
+        Map<String, Object> statistics = (Map<String, Object>) queueDetails.get(Asserts.STATISTICS_ATTRIBUTE);
+        assertNotNull(statistics);
+
+        assertEquals(message, expectedDepth, statistics.get("queueDepthMessages"));
+    }
+
 }

Modified: qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes Thu Jul 31 12:02:10 2014
@@ -57,6 +57,7 @@ org.apache.qpid.test.unit.client.MaxDeli
 
 org.apache.qpid.systest.rest.VirtualHostRestTest#testPutCreateVirtualHostUsingProfileNodeType
 org.apache.qpid.systest.rest.VirtualHostRestTest#testRecoverVirtualHostInDesiredStateStoppedWithDescription
+org.apache.qpid.systest.rest.VirtualHostRestTest#testMutateStateOfVirtualHostWithQueuesAndMessages
 
 org.apache.qpid.systest.rest.VirtualHostNodeRestTest#testCreateAndDeleteVirtualHostNode
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org