You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2013/12/10 18:19:49 UTC
svn commit: r1549898 [1/4] - in /qpid/branches/java-broker-bdb-ha/qpid/java:
bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/
bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/
bdbstore/src/main/java/org/apac...
Author: kwall
Date: Tue Dec 10 17:19:47 2013
New Revision: 1549898
URL: http://svn.apache.org/r1549898
Log:
QPID-5410: Refactor BDBMessageStore and HAMessageStore to encapsulate environment specific operations into facades
Added:
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
- copied, changed from r1549894, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
- copied, changed from r1549894, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeTestCase.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
- copied, changed from r1549894, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/DurableConfigurationRecovererTest.java
- copied, changed from r1549894, qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
Removed:
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
Modified:
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java Tue Dec 10 17:19:47 2013
@@ -39,7 +39,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.jmx.AMQManagedObject;
import org.apache.qpid.server.jmx.ManagedObject;
-import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade;
/**
* Management mbean for BDB HA.
@@ -63,7 +63,7 @@ public class BDBHAMessageStoreManagerMBe
try
{
GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING};
- final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT};
+ final String[] itemNames = new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT};
final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "};
GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member",
itemNames,
@@ -71,7 +71,7 @@ public class BDBHAMessageStoreManagerMBe
GROUP_MEMBER_ATTRIBUTE_TYPES );
GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers",
GROUP_MEMBER_ROW,
- new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME});
+ new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME});
}
catch (final OpenDataException ode)
{
@@ -79,44 +79,44 @@ public class BDBHAMessageStoreManagerMBe
}
}
- private final BDBHAMessageStore _store;
+ private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
- protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store, ManagedObject parent) throws JMException
+ protected BDBHAMessageStoreManagerMBean(ReplicatedEnvironmentFacade replicatedEnvironmentFacade, ManagedObject parent) throws JMException
{
super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE, ((AMQManagedObject)parent).getRegistry());
LOGGER.debug("Creating BDBHAMessageStoreManagerMBean");
- _store = store;
+ _replicatedEnvironmentFacade = replicatedEnvironmentFacade;
register();
}
@Override
public String getObjectInstanceName()
{
- return ObjectName.quote(_store.getName());
+ return ObjectName.quote(_replicatedEnvironmentFacade.getName());
}
@Override
public String getGroupName()
{
- return _store.getGroupName();
+ return _replicatedEnvironmentFacade.getGroupName();
}
@Override
public String getNodeName()
{
- return _store.getNodeName();
+ return _replicatedEnvironmentFacade.getNodeName();
}
@Override
public String getNodeHostPort()
{
- return _store.getNodeHostPort();
+ return _replicatedEnvironmentFacade.getNodeHostPort();
}
@Override
public String getHelperHostPort()
{
- return _store.getHelperHostPort();
+ return _replicatedEnvironmentFacade.getHelperHostPort();
}
@Override
@@ -124,7 +124,7 @@ public class BDBHAMessageStoreManagerMBe
{
try
{
- return _store.getDurability();
+ return _replicatedEnvironmentFacade.getDurability();
}
catch (RuntimeException e)
{
@@ -137,7 +137,7 @@ public class BDBHAMessageStoreManagerMBe
@Override
public boolean getCoalescingSync() throws IOException, JMException
{
- return _store.isCoalescingSync();
+ return _replicatedEnvironmentFacade.isCoalescingSync();
}
@Override
@@ -145,7 +145,7 @@ public class BDBHAMessageStoreManagerMBe
{
try
{
- return _store.getNodeState();
+ return _replicatedEnvironmentFacade.getNodeState();
}
catch (RuntimeException e)
{
@@ -159,7 +159,7 @@ public class BDBHAMessageStoreManagerMBe
{
try
{
- return _store.isDesignatedPrimary();
+ return _replicatedEnvironmentFacade.isDesignatedPrimary();
}
catch (RuntimeException e)
{
@@ -172,7 +172,7 @@ public class BDBHAMessageStoreManagerMBe
public TabularData getAllNodesInGroup() throws IOException, JMException
{
final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE);
- final List<Map<String, String>> members = _store.getGroupMembers();
+ final List<Map<String, String>> members = _replicatedEnvironmentFacade.getGroupMembers();
for (Map<String, String> map : members)
{
@@ -187,7 +187,7 @@ public class BDBHAMessageStoreManagerMBe
{
try
{
- _store.removeNodeFromGroup(nodeName);
+ _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName);
}
catch (AMQStoreException e)
{
@@ -201,11 +201,11 @@ public class BDBHAMessageStoreManagerMBe
{
try
{
- _store.setDesignatedPrimary(primary);
+ _replicatedEnvironmentFacade.setDesignatedPrimary(primary);
}
catch (AMQStoreException e)
{
- LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e);
+ LOGGER.error("Failed to set node " + _replicatedEnvironmentFacade.getNodeName() + " as designated primary", e);
throw new JMException(e.getMessage());
}
}
@@ -215,7 +215,7 @@ public class BDBHAMessageStoreManagerMBe
{
try
{
- _store.updateAddress(nodeName, newHostName, newPort);
+ _replicatedEnvironmentFacade.updateAddress(nodeName, newHostName, newPort);
}
catch(AMQStoreException e)
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java Tue Dec 10 17:19:47 2013
@@ -28,11 +28,12 @@ import org.apache.qpid.server.jmx.MBeanP
import org.apache.qpid.server.jmx.ManagedObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade;
/**
* This provider will create a {@link BDBHAMessageStoreManagerMBean} if the child is a virtual
- * host and of type {@link BDBHAMessageStore#TYPE}.
+ * host and of type {@link ReplicatedEnvironmentFacade#TYPE}.
*
*/
public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider
@@ -48,7 +49,7 @@ public class BDBHAMessageStoreManagerMBe
public boolean isChildManageableByMBean(ConfiguredObject child)
{
return (child instanceof VirtualHost
- && BDBHAMessageStore.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
+ && ReplicatedEnvironmentFacade.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
}
@Override
@@ -56,14 +57,15 @@ public class BDBHAMessageStoreManagerMBe
{
VirtualHost virtualHostChild = (VirtualHost) child;
- BDBHAMessageStore messageStore = (BDBHAMessageStore) virtualHostChild.getMessageStore();
+ BDBMessageStore messageStore = (BDBMessageStore) virtualHostChild.getMessageStore();
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Creating mBean for child " + child);
}
- return new BDBHAMessageStoreManagerMBean(messageStore, (ManagedObject) parent);
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = (ReplicatedEnvironmentFacade)messageStore.getEnvironmentFacade();
+ return new BDBHAMessageStoreManagerMBean(replicatedEnvironmentFacade, (ManagedObject) parent);
}
@Override
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java Tue Dec 10 17:19:47 2013
@@ -43,7 +43,7 @@ import org.apache.qpid.server.jmx.Manage
import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
-import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBean;
import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
@@ -58,7 +58,7 @@ public class BDBHAMessageStoreManagerMBe
private static final String TEST_STORE_NAME = "testStoreName";
private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false;
- private BDBHAMessageStore _store;
+ private ReplicatedEnvironmentFacade _replicatedEnvironmentFacadee;
private BDBHAMessageStoreManagerMBean _mBean;
private AMQManagedObject _mBeanParent;
@@ -68,10 +68,10 @@ public class BDBHAMessageStoreManagerMBe
super.setUp();
CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
- _store = mock(BDBHAMessageStore.class);
+ _replicatedEnvironmentFacadee = mock(ReplicatedEnvironmentFacade.class);
_mBeanParent = mock(AMQManagedObject.class);
when(_mBeanParent.getRegistry()).thenReturn(mock(ManagedObjectRegistry.class));
- _mBean = new BDBHAMessageStoreManagerMBean(_store, _mBeanParent);
+ _mBean = new BDBHAMessageStoreManagerMBean(_replicatedEnvironmentFacadee, _mBeanParent);
}
@Override
@@ -83,7 +83,7 @@ public class BDBHAMessageStoreManagerMBe
public void testObjectName() throws Exception
{
- when(_store.getName()).thenReturn(TEST_STORE_NAME);
+ when(_replicatedEnvironmentFacadee.getName()).thenReturn(TEST_STORE_NAME);
String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_STORE_NAME);
assertEquals(expectedObjectName, _mBean.getObjectName().toString());
@@ -91,56 +91,56 @@ public class BDBHAMessageStoreManagerMBe
public void testGroupName() throws Exception
{
- when(_store.getGroupName()).thenReturn(TEST_GROUP_NAME);
+ when(_replicatedEnvironmentFacadee.getGroupName()).thenReturn(TEST_GROUP_NAME);
assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME));
}
public void testNodeName() throws Exception
{
- when(_store.getNodeName()).thenReturn(TEST_NODE_NAME);
+ when(_replicatedEnvironmentFacadee.getNodeName()).thenReturn(TEST_NODE_NAME);
assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME));
}
public void testNodeHostPort() throws Exception
{
- when(_store.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT);
+ when(_replicatedEnvironmentFacadee.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT);
assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT));
}
public void testHelperHostPort() throws Exception
{
- when(_store.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT);
+ when(_replicatedEnvironmentFacadee.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT);
assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT));
}
public void testDurability() throws Exception
{
- when(_store.getDurability()).thenReturn(TEST_DURABILITY);
+ when(_replicatedEnvironmentFacadee.getDurability()).thenReturn(TEST_DURABILITY);
assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY));
}
public void testCoalescingSync() throws Exception
{
- when(_store.isCoalescingSync()).thenReturn(true);
+ when(_replicatedEnvironmentFacadee.isCoalescingSync()).thenReturn(true);
assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC));
}
public void testNodeState() throws Exception
{
- when(_store.getNodeState()).thenReturn(TEST_NODE_STATE);
+ when(_replicatedEnvironmentFacadee.getNodeState()).thenReturn(TEST_NODE_STATE);
assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE));
}
public void testDesignatedPrimaryFlag() throws Exception
{
- when(_store.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
+ when(_replicatedEnvironmentFacadee.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY));
}
@@ -148,29 +148,29 @@ public class BDBHAMessageStoreManagerMBe
public void testGroupMembersForGroupWithOneNode() throws Exception
{
List<Map<String, String>> members = Collections.singletonList(createTestNodeResult());
- when(_store.getGroupMembers()).thenReturn(members);
+ when(_replicatedEnvironmentFacadee.getGroupMembers()).thenReturn(members);
final TabularData resultsTable = _mBean.getAllNodesInGroup();
- assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT);
+ assertTableHasHeadingsNamed(resultsTable, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT);
final int numberOfDataRows = resultsTable.size();
assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows);
final CompositeData row = (CompositeData) resultsTable.values().iterator().next();
- assertEquals(TEST_NODE_NAME, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME));
- assertEquals(TEST_NODE_HOST_PORT, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
+ assertEquals(TEST_NODE_NAME, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME));
+ assertEquals(TEST_NODE_HOST_PORT, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT));
}
public void testRemoveNodeFromReplicationGroup() throws Exception
{
_mBean.removeNodeFromGroup(TEST_NODE_NAME);
- verify(_store).removeNodeFromGroup(TEST_NODE_NAME);
+ verify(_replicatedEnvironmentFacadee).removeNodeFromGroup(TEST_NODE_NAME);
}
public void testRemoveNodeFromReplicationGroupWithError() throws Exception
{
- doThrow(new AMQStoreException("mocked exception")).when(_store).removeNodeFromGroup(TEST_NODE_NAME);
+ doThrow(new AMQStoreException("mocked exception")).when(_replicatedEnvironmentFacadee).removeNodeFromGroup(TEST_NODE_NAME);
try
{
@@ -187,12 +187,12 @@ public class BDBHAMessageStoreManagerMBe
{
_mBean.setDesignatedPrimary(true);
- verify(_store).setDesignatedPrimary(true);
+ verify(_replicatedEnvironmentFacadee).setDesignatedPrimary(true);
}
public void testSetAsDesignatedPrimaryWithError() throws Exception
{
- doThrow(new AMQStoreException("mocked exception")).when(_store).setDesignatedPrimary(true);
+ doThrow(new AMQStoreException("mocked exception")).when(_replicatedEnvironmentFacadee).setDesignatedPrimary(true);
try
{
@@ -212,7 +212,7 @@ public class BDBHAMessageStoreManagerMBe
_mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort);
- verify(_store).updateAddress(TEST_NODE_NAME, newHostName, newPort);
+ verify(_replicatedEnvironmentFacadee).updateAddress(TEST_NODE_NAME, newHostName, newPort);
}
private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames)
@@ -227,8 +227,8 @@ public class BDBHAMessageStoreManagerMBe
private Map<String, String> createTestNodeResult()
{
Map<String, String> items = new HashMap<String, String>();
- items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
- items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
+ items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
+ items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
return items;
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java Tue Dec 10 17:19:47 2013
@@ -20,8 +20,16 @@ package org.apache.qpid.server.store.ber
*
*/
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.AbstractActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.stats.StatisticsGatherer;
@@ -37,9 +45,14 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
public class BDBHAVirtualHost extends AbstractVirtualHost
{
- private BDBHAMessageStore _messageStore;
+ private static final Logger LOGGER = Logger.getLogger(BDBHAVirtualHost.class);
+
+ private BDBMessageStore _messageStore;
private boolean _inVhostInitiatedClose;
@@ -57,7 +70,7 @@ public class BDBHAVirtualHost extends Ab
protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost) throws Exception
{
- _messageStore = new BDBHAMessageStore();
+ _messageStore = new BDBMessageStore(ReplicatedEnvironmentFacade.TYPE, new ReplicatedEnvironmentFacadeFactory());
final MessageStoreLogSubject storeLogSubject =
new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
@@ -85,6 +98,8 @@ public class BDBHAVirtualHost extends Ab
virtualHost, recoveryHandler,
recoveryHandler
);
+
+ ((ReplicatedEnvironmentFacade)_messageStore.getEnvironmentFacade()).setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
}
@@ -202,4 +217,140 @@ public class BDBHAVirtualHost extends Ab
}
}
+ private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
+ {
+ // TODO shutdown the executor
+ private final Executor _executor = Executors.newSingleThreadExecutor();
+
+ @Override
+ public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+ {
+ com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState();
+
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Received BDB event indicating transition to state " + state);
+ }
+
+ switch (state)
+ {
+ case MASTER:
+ activateStoreAsync();
+ break;
+ case REPLICA:
+ passivateStoreAsync();
+ break;
+ case DETACHED:
+ LOGGER.error("BDB replicated node in detached state, therefore passivating.");
+ passivateStoreAsync();
+ break;
+ case UNKNOWN:
+ LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)");
+ break;
+ default:
+ LOGGER.error("Unexpected state change: " + state);
+ throw new IllegalStateException("Unexpected state change: " + state);
+ }
+ }
+
+ /**
+ * Calls {@link MessageStore#activate()}.
+ *
+ * <p/>
+ *
+ * This is done on a background thread, in line with
+ * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because
+ * activate may execute transactions, which can't complete until
+ * {@link StateChangeListener#stateChange(StateChangeEvent)} has returned.
+ */
+ private void activateStoreAsync()
+ {
+ String threadName = "BDBHANodeActivationThread-" + getName();
+ executeStateChangeAsync(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
+ _messageStore.activate();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to activate on hearing MASTER change event", e);
+ throw e;
+ }
+ return null;
+ }
+ }, threadName);
+ }
+
+ private void passivateStoreAsync()
+ {
+ String threadName = "BDBHANodePassivationThread-" + getName();
+ executeStateChangeAsync(new Callable<Void>()
+ {
+
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED))
+ {
+ LOGGER.debug("Store becoming passive");
+ _messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
+ }
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e);
+ throw e;
+ }
+ return null;
+ }
+ }, threadName);
+ }
+
+ private void executeStateChangeAsync(final Callable<Void> callable, final String threadName)
+ {
+ final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger();
+
+ _executor.execute(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ final String originalThreadName = Thread.currentThread().getName();
+ Thread.currentThread().setName(threadName);
+ try
+ {
+ CurrentActor.set(new AbstractActor(_rootLogger)
+ {
+ @Override
+ public String getLogMessage()
+ {
+ return threadName;
+ }
+ });
+
+ try
+ {
+ callable.call();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Exception during state change", e);
+ }
+ }
+ finally
+ {
+ Thread.currentThread().setName(originalThreadName);
+ }
+ }
+ });
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org