You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/07/31 14:02:11 UTC
svn commit: r1614866 - in /qpid/trunk/qpid/java:
bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/store/ broker-core/src/ma...
Author: kwall
Date: Thu Jul 31 12:02:10 2014
New Revision: 1614866
URL: http://svn.apache.org/r1614866
Log:
QPID-5926: [Java Broker] When transitioning from STOPPED to ACTIVE the virtualhost re-recovers children beneath it
Modified:
qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes
Modified: qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java Thu Jul 31 12:02:10 2014
@@ -29,11 +29,14 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import javax.servlet.http.HttpServletResponse;
+
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.systest.rest.Asserts;
import org.apache.qpid.systest.rest.QpidRestTestCase;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
import org.apache.qpid.util.FileUtils;
@@ -99,7 +102,7 @@ public class BDBHAVirtualHostRestTest ex
assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY));
Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "NO_SYNC");
- getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200);
+ getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK);
hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
assertEquals("Unexpected synchronization policy after change", "NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY));
@@ -111,9 +114,36 @@ public class BDBHAVirtualHostRestTest ex
assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY));
Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC");
- getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200);
+ getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK);
hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY));
}
+
+ public void testMutateState() throws Exception
+ {
+ waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
+ assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE");
+
+ Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
+ getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
+
+ waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "STOPPED");
+ assertActualAndDesireStates(_virtualhostUrl, "STOPPED", "STOPPED");
+
+ newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
+ getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
+
+ waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
+ assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE");
+ }
+
+ private void assertActualAndDesireStates(final String restUrl,
+ final String expectedDesiredState,
+ final String expectedActualState) throws IOException
+ {
+ Map<String, Object> virtualhost = getRestTestHelper().getJsonAsSingletonList(restUrl);
+ Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, virtualhost);
+ }
+
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Thu Jul 31 12:02:10 2014
@@ -574,7 +574,7 @@ public abstract class AbstractConfigured
}
}
- private void applyToChildren(Action<ConfiguredObject<?>> action)
+ protected void applyToChildren(Action<ConfiguredObject<?>> action)
{
for (Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java Thu Jul 31 12:02:10 2014
@@ -477,7 +477,7 @@ public class BrokerStoreUpgraderAndRecov
public Broker<?> perform(final DurableConfigurationStore store)
{
List<ConfiguredObjectRecord> upgradedRecords = upgrade(store);
- new GenericRecoverer(_systemConfig, Broker.class.getSimpleName()).recover(upgradedRecords);
+ new GenericRecoverer(_systemConfig).recover(upgradedRecords);
final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(store);
applyRecursively(_systemConfig.getBroker(), new Action<ConfiguredObject<?>>()
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java Thu Jul 31 12:02:10 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -40,18 +41,16 @@ public class GenericRecoverer
{
private static final Logger LOGGER = Logger.getLogger(GenericRecoverer.class);
- private final ConfiguredObject<?> _parentOfRoot;
- private final String _rootCategory;
+ private final ConfiguredObject<?> _root;
- public GenericRecoverer(ConfiguredObject<?> parentOfRoot, String rootCategory)
+ public GenericRecoverer(ConfiguredObject<?> root)
{
- _parentOfRoot = parentOfRoot;
- _rootCategory = rootCategory;
+ _root = root;
}
public void recover(final List<ConfiguredObjectRecord> records)
{
- _parentOfRoot.getTaskExecutor().run(new VoidTask()
+ _root.getTaskExecutor().run(new VoidTask()
{
@Override
public void execute()
@@ -62,44 +61,70 @@ public class GenericRecoverer
@Override
public String toString()
{
- return _rootCategory + " recovery";
+ return "RecoveringChildrenOf_" + _root.getCategoryClass().getSimpleName();
}
});
-
}
private void performRecover(List<ConfiguredObjectRecord> records)
{
- ConfiguredObjectRecord rootRecord = null;
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Recovering the children of " + _root);
+ }
+
+ records = resolveDiscontinuity(records);
+ resolveObjects(_root, records);
+ }
+
+ private List<ConfiguredObjectRecord> resolveDiscontinuity(final List<ConfiguredObjectRecord> records)
+ {
+ Collection<Class<? extends ConfiguredObject>> childTypesOfRoot = _root.getModel().getChildTypes(_root.getCategoryClass());
+ List<ConfiguredObjectRecord> newRecords = new ArrayList<>(records.size());
+
for (ConfiguredObjectRecord record : records)
{
- if (_rootCategory.equals(record.getType()))
+ if (record.getId().equals(_root.getId()))
{
- rootRecord = record;
- break;
+ // If the parent is already in the records, we skip it, this supports partial recovery
+ // (required when restarting a virtualhost). In the long term, when the objects take responsibility
+ // for the recovery of immediate descendants only, this will disappear.
+ }
+ else if ((record.getParents() == null || record.getParents().size() == 0))
+ {
+ if (containsCategory(childTypesOfRoot, record.getType()))
+ {
+ String parentOfRootCategory = _root.getCategoryClass().getSimpleName();
+ Map<String, UUID> rootParents = Collections.singletonMap(parentOfRootCategory, _root.getId());
+ newRecords.add(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes(), rootParents));
+ }
+ else
+ {
+ throw new IllegalArgumentException("Recovered configured object record " + record
+ + " has no recorded parents and is not a valid child type"
+ + " [" + Arrays.toString(childTypesOfRoot.toArray()) + "]"
+ + " for the root " + _root);
+ }
+ }
+ else
+ {
+ newRecords.add(record);
}
}
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Root record " + rootRecord);
- }
+ return newRecords;
+ }
- if (rootRecord != null)
+ private boolean containsCategory(Collection<Class<? extends ConfiguredObject>> childCategories, String categorySimpleName)
+ {
+ for (Class<? extends ConfiguredObject> child : childCategories)
{
-
- if (rootRecord.getParents() == null || rootRecord.getParents().isEmpty())
+ if (child.getSimpleName().equals(categorySimpleName))
{
- records = new ArrayList<ConfiguredObjectRecord>(records);
-
- String parentOfRootCategory = _parentOfRoot.getCategoryClass().getSimpleName();
- Map<String, UUID> rootParents = Collections.singletonMap(parentOfRootCategory, _parentOfRoot.getId());
- records.remove(rootRecord);
- records.add(new ConfiguredObjectRecordImpl(rootRecord.getId(), _rootCategory, rootRecord.getAttributes(), rootParents));
+ return true;
}
-
- resolveObjects(_parentOfRoot, records);
}
+ return false;
}
private void resolveObjects(ConfiguredObject<?> parentObject, List<ConfiguredObjectRecord> records)
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Thu Jul 31 12:02:10 2014
@@ -406,6 +406,6 @@ public class VirtualHostStoreUpgraderAnd
GenericStoreUpgrader upgraderHandler = new GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, durableConfigurationStore, _upgraders);
upgraderHandler.upgrade();
- new GenericRecoverer(_virtualHostNode, virtualHostCategory).recover(upgraderHandler.getRecords());
+ new GenericRecoverer(_virtualHostNode).recover(upgraderHandler.getRecords());
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Thu Jul 31 12:02:10 2014
@@ -75,6 +75,7 @@ import org.apache.qpid.server.store.Conf
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.GenericRecoverer;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreProvider;
import org.apache.qpid.server.store.StoreException;
@@ -82,6 +83,7 @@ import org.apache.qpid.server.store.hand
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.MapValueConverter;
@@ -341,7 +343,6 @@ public abstract class AbstractVirtualHos
public Collection<Connection> getConnections()
{
return getChildren(Connection.class);
-
}
@Override
@@ -1317,7 +1318,7 @@ public abstract class AbstractVirtualHos
getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes()));
}
- @StateTransition( currentState = { State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
+ @StateTransition( currentState = { State.UNINITIALIZED }, desiredState = State.ACTIVE )
private void onActivate()
{
_houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount());
@@ -1339,9 +1340,6 @@ public abstract class AbstractVirtualHos
}
MessageStoreRecoverer messageStoreRecoverer;
-
-
-
if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY))
{
messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
@@ -1364,7 +1362,58 @@ public abstract class AbstractVirtualHos
_state.set(finalState);
reportIfError(_state.get());
}
+ }
+
+ @StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
+ private void onRestart()
+ {
+ resetStatistics();
+
+ final List<ConfiguredObjectRecord> records = new ArrayList<>();
+
+ // Transitioning to STOPPED will have closed all our children. Now we are transition
+ // back to ACTIVE, we need to recover and re-open them.
+
+ getDurableConfigurationStore().visitConfiguredObjectRecords(new ConfiguredObjectRecordHandler()
+ {
+ @Override
+ public void begin()
+ {
+ }
+
+ @Override
+ public boolean handle(final ConfiguredObjectRecord record)
+ {
+ records.add(record);
+ return true;
+ }
+
+ @Override
+ public void end()
+ {
+ }
+ });
+
+ new GenericRecoverer(this).recover(records);
+
+ Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ applyToChildren(new Action<ConfiguredObject<?>>()
+ {
+ @Override
+ public void performAction(final ConfiguredObject<?> object)
+ {
+ object.open();
+ }
+ });
+ return null;
+ }
+ });
+ onActivate();
}
private class StoreUpdatingChangeListener implements ConfigurationChangeListener
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Thu Jul 31 12:02:10 2014
@@ -20,34 +20,40 @@
*/
package org.apache.qpid.server.model;
+import static java.util.Arrays.asList;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.security.AccessControlException;
-import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.connection.IConnectionRegistry.RegistryChangeListener;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -157,6 +163,60 @@ public class VirtualHostTest extends Qpi
verify(_configStore, times(2)).update(eq(false), matchesRecord(virtualHost.getId(), virtualHost.getType()));
}
+ public void testRestartingVirtualHostRecoversChildren()
+ {
+ String virtualHostName = getName();
+
+ VirtualHost<?,?,?> virtualHost = createVirtualHost(virtualHostName);
+ assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
+ final ConfiguredObjectRecord virtualHostCor = virtualHost.asObjectRecord();
+
+ // Give virtualhost a queue and an exchange
+ Queue queue = virtualHost.createChild(Queue.class, Collections.<String, Object>singletonMap(Queue.NAME, "myQueue"));
+ final ConfiguredObjectRecord queueCor = queue.asObjectRecord();
+
+ Map<String, Object> exchangeArgs = new HashMap<>();
+ exchangeArgs.put(Exchange.NAME, "myExchange");
+ exchangeArgs.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+ Exchange exchange = virtualHost.createChild(Exchange.class, exchangeArgs);
+ final ConfiguredObjectRecord exchangeCor = exchange.asObjectRecord();
+
+ assertEquals("Unexpected number of queues before stop", 1, virtualHost.getChildren(Queue.class).size());
+ assertEquals("Unexpected number of exchanges before stop", 5, virtualHost.getChildren(Exchange.class).size());
+
+ virtualHost.stop();
+ assertEquals("Unexpected state", State.STOPPED, virtualHost.getState());
+ assertEquals("Unexpected number of queues after stop", 0, virtualHost.getChildren(Queue.class).size());
+ assertEquals("Unexpected number of exchanges after stop", 0, virtualHost.getChildren(Exchange.class).size());
+
+ // Setup an answer that will return the configured object records
+ doAnswer(new Answer()
+ {
+ final Iterator<ConfiguredObjectRecord> corIterator = asList(queueCor, exchangeCor, virtualHostCor).iterator();
+
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ ConfiguredObjectRecordHandler handler = (ConfiguredObjectRecordHandler) invocation.getArguments()[0];
+ boolean handlerContinue = true;
+ while(corIterator.hasNext() && handlerContinue)
+ {
+ handlerContinue = handler.handle(corIterator.next());
+ }
+
+ return null;
+ }
+ }).when(_configStore).visitConfiguredObjectRecords(any(ConfiguredObjectRecordHandler.class));
+
+ virtualHost.start();
+ assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
+
+ assertEquals("Unexpected number of queues after restart", 1, virtualHost.getChildren(Queue.class).size());
+ assertEquals("Unexpected number of exchanges after restart", 5, virtualHost.getChildren(Exchange.class).size());
+ }
+
+
public void testStopVirtualHost_ClosesConnections()
{
String virtualHostName = getName();
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java Thu Jul 31 12:02:10 2014
@@ -325,7 +325,7 @@ public class BrokerRecovererTest extends
private void resolveObjects(ConfiguredObjectRecord... records)
{
- GenericRecoverer recoverer = new GenericRecoverer(_systemConfig, Broker.class.getSimpleName());
+ GenericRecoverer recoverer = new GenericRecoverer(_systemConfig);
recoverer.recover(Arrays.asList(records));
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java Thu Jul 31 12:02:10 2014
@@ -112,14 +112,15 @@ public class QpidRestTestCase extends Qp
public Map<String, Object> waitForAttributeChanged(String url, String attributeName, Object newValue) throws Exception
{
List<Map<String, Object>> nodeAttributes = getRestTestHelper().getJsonAsList(url);
- long limit = System.currentTimeMillis() + 5000;
+ int timeout = 5000;
+ long limit = System.currentTimeMillis() + timeout;
while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName))))
{
Thread.sleep(100l);
nodeAttributes = getRestTestHelper().getJsonAsList(url);
}
Map<String, Object> nodeData = nodeAttributes.get(0);
- assertEquals("Unexpected attribute " + attributeName, newValue, nodeData.get(attributeName));
+ assertEquals("Attribute " + attributeName + " did not reach expected value within permitted timeout " + timeout + "ms.", newValue, nodeData.get(attributeName));
return nodeData;
}
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java Thu Jul 31 12:02:10 2014
@@ -27,12 +27,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.Session;
import javax.servlet.http.HttpServletResponse;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
-
import org.apache.qpid.server.virtualhost.ProvidedStoreVirtualHostImpl;
import org.apache.qpid.server.virtualhostnode.JsonVirtualHostNode;
import org.apache.qpid.client.AMQConnection;
@@ -163,22 +162,60 @@ public class VirtualHostRestTest extends
public void testMutateState() throws Exception
{
- String hostToUpdate = TEST3_VIRTUALHOST;
- String restHostUrl = "virtualhost/" + hostToUpdate + "/" + hostToUpdate;
+ String restHostUrl = "virtualhost/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST;
+ waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
+ waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
assertActualAndDesireStates(restHostUrl, "STOPPED", "STOPPED");
newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
+ waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+
assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
}
+ public void testMutateStateOfVirtualHostWithQueuesAndMessages() throws Exception
+ {
+ String testQueueName = getTestQueueName();
+ String restHostUrl = "virtualhost/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST;
+ String restQueueUrl = "queue/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST + "/" + testQueueName;
+
+ waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+ assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
+
+ Connection connection = getConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination dest = session.createQueue(testQueueName);
+ session.createConsumer(dest).close();
+ session.createProducer(dest).send(session.createTextMessage("My test message"));
+ session.commit();
+ connection.close();
+
+ assertQueueDepth(restQueueUrl, "Unexpected number of messages before stopped", 1);
+
+ Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
+ getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
+
+ waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
+ assertActualAndDesireStates(restHostUrl, "STOPPED", "STOPPED");
+
+ newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
+ getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
+
+ waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+
+ assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
+
+ assertQueueDepth(restQueueUrl, "Unexpected number of messages after restart", 1);
+ }
+
public void testRecoverVirtualHostInDesiredStateStoppedWithDescription() throws Exception
{
String hostToUpdate = TEST3_VIRTUALHOST;
@@ -502,15 +539,13 @@ public class VirtualHostRestTest extends
assertEquals("Unexpected response code", 201, statusCode);
}
- private void createQueue(String queueName, String queueType, Map<String, Object> attributes) throws IOException,
- JsonGenerationException, JsonMappingException
+ private void createQueue(String queueName, String queueType, Map<String, Object> attributes) throws Exception
{
int responseCode = tryCreateQueue(queueName, queueType, attributes);
assertEquals("Unexpected response code", 201, responseCode);
}
- private int tryCreateQueue(String queueName, String queueType, Map<String, Object> attributes) throws IOException,
- JsonGenerationException, JsonMappingException
+ private int tryCreateQueue(String queueName, String queueType, Map<String, Object> attributes) throws Exception
{
Map<String, Object> queueData = new HashMap<String, Object>();
queueData.put(Queue.NAME, queueName);
@@ -580,11 +615,21 @@ public class VirtualHostRestTest extends
}
private void assertActualAndDesireStates(final String restUrl,
- final String expectedDesiredState,
- final String expectedActualState) throws IOException
+ final String expectedDesiredState,
+ final String expectedActualState) throws IOException
{
Map<String, Object> virtualhost = getRestTestHelper().getJsonAsSingletonList(restUrl);
Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, virtualhost);
}
+ private void assertQueueDepth(String restQueueUrl, String message, int expectedDepth) throws IOException
+ {
+ Map<String, Object> queueDetails = getRestTestHelper().getJsonAsSingletonList(restQueueUrl);
+ assertNotNull(queueDetails);
+ Map<String, Object> statistics = (Map<String, Object>) queueDetails.get(Asserts.STATISTICS_ATTRIBUTE);
+ assertNotNull(statistics);
+
+ assertEquals(message, expectedDepth, statistics.get("queueDepthMessages"));
+ }
+
}
Modified: qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes?rev=1614866&r1=1614865&r2=1614866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes Thu Jul 31 12:02:10 2014
@@ -57,6 +57,7 @@ org.apache.qpid.test.unit.client.MaxDeli
org.apache.qpid.systest.rest.VirtualHostRestTest#testPutCreateVirtualHostUsingProfileNodeType
org.apache.qpid.systest.rest.VirtualHostRestTest#testRecoverVirtualHostInDesiredStateStoppedWithDescription
+org.apache.qpid.systest.rest.VirtualHostRestTest#testMutateStateOfVirtualHostWithQueuesAndMessages
org.apache.qpid.systest.rest.VirtualHostNodeRestTest#testCreateAndDeleteVirtualHostNode
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org