You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2013/12/10 18:19:49 UTC

svn commit: r1549898 [1/4] - in /qpid/branches/java-broker-bdb-ha/qpid/java: bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/ bdbstore/src/main/java/org/apac...

Author: kwall
Date: Tue Dec 10 17:19:47 2013
New Revision: 1549898

URL: http://svn.apache.org/r1549898
Log:
QPID-5410: Refactor BDBMessageStore and HAMessageStore to encapsulate environment specific operations into facades

Added:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
      - copied, changed from r1549894, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
      - copied, changed from r1549894, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeTestCase.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
      - copied, changed from r1549894, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/DurableConfigurationRecovererTest.java
      - copied, changed from r1549894, qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
Removed:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
Modified:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
    qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
    qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java Tue Dec 10 17:19:47 2013
@@ -39,7 +39,7 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.jmx.AMQManagedObject;
 import org.apache.qpid.server.jmx.ManagedObject;
-import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade;
 
 /**
  * Management mbean for BDB HA.
@@ -63,7 +63,7 @@ public class BDBHAMessageStoreManagerMBe
         try
         {
             GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING};
-            final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT};
+            final String[] itemNames = new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT};
             final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "};
             GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member",
                                                 itemNames,
@@ -71,7 +71,7 @@ public class BDBHAMessageStoreManagerMBe
                                                 GROUP_MEMBER_ATTRIBUTE_TYPES );
             GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers",
                                                 GROUP_MEMBER_ROW,
-                                                new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME});
+                                                new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME});
         }
         catch (final OpenDataException ode)
         {
@@ -79,44 +79,44 @@ public class BDBHAMessageStoreManagerMBe
         }
     }
 
-    private final BDBHAMessageStore _store;
+    private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
 
-    protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store, ManagedObject parent) throws JMException
+    protected BDBHAMessageStoreManagerMBean(ReplicatedEnvironmentFacade replicatedEnvironmentFacade, ManagedObject parent) throws JMException
     {
         super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE, ((AMQManagedObject)parent).getRegistry());
         LOGGER.debug("Creating BDBHAMessageStoreManagerMBean");
-        _store = store;
+        _replicatedEnvironmentFacade = replicatedEnvironmentFacade;
         register();
     }
 
     @Override
     public String getObjectInstanceName()
     {
-        return ObjectName.quote(_store.getName());
+        return ObjectName.quote(_replicatedEnvironmentFacade.getName());
     }
 
     @Override
     public String getGroupName()
     {
-        return _store.getGroupName();
+        return _replicatedEnvironmentFacade.getGroupName();
     }
 
     @Override
     public String getNodeName()
     {
-        return _store.getNodeName();
+        return _replicatedEnvironmentFacade.getNodeName();
     }
 
     @Override
     public String getNodeHostPort()
     {
-        return _store.getNodeHostPort();
+        return _replicatedEnvironmentFacade.getNodeHostPort();
     }
 
     @Override
     public String getHelperHostPort()
     {
-        return _store.getHelperHostPort();
+        return _replicatedEnvironmentFacade.getHelperHostPort();
     }
 
     @Override
@@ -124,7 +124,7 @@ public class BDBHAMessageStoreManagerMBe
     {
         try
         {
-            return _store.getDurability();
+            return _replicatedEnvironmentFacade.getDurability();
         }
         catch (RuntimeException e)
         {
@@ -137,7 +137,7 @@ public class BDBHAMessageStoreManagerMBe
     @Override
     public boolean getCoalescingSync() throws IOException, JMException
     {
-        return _store.isCoalescingSync();
+        return _replicatedEnvironmentFacade.isCoalescingSync();
     }
 
     @Override
@@ -145,7 +145,7 @@ public class BDBHAMessageStoreManagerMBe
     {
         try
         {
-            return _store.getNodeState();
+            return _replicatedEnvironmentFacade.getNodeState();
         }
         catch (RuntimeException e)
         {
@@ -159,7 +159,7 @@ public class BDBHAMessageStoreManagerMBe
     {
         try
         {
-            return _store.isDesignatedPrimary();
+            return _replicatedEnvironmentFacade.isDesignatedPrimary();
         }
         catch (RuntimeException e)
         {
@@ -172,7 +172,7 @@ public class BDBHAMessageStoreManagerMBe
     public TabularData getAllNodesInGroup() throws IOException, JMException
     {
         final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE);
-        final List<Map<String, String>> members = _store.getGroupMembers();
+        final List<Map<String, String>> members = _replicatedEnvironmentFacade.getGroupMembers();
 
         for (Map<String, String> map : members)
         {
@@ -187,7 +187,7 @@ public class BDBHAMessageStoreManagerMBe
     {
         try
         {
-            _store.removeNodeFromGroup(nodeName);
+            _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName);
         }
         catch (AMQStoreException e)
         {
@@ -201,11 +201,11 @@ public class BDBHAMessageStoreManagerMBe
     {
         try
         {
-            _store.setDesignatedPrimary(primary);
+            _replicatedEnvironmentFacade.setDesignatedPrimary(primary);
         }
         catch (AMQStoreException e)
         {
-            LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e);
+            LOGGER.error("Failed to set node " + _replicatedEnvironmentFacade.getNodeName() + " as designated primary", e);
             throw new JMException(e.getMessage());
         }
     }
@@ -215,7 +215,7 @@ public class BDBHAMessageStoreManagerMBe
     {
         try
         {
-            _store.updateAddress(nodeName, newHostName, newPort);
+            _replicatedEnvironmentFacade.updateAddress(nodeName, newHostName, newPort);
         }
         catch(AMQStoreException e)
         {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java Tue Dec 10 17:19:47 2013
@@ -28,11 +28,12 @@ import org.apache.qpid.server.jmx.MBeanP
 import org.apache.qpid.server.jmx.ManagedObject;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade;
 
 /**
  * This provide will create a {@link BDBHAMessageStoreManagerMBean} if the child is a virtual
- * host and of type {@link BDBHAMessageStore#TYPE}.
+ * host and of type {@link ReplicatedEnvironmentFacade#TYPE}.
  *
  */
 public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider
@@ -48,7 +49,7 @@ public class BDBHAMessageStoreManagerMBe
     public boolean isChildManageableByMBean(ConfiguredObject child)
     {
         return (child instanceof VirtualHost
-            && BDBHAMessageStore.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
+            && ReplicatedEnvironmentFacade.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
     }
 
     @Override
@@ -56,14 +57,15 @@ public class BDBHAMessageStoreManagerMBe
     {
         VirtualHost virtualHostChild = (VirtualHost) child;
 
-        BDBHAMessageStore messageStore = (BDBHAMessageStore) virtualHostChild.getMessageStore();
+        BDBMessageStore messageStore = (BDBMessageStore) virtualHostChild.getMessageStore();
 
         if (LOGGER.isDebugEnabled())
         {
             LOGGER.debug("Creating mBean for child " + child);
         }
 
-        return new BDBHAMessageStoreManagerMBean(messageStore, (ManagedObject) parent);
+        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = (ReplicatedEnvironmentFacade)messageStore.getEnvironmentFacade();
+        return new BDBHAMessageStoreManagerMBean(replicatedEnvironmentFacade, (ManagedObject) parent);
     }
 
     @Override

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java Tue Dec 10 17:19:47 2013
@@ -43,7 +43,7 @@ import org.apache.qpid.server.jmx.Manage
 import org.apache.qpid.server.logging.SystemOutMessageLogger;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.TestLogActor;
-import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade;
 import org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBean;
 import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
 
@@ -58,7 +58,7 @@ public class BDBHAMessageStoreManagerMBe
     private static final String TEST_STORE_NAME = "testStoreName";
     private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false;
 
-    private BDBHAMessageStore _store;
+    private ReplicatedEnvironmentFacade _replicatedEnvironmentFacadee;
     private BDBHAMessageStoreManagerMBean _mBean;
     private AMQManagedObject _mBeanParent;
 
@@ -68,10 +68,10 @@ public class BDBHAMessageStoreManagerMBe
         super.setUp();
 
         CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
-        _store = mock(BDBHAMessageStore.class);
+        _replicatedEnvironmentFacadee = mock(ReplicatedEnvironmentFacade.class);
         _mBeanParent = mock(AMQManagedObject.class);
         when(_mBeanParent.getRegistry()).thenReturn(mock(ManagedObjectRegistry.class));
-        _mBean = new BDBHAMessageStoreManagerMBean(_store, _mBeanParent);
+        _mBean = new BDBHAMessageStoreManagerMBean(_replicatedEnvironmentFacadee, _mBeanParent);
     }
 
     @Override
@@ -83,7 +83,7 @@ public class BDBHAMessageStoreManagerMBe
 
     public void testObjectName() throws Exception
     {
-        when(_store.getName()).thenReturn(TEST_STORE_NAME);
+        when(_replicatedEnvironmentFacadee.getName()).thenReturn(TEST_STORE_NAME);
 
         String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_STORE_NAME);
         assertEquals(expectedObjectName, _mBean.getObjectName().toString());
@@ -91,56 +91,56 @@ public class BDBHAMessageStoreManagerMBe
 
     public void testGroupName() throws Exception
     {
-        when(_store.getGroupName()).thenReturn(TEST_GROUP_NAME);
+        when(_replicatedEnvironmentFacadee.getGroupName()).thenReturn(TEST_GROUP_NAME);
 
         assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME));
     }
 
     public void testNodeName() throws Exception
     {
-        when(_store.getNodeName()).thenReturn(TEST_NODE_NAME);
+        when(_replicatedEnvironmentFacadee.getNodeName()).thenReturn(TEST_NODE_NAME);
 
         assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME));
     }
 
     public void testNodeHostPort() throws Exception
     {
-        when(_store.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT);
+        when(_replicatedEnvironmentFacadee.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT);
 
         assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT));
     }
 
     public void testHelperHostPort() throws Exception
     {
-        when(_store.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT);
+        when(_replicatedEnvironmentFacadee.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT);
 
         assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT));
     }
 
     public void testDurability() throws Exception
     {
-        when(_store.getDurability()).thenReturn(TEST_DURABILITY);
+        when(_replicatedEnvironmentFacadee.getDurability()).thenReturn(TEST_DURABILITY);
 
         assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY));
     }
 
     public void testCoalescingSync() throws Exception
     {
-        when(_store.isCoalescingSync()).thenReturn(true);
+        when(_replicatedEnvironmentFacadee.isCoalescingSync()).thenReturn(true);
 
         assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC));
     }
 
     public void testNodeState() throws Exception
     {
-        when(_store.getNodeState()).thenReturn(TEST_NODE_STATE);
+        when(_replicatedEnvironmentFacadee.getNodeState()).thenReturn(TEST_NODE_STATE);
 
         assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE));
     }
 
     public void testDesignatedPrimaryFlag() throws Exception
     {
-        when(_store.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
+        when(_replicatedEnvironmentFacadee.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
 
         assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY));
     }
@@ -148,29 +148,29 @@ public class BDBHAMessageStoreManagerMBe
     public void testGroupMembersForGroupWithOneNode() throws Exception
     {
         List<Map<String, String>> members = Collections.singletonList(createTestNodeResult());
-        when(_store.getGroupMembers()).thenReturn(members);
+        when(_replicatedEnvironmentFacadee.getGroupMembers()).thenReturn(members);
 
         final TabularData resultsTable = _mBean.getAllNodesInGroup();
 
-        assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT);
+        assertTableHasHeadingsNamed(resultsTable, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT);
 
         final int numberOfDataRows = resultsTable.size();
         assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows);
         final CompositeData row = (CompositeData) resultsTable.values().iterator().next();
-        assertEquals(TEST_NODE_NAME, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME));
-        assertEquals(TEST_NODE_HOST_PORT, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
+        assertEquals(TEST_NODE_NAME, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME));
+        assertEquals(TEST_NODE_HOST_PORT, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT));
     }
 
     public void testRemoveNodeFromReplicationGroup() throws Exception
     {
         _mBean.removeNodeFromGroup(TEST_NODE_NAME);
 
-        verify(_store).removeNodeFromGroup(TEST_NODE_NAME);
+        verify(_replicatedEnvironmentFacadee).removeNodeFromGroup(TEST_NODE_NAME);
     }
 
     public void testRemoveNodeFromReplicationGroupWithError() throws Exception
     {
-        doThrow(new AMQStoreException("mocked exception")).when(_store).removeNodeFromGroup(TEST_NODE_NAME);
+        doThrow(new AMQStoreException("mocked exception")).when(_replicatedEnvironmentFacadee).removeNodeFromGroup(TEST_NODE_NAME);
 
         try
         {
@@ -187,12 +187,12 @@ public class BDBHAMessageStoreManagerMBe
     {
         _mBean.setDesignatedPrimary(true);
 
-        verify(_store).setDesignatedPrimary(true);
+        verify(_replicatedEnvironmentFacadee).setDesignatedPrimary(true);
     }
 
     public void testSetAsDesignatedPrimaryWithError() throws Exception
     {
-        doThrow(new AMQStoreException("mocked exception")).when(_store).setDesignatedPrimary(true);
+        doThrow(new AMQStoreException("mocked exception")).when(_replicatedEnvironmentFacadee).setDesignatedPrimary(true);
 
         try
         {
@@ -212,7 +212,7 @@ public class BDBHAMessageStoreManagerMBe
 
         _mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort);
 
-        verify(_store).updateAddress(TEST_NODE_NAME, newHostName, newPort);
+        verify(_replicatedEnvironmentFacadee).updateAddress(TEST_NODE_NAME, newHostName, newPort);
     }
 
     private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames)
@@ -227,8 +227,8 @@ public class BDBHAMessageStoreManagerMBe
     private Map<String, String> createTestNodeResult()
     {
         Map<String, String> items = new HashMap<String, String>();
-        items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
-        items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
+        items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
+        items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
         return items;
     }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java Tue Dec 10 17:19:47 2013
@@ -20,8 +20,16 @@ package org.apache.qpid.server.store.ber
  *
  */
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.AbstractActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.stats.StatisticsGatherer;
@@ -37,9 +45,14 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
 public class BDBHAVirtualHost extends AbstractVirtualHost
 {
-    private BDBHAMessageStore _messageStore;
+    private static final Logger LOGGER = Logger.getLogger(BDBHAVirtualHost.class);
+
+    private BDBMessageStore _messageStore;
 
     private boolean _inVhostInitiatedClose;
 
@@ -57,7 +70,7 @@ public class BDBHAVirtualHost extends Ab
 
     protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost) throws Exception
     {
-        _messageStore = new BDBHAMessageStore();
+        _messageStore = new BDBMessageStore(ReplicatedEnvironmentFacade.TYPE, new ReplicatedEnvironmentFacadeFactory());
 
         final MessageStoreLogSubject storeLogSubject =
                 new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
@@ -85,6 +98,8 @@ public class BDBHAVirtualHost extends Ab
                 virtualHost, recoveryHandler,
                 recoveryHandler
         );
+
+        ((ReplicatedEnvironmentFacade)_messageStore.getEnvironmentFacade()).setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
     }
 
 
@@ -202,4 +217,140 @@ public class BDBHAVirtualHost extends Ab
         }
     }
 
+    private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
+    {
+        // TODO shutdown the executor
+        private final Executor _executor = Executors.newSingleThreadExecutor();
+
+        @Override
+        public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+        {
+            com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState();
+
+            if (LOGGER.isInfoEnabled())
+            {
+                LOGGER.info("Received BDB event indicating transition to state " + state);
+            }
+
+            switch (state)
+            {
+            case MASTER:
+                activateStoreAsync();
+                break;
+            case REPLICA:
+                passivateStoreAsync();
+                break;
+            case DETACHED:
+                LOGGER.error("BDB replicated node in detached state, therefore passivating.");
+                passivateStoreAsync();
+                break;
+            case UNKNOWN:
+                LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)");
+                break;
+            default:
+                LOGGER.error("Unexpected state change: " + state);
+                throw new IllegalStateException("Unexpected state change: " + state);
+            }
+        }
+
+        /**
+         * Calls {@link MessageStore#activate()}.
+         *
+         * <p/>
+         *
+         * This is done on a background thread, in line with
+         * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because
+         * activate may execute transactions, which can't complete until
+         * {@link StateChangeListener#stateChange(StateChangeEvent)} has returned.
+         */
+        private void activateStoreAsync()
+        {
+            String threadName = "BDBHANodeActivationThread-" + getName();
+            executeStateChangeAsync(new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
+                {
+                    try
+                    {
+                        _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
+                        _messageStore.activate();
+                    }
+                    catch (Exception e)
+                    {
+                        LOGGER.error("Failed to activate on hearing MASTER change event", e);
+                        throw e;
+                    }
+                    return null;
+                }
+            }, threadName);
+        }
+
+        private void passivateStoreAsync()
+        {
+            String threadName = "BDBHANodePassivationThread-" + getName();
+            executeStateChangeAsync(new Callable<Void>()
+            {
+
+                @Override
+                public Void call() throws Exception
+                {
+                    try
+                    {
+                        if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED))
+                        {
+                            LOGGER.debug("Store becoming passive");
+                            _messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e);
+                        throw e;
+                    }
+                    return null;
+                }
+            }, threadName);
+        }
+
+        private void executeStateChangeAsync(final Callable<Void> callable, final String threadName)
+        {
+            final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger();
+
+            _executor.execute(new Runnable()
+            {
+
+                @Override
+                public void run()
+                {
+                    final String originalThreadName = Thread.currentThread().getName();
+                    Thread.currentThread().setName(threadName);
+                    try
+                    {
+                        CurrentActor.set(new AbstractActor(_rootLogger)
+                        {
+                            @Override
+                            public String getLogMessage()
+                            {
+                                return threadName;
+                            }
+                        });
+
+                        try
+                        {
+                            callable.call();
+                        }
+                        catch (Exception e)
+                        {
+                            LOGGER.error("Exception during state change", e);
+                        }
+                    }
+                    finally
+                    {
+                        Thread.currentThread().setName(originalThreadName);
+                    }
+                }
+            });
+        }
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org