You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/04/30 15:08:49 UTC

svn commit: r1591281 [1/2] - in /qpid/trunk/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/ bdbstore/src/test/java/org/apache/qpid/server/s...

Author: orudyy
Date: Wed Apr 30 13:08:49 2014
New Revision: 1591281

URL: http://svn.apache.org/r1591281
Log:
QPID-5715,QPID-5412: Add remote replication nodes

Added:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
    qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java
      - copied, changed from r1591247, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
Modified:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
    qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1591281&r1=1591280&r2=1591281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Wed Apr 30 13:08:49 2014
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -67,6 +68,7 @@ import com.sleepycat.je.rep.NodeState;
 import com.sleepycat.je.rep.RepInternal;
 import com.sleepycat.je.rep.ReplicatedEnvironment;
 import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationGroup;
 import com.sleepycat.je.rep.ReplicationMutableConfig;
 import com.sleepycat.je.rep.ReplicationNode;
 import com.sleepycat.je.rep.RestartRequiredException;
@@ -147,6 +149,8 @@ public class ReplicatedEnvironmentFacade
     private final ScheduledExecutorService _groupChangeExecutor;
     private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING);
     private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>();
+    private final ConcurrentMap<String, ReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<String, ReplicationNode>();
+    private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
     private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
     private final AtomicBoolean _initialised;
     private final EnvironmentFacadeTask[] _initialisationTasks;
@@ -178,10 +182,11 @@ public class ReplicatedEnvironmentFacade
         // we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread
         _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
         _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
-        _groupChangeExecutor.schedule(new RemoteNodeStateLearner(), 100, TimeUnit.MILLISECONDS);  // TODO make configurable
 
         // create environment in a separate thread to avoid renaming of the current thread by JE
         _environment = createEnvironment(true);
+        populateExistingRemoteReplicationNodes();
+        _groupChangeExecutor.submit(new RemoteNodeStateLearner());
     }
 
     @Override
@@ -805,6 +810,19 @@ public class ReplicatedEnvironmentFacade
                         LOGGER.warn("Ignoring an exception whilst closing databases", e);
                     }
                 }
+                else
+                {
+                    // reset database holders for invalid environments
+                    for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
+                    {
+                        DatabaseHolder databaseHolder = entry.getValue();
+                        Database database = databaseHolder.getDatabase();
+                        if (database != null)
+                        {
+                            databaseHolder.setDatabase(null);
+                        }
+                    }
+                }
                 environment.close();
             }
             catch (EnvironmentFailureException efe)
@@ -1004,7 +1022,7 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    public NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
+    NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
     {
         if (repNode == null)
         {
@@ -1023,84 +1041,229 @@ public class ReplicatedEnvironmentFacade
         return _environment.getGroup().getElectableNodes().size();
     }
 
+    public void setReplicationGroupListener(ReplicationGroupListener replicationGroupListener)
+    {
+        if (_replicationGroupListener.compareAndSet(null, replicationGroupListener))
+        {
+            notifyExistingRemoteReplicationNodes(replicationGroupListener);
+        }
+        else
+        {
+            throw new IllegalStateException("ReplicationGroupListener is already set on " + _prettyGroupNodeName);
+        }
+    }
+
+    private void populateExistingRemoteReplicationNodes()
+    {
+        ReplicationGroup group = _environment.getGroup();
+        Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes());
+        String localNodeName = getNodeName();
+
+        for (ReplicationNode replicationNode : nodes)
+        {
+            String discoveredNodeName = replicationNode.getName();
+            if (!discoveredNodeName.equals(localNodeName))
+            {
+               _remoteReplicationNodes.put(replicationNode.getName(), replicationNode);
+            }
+        }
+     }
+
+    private void notifyExistingRemoteReplicationNodes(ReplicationGroupListener listener)
+    {
+        for (ReplicationNode value : _remoteReplicationNodes.values())
+        {
+            listener.onReplicationNodeRecovered(value);
+        }
+    }
+
     private class RemoteNodeStateLearner implements Callable<Void>
     {
         private Map<String, ReplicatedEnvironment.State> _previousGroupState = Collections.emptyMap();
+
         @Override
         public Void call()
         {
-            final Map<String, ReplicatedEnvironment.State> currentGroupState = new HashMap<String, ReplicatedEnvironment.State>();
             try
             {
-                Set<Future<Void>> futures = new HashSet<Future<Void>>();
+                if (_state.get() == State.OPEN)
+                {
+                    try
+                    {
+                        detectGroupChangesAndNotify();
+                    }
+                    catch(DatabaseException e)
+                    {
+                        handleDatabaseException("Exception on replication group check", e);
+                    }
+
+                    Map<ReplicationNode, NodeState> nodeStates = discoverNodeStates(_remoteReplicationNodes.values());
+
+                    executeDabasePingerOnNodeChangesIfMaster(nodeStates);
 
-                for (final ReplicationNode node : _environment.getGroup().getElectableNodes())
+                    notifyGroupListenerAboutNodeStates(nodeStates);
+                }
+
+            }
+            finally
+            {
+                State state = _state.get();
+                if (state != State.CLOSED && state != State.CLOSING)
                 {
-                    Future<Void> future = _groupChangeExecutor.submit(new Callable<Void>()
+                    _groupChangeExecutor.schedule(this, REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+                }
+            }
+            return null;
+        }
+
+        private void detectGroupChangesAndNotify()
+        {
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Checking for changes in the group " + _configuration.getGroupName() + " on node " + _configuration.getName());
+            }
+
+            String groupName = _configuration.getGroupName();
+            ReplicatedEnvironment env = _environment;
+            ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get();
+            if (env != null)
+            {
+                ReplicationGroup group = env.getGroup();
+                Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes());
+                String localNodeName = getNodeName();
+
+                Map<String, ReplicationNode> removalMap = new HashMap<String, ReplicationNode>(_remoteReplicationNodes);
+                for (ReplicationNode replicationNode : nodes)
+                {
+                    String discoveredNodeName = replicationNode.getName();
+                    if (!discoveredNodeName.equals(localNodeName))
                     {
-                        @Override
-                        public Void call()
+                        if (!_remoteReplicationNodes.containsKey(discoveredNodeName))
                         {
-                            DbPing ping = new DbPing(node, _configuration.getGroupName(), REMOTE_NODE_MONITOR_INTERVAL);
-                            ReplicatedEnvironment.State nodeState;
-                            try
-                            {
-                                nodeState = ping.getNodeState().getNodeState();
-                            }
-                            catch (IOException e)
+                            if (LOGGER.isDebugEnabled())
                             {
-                                nodeState = ReplicatedEnvironment.State.UNKNOWN;
+                                LOGGER.debug("Remote replication node added '" + replicationNode + "' to '" + groupName + "'");
                             }
-                            catch (ServiceConnectFailedException e)
+
+                            _remoteReplicationNodes.put(discoveredNodeName, replicationNode);
+
+                            if (replicationGroupListener != null)
                             {
-                                nodeState = ReplicatedEnvironment.State.UNKNOWN;
+                                replicationGroupListener.onReplicationNodeAddedToGroup(replicationNode);
                             }
-
-                            currentGroupState.put(node.getName(), nodeState);
-                            return null;
                         }
-                    });
-                    futures.add(future);
+                        else
+                        {
+                            removalMap.remove(discoveredNodeName);
+                        }
+                    }
                 }
 
-                for (Future<Void> future : futures)
+                if (!removalMap.isEmpty())
                 {
-                    try
+                    for (Map.Entry<String, ReplicationNode> replicationNodeEntry : removalMap.entrySet())
                     {
-                        future.get(REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
-                    }
-                    catch (InterruptedException e)
-                    {
-                        Thread.currentThread().interrupt();
-                    }
-                    catch (ExecutionException e)
-                    {
-                        LOGGER.warn("Cannot update node state for group " + _configuration.getGroupName(), e.getCause());
-                    }
-                    catch (TimeoutException e)
-                    {
-                        LOGGER.warn("Timeout whilst updating node state for group " + _configuration.getGroupName());
-                        future.cancel(true);
+                        String replicationNodeName = replicationNodeEntry.getKey();
+                        if (LOGGER.isDebugEnabled())
+                        {
+                            LOGGER.debug("Remote replication node removed '" + replicationNodeName + "' from '" + groupName + "'");
+                        }
+                        _remoteReplicationNodes.remove(replicationNodeName);
+                        if (replicationGroupListener != null)
+                        {
+                            replicationGroupListener.onReplicationNodeRemovedFromGroup(replicationNodeEntry.getValue());
+                        }
                     }
                 }
+            }
+        }
+
+        private Map<ReplicationNode, NodeState> discoverNodeStates(Collection<ReplicationNode> electableNodes)
+        {
+            final Map<ReplicationNode, NodeState> nodeStates = new HashMap<ReplicationNode, NodeState>();
+            Set<Future<Void>> futures = new HashSet<Future<Void>>();
 
-                if (ReplicatedEnvironment.State.MASTER == _environment.getState())
+            for (final ReplicationNode node : electableNodes)
+            {
+                Future<Void> future = _groupChangeExecutor.submit(new Callable<Void>()
                 {
-                    boolean stateChanged = !_previousGroupState.equals(currentGroupState);
-                    _previousGroupState = currentGroupState;
-                    if (stateChanged && State.OPEN == _state.get())
+                    @Override
+                    public Void call()
                     {
-                        new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this);
+                        NodeState nodeStateObject = null;
+                        try
+                        {
+                            nodeStateObject = getRemoteNodeState(node);
+                        }
+                        catch (IOException | ServiceConnectFailedException e )
+                        {
+                            // Cannot discover node states. The node state should be treated as UNKNOWN
+                        }
+
+                        nodeStates.put(node, nodeStateObject);
+                        return null;
                     }
+                });
+                futures.add(future);
+            }
+
+            for (Future<Void> future : futures)
+            {
+                try
+                {
+                    future.get(REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                }
+                catch (ExecutionException e)
+                {
+                    LOGGER.warn("Cannot update node state for group " + _configuration.getGroupName(), e.getCause());
+                }
+                catch (TimeoutException e)
+                {
+                    LOGGER.warn("Timeout whilst updating node state for group " + _configuration.getGroupName());
+                    future.cancel(true);
                 }
             }
-            finally
+            return nodeStates;
+        }
+
+        private void executeDabasePingerOnNodeChangesIfMaster(final Map<ReplicationNode, NodeState> nodeStates)
+        {
+            if (ReplicatedEnvironment.State.MASTER == _environment.getState())
             {
-                _groupChangeExecutor.schedule(this, REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+                Map<String, ReplicatedEnvironment.State> currentGroupState = new HashMap<String, ReplicatedEnvironment.State>();
+                for (Map.Entry<ReplicationNode, NodeState> entry : nodeStates.entrySet())
+                {
+                    ReplicationNode node = entry.getKey();
+                    NodeState nodeState = entry.getValue();
+                    ReplicatedEnvironment.State state = nodeState == null? ReplicatedEnvironment.State.UNKNOWN : nodeState.getNodeState();
+                    currentGroupState.put(node.getName(), state);
+                }
+                boolean stateChanged = !_previousGroupState.equals(currentGroupState);
+                _previousGroupState = currentGroupState;
+                if (stateChanged && State.OPEN == _state.get())
+                {
+                    new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this);
+                }
+            }
+        }
+
+        private void notifyGroupListenerAboutNodeStates(final Map<ReplicationNode, NodeState> nodeStates)
+        {
+            ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get();
+            if (replicationGroupListener != null)
+            {
+                for (Map.Entry<ReplicationNode, NodeState> entry : nodeStates.entrySet())
+                {
+                    replicationGroupListener.onNodeState(entry.getKey(), entry.getValue());
+                }
             }
-            return null;
         }
     }
+
     public static enum State
     {
         OPENING,

Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java?rev=1591281&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java Wed Apr 30 13:08:49 2014
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.ReplicationNode;
+
+public interface ReplicationGroupListener
+{
+    /**
+     * Fired when a remote replication node is added to a group.  This event happens
+     * exactly once just after a new replication node is created.
+     */
+    void onReplicationNodeAddedToGroup(ReplicationNode node);
+
+    /**
+     * Fired exactly once for each existing remote node.  Used to inform the application
+     * on any existing nodes as it starts up for the first time.
+     */
+    void onReplicationNodeRecovered(ReplicationNode node);
+
+    /**
+     * Fired when a remote replication node is (permanently) removed from group.  This event
+     * happens exactly once just after the existing replication node is deleted.
+     */
+    void onReplicationNodeRemovedFromGroup(ReplicationNode node);
+
+    /**
+     * Invoked to notify listener on node state update
+     */
+    void onNodeState(ReplicationNode node, NodeState nodeState);
+
+}

Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java?rev=1591281&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java Wed Apr 30 13:08:49 2014
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode.berkeleydb;
+
+import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.RemoteReplicationNode;
+
+@ManagedObject(category=false, managesChildren=false, creatable=false)
+public interface BDBHARemoteReplicationNode<X extends BDBHARemoteReplicationNode<X>> extends RemoteReplicationNode<X>
+{
+    String GROUP_NAME = "groupName";
+    String ADDRESS = "address";
+    String ROLE = "role";
+    String LAST_KNOWN_REPLICATION_TRANSACTION_ID = "lastKnownReplicationTransactionId";
+    String JOIN_TIME = "joinTime";
+
+    @ManagedAttribute(derived = true)
+    String getGroupName();
+
+    @ManagedAttribute(derived = true)
+    String getAddress();
+
+    @ManagedAttribute(automate = true)
+    String getRole();
+
+    @ManagedAttribute(derived = true)
+    long getJoinTime();
+
+    @ManagedAttribute(derived = true)
+    long getLastKnownReplicationTransactionId();
+
+}

Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java?rev=1591281&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java Wed Apr 30 13:08:49 2014
@@ -0,0 +1,201 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.virtualhostnode.berkeleydb;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+
+import com.sleepycat.je.rep.MasterStateException;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+
+public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDBHARemoteReplicationNodeImpl> implements BDBHARemoteReplicationNode<BDBHARemoteReplicationNodeImpl>
+{
+    private static final Logger LOGGER = Logger.getLogger(BDBHARemoteReplicationNodeImpl.class);
+
+    private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
+    private final String _address;
+
+    private volatile long _joinTime;
+    private volatile long _lastTransactionId;
+
+    @ManagedAttributeField(afterSet="afterSetRole")
+    private volatile String _role;
+
+    private final AtomicReference<State> _state;
+
+    public BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl virtualHostNode, Map<String, Object> attributes, ReplicatedEnvironmentFacade replicatedEnvironmentFacade)
+    {
+        super(parentsMap(virtualHostNode), attributes);
+        _address = (String)attributes.get(ADDRESS);
+        _replicatedEnvironmentFacade = replicatedEnvironmentFacade;
+        _state = new AtomicReference<State>(State.ACTIVE);
+    }
+
+    @Override
+    public State getState()
+    {
+        return _state.get();
+    }
+
+    @Override
+    public String getGroupName()
+    {
+        return _replicatedEnvironmentFacade.getGroupName();
+    }
+
+    @Override
+    public String getAddress()
+    {
+        return _address;
+    }
+
+    @Override
+    public String getRole()
+    {
+        return _role;
+    }
+
+    @Override
+    public long getJoinTime()
+    {
+        return _joinTime;
+    }
+
+    @Override
+    public long getLastKnownReplicationTransactionId()
+    {
+        return _lastTransactionId;
+    }
+
+    public void delete()
+    {
+        this.deleted();
+    }
+
+    protected void afterSetRole()
+    {
+        try
+        {
+            String nodeName = getName();
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Trying to transfer master to " + nodeName);
+            }
+
+            _replicatedEnvironmentFacade.transferMasterAsynchronously(nodeName);
+
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("The mastership has been transfered to " + nodeName);
+            }
+        }
+        catch(Exception e)
+        {
+            throw new IllegalConfigurationException("Cannot transfer mastership to " + getName(), e);
+        }
+    }
+
+    @Override
+    protected boolean setState(State currentState, State desiredState)
+    {
+        if (desiredState == State.DELETED)
+        {
+            String nodeName = getName();
+
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Deleting node '"  + nodeName + "' from group '" + getGroupName() + "'");
+            }
+
+            try
+            {
+                _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName);
+                _state.set(State.DELETED);
+                delete();
+                return true;
+            }
+            catch(MasterStateException e)
+            {
+                throw new IllegalStateTransitionException("Node '" + nodeName + "' cannot be deleted when role is a master");
+            }
+            catch (Exception e)
+            {
+                throw new IllegalStateTransitionException("Unexpected exception on node '" + nodeName + "' deletion", e);
+            }
+        }
+        return false;
+    }
+
+    @Override
+    protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
+    {
+        super.validateChange(proxyForValidation, changedAttributes);
+        if (changedAttributes.contains(ROLE))
+        {
+            String currentRole = getRole();
+            if (!ReplicatedEnvironment.State.REPLICA.name().equals(currentRole))
+            {
+                throw new IllegalArgumentException("Cannot transfer mastership when not a replica");
+            }
+            if (!ReplicatedEnvironment.State.MASTER.name().equals(((BDBHARemoteReplicationNode<?>)proxyForValidation).getRole()))
+            {
+                throw new IllegalArgumentException("Changing role to other value then " + ReplicatedEnvironment.State.MASTER.name() + " is unsupported");
+            }
+        }
+
+        if (changedAttributes.contains(JOIN_TIME))
+        {
+            throw new IllegalArgumentException("Cannot change derived attribute " + JOIN_TIME);
+        }
+
+        if (changedAttributes.contains(LAST_KNOWN_REPLICATION_TRANSACTION_ID))
+        {
+            throw new IllegalArgumentException("Cannot change derived attribute " + LAST_KNOWN_REPLICATION_TRANSACTION_ID);
+        }
+    }
+
+    void setRole(String role)
+    {
+        _role = role;
+    }
+
+    void setJoinTime(long joinTime)
+    {
+        _joinTime = joinTime;
+    }
+
+    void setLastTransactionId(long lastTransactionId)
+    {
+        _lastTransactionId = lastTransactionId;
+    }
+
+}

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1591281&r1=1591280&r2=1591281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Wed Apr 30 13:08:49 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.virtualhostnode.berkeleydb;
 
 import java.security.PrivilegedAction;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -30,12 +31,14 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.security.auth.Subject;
 
+import com.sleepycat.je.rep.NodeState;
 import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationNode;
 import com.sleepycat.je.rep.StateChangeEvent;
 import com.sleepycat.je.rep.StateChangeListener;
-import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.log4j.Logger;
 import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.BrokerModel;
@@ -43,6 +46,7 @@ import org.apache.qpid.server.model.Conf
 import org.apache.qpid.server.model.ManagedAttributeField;
 import org.apache.qpid.server.model.ManagedObject;
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
+import org.apache.qpid.server.model.RemoteReplicationNode;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
@@ -54,12 +58,14 @@ import org.apache.qpid.server.store.berk
 import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
 import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
 import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicationGroupListener;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostState;
 import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
 
 @ManagedObject( category = false, type = "BDB_HA" )
-public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtualHostNodeImpl> implements BDBHAVirtualHostNode<BDBHAVirtualHostNodeImpl>
+public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtualHostNodeImpl> implements
+        BDBHAVirtualHostNode<BDBHAVirtualHostNodeImpl>
 {
     /**
      * Length of time we synchronously await the a JE mutation to complete.  It is not considered an error if we exceed this timeout, although a
@@ -183,7 +189,7 @@ public class BDBHAVirtualHostNodeImpl ex
     @Override
     public String getRole()
     {
-        ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+        ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
         if (environmentFacade != null)
         {
             return environmentFacade.getNodeState();
@@ -194,7 +200,7 @@ public class BDBHAVirtualHostNodeImpl ex
     @Override
     public Long getLastKnownReplicationTransactionId()
     {
-        ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+        ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
         if (environmentFacade != null)
         {
             return environmentFacade.getLastKnownReplicationTransactionId();
@@ -205,7 +211,7 @@ public class BDBHAVirtualHostNodeImpl ex
     @Override
     public Long getJoinTime()
     {
-        ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+        ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
         if (environmentFacade != null)
         {
             return environmentFacade.getJoinTime();
@@ -219,6 +225,14 @@ public class BDBHAVirtualHostNodeImpl ex
         return _replicatedEnvironmentConfiguration;
     }
 
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Collection<? extends RemoteReplicationNode> getRemoteReplicationNodes()
+    {
+        Collection<RemoteReplicationNode> remoteNodes = getChildren(RemoteReplicationNode.class);
+        return (Collection<? extends RemoteReplicationNode>)remoteNodes;
+    }
+
     @Override
     public String toString()
     {
@@ -255,6 +269,12 @@ public class BDBHAVirtualHostNodeImpl ex
         return (BDBMessageStore) super.getConfigurationStore();
     }
 
+    protected ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade()
+    {
+        return _environmentFacade.get();
+    }
+
+    @Override
     protected DurableConfigurationStore createConfigurationStore()
     {
         return new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory());
@@ -276,8 +296,11 @@ public class BDBHAVirtualHostNodeImpl ex
         getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.STORE_LOCATION(getStorePath()));
 
         ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) getConfigurationStore().getEnvironmentFacade();
-        environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
-        _environmentFacade.set(environmentFacade);
+        if (_environmentFacade.compareAndSet(null, environmentFacade))
+        {
+            environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
+            environmentFacade.setReplicationGroupListener(new RemoteNodesDiscoverer());
+        }
     }
 
     @Override
@@ -289,7 +312,7 @@ public class BDBHAVirtualHostNodeImpl ex
         }
         finally
         {
-            ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+            ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
             if (_environmentFacade.compareAndSet(environmentFacade, null))
             {
                 environmentFacade.close();
@@ -421,7 +444,7 @@ public class BDBHAVirtualHostNodeImpl ex
     @SuppressWarnings("unused")
     private void postSetPriority()
     {
-        ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+        ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
         if (environmentFacade != null)
         {
             try
@@ -451,7 +474,7 @@ public class BDBHAVirtualHostNodeImpl ex
     @SuppressWarnings("unused")
     private void postSetDesignatedPrimary()
     {
-        ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+        ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
         if (environmentFacade != null)
         {
             try
@@ -481,7 +504,7 @@ public class BDBHAVirtualHostNodeImpl ex
     @SuppressWarnings("unused")
     private void postSetQuorumOverride()
     {
-        ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+        ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
         if (environmentFacade != null)
         {
             try
@@ -511,7 +534,7 @@ public class BDBHAVirtualHostNodeImpl ex
     @SuppressWarnings("unused")
     private void preSetRole()
     {
-        ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+        ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
         if (environmentFacade != null)
         {
             String currentRole = environmentFacade.getNodeState();
@@ -531,7 +554,7 @@ public class BDBHAVirtualHostNodeImpl ex
     @SuppressWarnings("unused")
     private void postSetRole()
     {
-        ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+        ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
         if (environmentFacade != null)
         {
             try
@@ -561,6 +584,63 @@ public class BDBHAVirtualHostNodeImpl ex
         }
     }
 
+    private class RemoteNodesDiscoverer implements ReplicationGroupListener
+    {
+        @Override
+        public void onReplicationNodeAddedToGroup(ReplicationNode node)
+        {
+            BDBHARemoteReplicationNodeImpl remoteNode = new BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl.this, nodeToAttributes(node), getReplicatedEnvironmentFacade());
+            remoteNode.create();
+            childAdded(remoteNode);
+        }
+
+        @Override
+        public void onReplicationNodeRecovered(ReplicationNode node)
+        {
+            BDBHARemoteReplicationNodeImpl remoteNode = new BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl.this, nodeToAttributes(node), getReplicatedEnvironmentFacade());
+            remoteNode.open();
+        }
+
+        @Override
+        public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+        {
+            BDBHARemoteReplicationNodeImpl remoteNode = getChildByName(BDBHARemoteReplicationNodeImpl.class, node.getName());
+            if (remoteNode != null)
+            {
+                remoteNode.delete();
+                childRemoved(remoteNode);
+            }
+        }
+
+        @Override
+        public void onNodeState(ReplicationNode node, NodeState nodeState)
+        {
+            BDBHARemoteReplicationNodeImpl remoteNode = getChildByName(BDBHARemoteReplicationNodeImpl.class, node.getName());
+            if (remoteNode != null)
+            {
+                if (nodeState == null)
+                {
+                    remoteNode.setRole(ReplicatedEnvironment.State.UNKNOWN.name());
+                }
+                else
+                {
+                    remoteNode.setRole(nodeState.getNodeState().name());
+                    remoteNode.setJoinTime(nodeState.getJoinTime());
+                    remoteNode.setLastTransactionId(nodeState.getKnownMasterTxnEndVLSN());
+                }
+            }
+        }
+
+        private Map<String, Object> nodeToAttributes(ReplicationNode replicationNode)
+        {
+            Map<String, Object> attributes = new HashMap<String, Object>();
+            attributes.put(ConfiguredObject.NAME, replicationNode.getName());
+            attributes.put(ConfiguredObject.DURABLE, false);
+            attributes.put(BDBHARemoteReplicationNode.ADDRESS, replicationNode.getHostName() + ":" + replicationNode.getPort());
+            return attributes;
+        }
+    }
+
     private class ReplicaVirtualHost extends BDBHAVirtualHost
     {
         ReplicaVirtualHost(Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)

Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java?rev=1591281&r1=1591280&r2=1591281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java Wed Apr 30 13:08:49 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.ber
 import static org.mockito.Mockito.when;
 
 import java.io.File;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -39,11 +40,13 @@ import org.apache.qpid.server.model.Brok
 import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.RemoteReplicationNode;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.util.FileUtils;
@@ -159,8 +162,11 @@ public class BDBHAVirtualHostNodeTest ex
             @Override
             public void childAdded(ConfiguredObject object, ConfiguredObject child)
             {
-                child.addChangeListener(this);
-                virtualHostAddedLatch.countDown();
+                if (child instanceof VirtualHost)
+                {
+                    child.addChangeListener(this);
+                    virtualHostAddedLatch.countDown();
+                }
             }
 
             @Override
@@ -314,7 +320,100 @@ public class BDBHAVirtualHostNodeTest ex
         while(!"MASTER".equals(replica.getRole()))
         {
             Thread.sleep(100);
-            if (awaitMastershipCount > 20)
+            if (awaitMastershipCount > 50)
+            {
+                fail("Replica did not assume master role");
+            }
+            awaitMastershipCount++;
+        }
+    }
+
+
+    public void testTransferMasterToReplica() throws Exception
+    {
+        int node1PortNumber = findFreePort();
+        String helperAddress = "localhost:" + node1PortNumber;
+        String groupName = "group";
+
+        Map<String, Object> node1Attributes = new HashMap<String, Object>();
+        node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+        node1Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+        node1Attributes.put(BDBHAVirtualHostNode.NAME, "node1");
+        node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+        node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress);
+        node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+        node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1");
+
+        BDBHAVirtualHostNode<?> node1 = createHaVHN(node1Attributes);
+        assertEquals("Failed to activate node", State.ACTIVE, node1.setDesiredState(node1.getState(), State.ACTIVE));
+
+        final CountDownLatch remoteNodeLatch = new CountDownLatch(2);
+        node1.addChangeListener(new ConfigurationChangeListener()
+        {
+            @Override
+            public void stateChanged(ConfiguredObject object, State oldState, State newState)
+            {
+            }
+
+            @Override
+            public void childRemoved(ConfiguredObject object, ConfiguredObject child)
+            {
+            }
+
+            @Override
+            public void childAdded(ConfiguredObject object, ConfiguredObject child)
+            {
+                if (child instanceof RemoteReplicationNode)
+                {
+                    remoteNodeLatch.countDown();
+                }
+            }
+
+            @Override
+            public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue,
+                    Object newAttributeValue)
+            {
+            }
+        });
+
+        int node2PortNumber = getNextAvailable(node1PortNumber+1);
+
+        Map<String, Object> node2Attributes = new HashMap<String, Object>();
+        node2Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+        node2Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+        node2Attributes.put(BDBHAVirtualHostNode.NAME, "node2");
+        node2Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+        node2Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node2PortNumber);
+        node2Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+        node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2");
+
+        BDBHAVirtualHostNode<?> node2 = createHaVHN(node2Attributes);
+        assertEquals("Failed to activate node2", State.ACTIVE, node2.setDesiredState(node2.getState(), State.ACTIVE));
+
+        int node3PortNumber = getNextAvailable(node2PortNumber+1);
+        Map<String, Object> node3Attributes = new HashMap<String, Object>();
+        node3Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+        node3Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+        node3Attributes.put(BDBHAVirtualHostNode.NAME, "node3");
+        node3Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+        node3Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node3PortNumber);
+        node3Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+        node3Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "3");
+        BDBHAVirtualHostNode<?> node3 = createHaVHN(node3Attributes);
+        assertEquals("Failed to activate node3", State.ACTIVE, node3.setDesiredState(node3.getState(), State.ACTIVE));
+
+        assertTrue("Replication nodes have not been seen during 5s", remoteNodeLatch.await(5, TimeUnit.SECONDS));
+
+        Collection<? extends RemoteReplicationNode> remoteNodes = node1.getRemoteReplicationNodes();
+        RemoteReplicationNode replicaRemoteNode = remoteNodes.iterator().next();
+        replicaRemoteNode.setAttribute(BDBHARemoteReplicationNode.ROLE, "REPLICA", "MASTER");
+
+        BDBHAVirtualHostNode<?> replica = replicaRemoteNode.getName().equals(node2.getName())? node2 : node3;
+        int awaitMastershipCount = 0;
+        while(!"MASTER".equals(replica.getRole()))
+        {
+            Thread.sleep(100);
+            if (awaitMastershipCount > 50)
             {
                 fail("Replica did not assume master role");
             }

Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1591281&r1=1591280&r2=1591281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java Wed Apr 30 13:08:49 2014
@@ -24,12 +24,14 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -40,15 +42,17 @@ import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
 import com.sleepycat.je.Durability;
 import com.sleepycat.je.Environment;
+import com.sleepycat.je.rep.InsufficientReplicasException;
+import com.sleepycat.je.rep.NodeState;
 import com.sleepycat.je.rep.ReplicatedEnvironment;
 import com.sleepycat.je.rep.ReplicatedEnvironment.State;
 import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationNode;
 import com.sleepycat.je.rep.StateChangeEvent;
 import com.sleepycat.je.rep.StateChangeListener;
 
 public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
 {
-
     private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
     private static final int LISTENER_TIMEOUT = 5;
     private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
@@ -220,6 +224,304 @@ public class ReplicatedEnvironmentFacade
         assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride());
     }
 
+    public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception
+    {
+        ReplicatedEnvironmentFacade master = createMaster();
+        String nodeName2 = TEST_NODE_NAME + "_2";
+        String host = "localhost";
+        int port = getNextAvailable(TEST_NODE_PORT + 1);
+        String node2NodeHostPort = host + ":" + port;
+
+        final AtomicInteger invocationCount = new AtomicInteger();
+        final CountDownLatch nodeRecoveryLatch = new CountDownLatch(1);
+        ReplicationGroupListener listener = new NoopReplicationGroupListener()
+        {
+            @Override
+            public void onReplicationNodeRecovered(ReplicationNode node)
+            {
+                nodeRecoveryLatch.countDown();
+                invocationCount.incrementAndGet();
+            }
+        };
+
+        createReplica(nodeName2, node2NodeHostPort, listener);
+
+        assertEquals("Unexpected number of nodes", 2, master.getNumberOfElectableGroupMembers());
+
+        assertTrue("Listener not fired within timeout", nodeRecoveryLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+        assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
+    }
+
+    public void testReplicationGroupListenerHearsNodeAdded() throws Exception
+    {
+        final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
+        final AtomicInteger invocationCount = new AtomicInteger();
+        ReplicationGroupListener listener = new NoopReplicationGroupListener()
+        {
+            @Override
+            public void onReplicationNodeAddedToGroup(ReplicationNode node)
+            {
+                invocationCount.getAndIncrement();
+                nodeAddedLatch.countDown();
+            }
+        };
+
+        TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
+        assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+        assertEquals("Unexpected number of nodes at start of test", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
+
+        String node2Name = TEST_NODE_NAME + "_2";
+        String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
+        createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
+
+        assertTrue("Listener not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+        assertEquals("Unexpected number of nodes", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
+
+        assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
+    }
+
+    public void testReplicationGroupListenerHearsNodeRemoved() throws Exception
+    {
+        final CountDownLatch nodeDeletedLatch = new CountDownLatch(1);
+        final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
+        final AtomicInteger invocationCount = new AtomicInteger();
+        ReplicationGroupListener listener = new NoopReplicationGroupListener()
+        {
+            @Override
+            public void onReplicationNodeRecovered(ReplicationNode node)
+            {
+                nodeAddedLatch.countDown();
+            }
+
+            @Override
+            public void onReplicationNodeAddedToGroup(ReplicationNode node)
+            {
+                nodeAddedLatch.countDown();
+            }
+
+            @Override
+            public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+            {
+                invocationCount.getAndIncrement();
+                nodeDeletedLatch.countDown();
+            }
+        };
+
+        TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
+        assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+        String node2Name = TEST_NODE_NAME + "_2";
+        String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
+        createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
+
+        assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
+
+        // Need to await the listener hearing the addition of the node to the model.
+        assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+        // Now remove the node and ensure we hear the event
+        replicatedEnvironmentFacade.removeNodeFromGroup(node2Name);
+
+        assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+        assertEquals("Unexpected number of nodes after node removal", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
+
+        assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
+    }
+
+    public void testMasterHearsRemoteNodeRoles() throws Exception
+    {
+        final String node2Name = TEST_NODE_NAME + "_2";
+        final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
+        final AtomicReference<ReplicationNode> nodeRef = new AtomicReference<ReplicationNode>();
+        final CountDownLatch stateLatch = new CountDownLatch(1);
+        final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>();
+        ReplicationGroupListener listener = new NoopReplicationGroupListener()
+        {
+            @Override
+            public void onReplicationNodeAddedToGroup(ReplicationNode node)
+            {
+                nodeRef.set(node);
+                nodeAddedLatch.countDown();
+            }
+
+            @Override
+            public void onNodeState(ReplicationNode node, NodeState nodeState)
+            {
+                if (node2Name.equals(node.getName()))
+                {
+                    stateRef.set(nodeState);
+                    stateLatch.countDown();
+                }
+            }
+        };
+
+        TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
+        assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+        String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
+        createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
+
+        assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
+
+        assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+        ReplicationNode remoteNode = (ReplicationNode)nodeRef.get();
+        assertEquals("Unexpcted node name", node2Name, remoteNode.getName());
+
+        assertTrue("Node state not fired within timeout", stateLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+        assertEquals("Unexpcted node state", State.REPLICA, stateRef.get().getNodeState());
+    }
+
+    public void testRemoveNodeFromGroup() throws Exception
+    {
+        ReplicatedEnvironmentFacade environmentFacade = createMaster();
+
+        String node2Name = TEST_NODE_NAME + "_2";
+        String node2NodeHostPort = "localhost:" + getNextAvailable(TEST_NODE_PORT + 1);
+        ReplicatedEnvironmentFacade ref2 = createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
+
+        assertEquals("Unexpected group members count", 2, environmentFacade.getNumberOfElectableGroupMembers());
+        ref2.close();
+
+        environmentFacade.removeNodeFromGroup(node2Name);
+        assertEquals("Unexpected group members count", 1, environmentFacade.getNumberOfElectableGroupMembers());
+    }
+
+
+    public void testEnvironmentFacadeDetectsRemovalOfRemoteNode() throws Exception
+    {
+        final String replicaName = TEST_NODE_NAME + "_1";
+        final CountDownLatch nodeRemovedLatch = new CountDownLatch(1);
+        final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
+        final AtomicReference<ReplicationNode> addedNodeRef = new AtomicReference<ReplicationNode>();
+        final AtomicReference<ReplicationNode> removedNodeRef = new AtomicReference<ReplicationNode>();
+        final CountDownLatch stateLatch = new CountDownLatch(1);
+        final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>();
+
+        ReplicationGroupListener listener = new NoopReplicationGroupListener()
+        {
+            @Override
+            public void onReplicationNodeAddedToGroup(ReplicationNode node)
+            {
+                if (addedNodeRef.compareAndSet(null, node))
+                {
+                    nodeAddedLatch.countDown();
+                }
+            }
+
+            @Override
+            public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+            {
+                removedNodeRef.set(node);
+                nodeRemovedLatch.countDown();
+            }
+
+            @Override
+            public void onNodeState(ReplicationNode node, NodeState nodeState)
+            {
+                if (replicaName.equals(node.getName()))
+                {
+                    stateRef.set(nodeState);
+                    stateLatch.countDown();
+                }
+            }
+        };
+
+        TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+        final ReplicatedEnvironmentFacade masterEnvironment = addNode(State.MASTER, stateChangeListener, listener);
+        assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+        masterEnvironment.setDesignatedPrimary(true);
+
+        int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+        String node1NodeHostPort = "localhost:" + replica1Port;
+
+        ReplicatedEnvironmentFacade replica = createReplica(replicaName, node1NodeHostPort, new NoopReplicationGroupListener());
+
+        assertTrue("Node should be added", nodeAddedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
+
+        ReplicationNode node = addedNodeRef.get();
+        assertEquals("Unexpected node name", replicaName, node.getName());
+
+        assertTrue("Node state was not heared", stateLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
+        assertEquals("Unexpected node role", State.REPLICA, stateRef.get().getNodeState());
+        assertEquals("Unexpected node name", replicaName, stateRef.get().getNodeName());
+
+        replica.close();
+        masterEnvironment.removeNodeFromGroup(node.getName());
+
+        assertTrue("Node deleting is undetected by the environment facade", nodeRemovedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
+        assertEquals("Unexpected node is deleted", node, removedNodeRef.get());
+    }
+
+    public void testCloseStateTransitions() throws Exception
+    {
+        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster();
+
+        assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState());
+        replicatedEnvironmentFacade.close();
+        assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState());
+    }
+
+    public void testEnvironmentRestartOnInsufficientReplicas() throws Exception
+    {
+
+        ReplicatedEnvironmentFacade master = createMaster();
+
+        int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+        String replica1NodeName = TEST_NODE_NAME + "_1";
+        String replica1NodeHostPort = "localhost:" + replica1Port;
+        ReplicatedEnvironmentFacade replica1 = createReplica(replica1NodeName, replica1NodeHostPort, new NoopReplicationGroupListener());
+
+        int replica2Port = getNextAvailable(replica1Port + 1);
+        String replica2NodeName = TEST_NODE_NAME + "_2";
+        String replica2NodeHostPort = "localhost:" + replica2Port;
+        ReplicatedEnvironmentFacade replica2 = createReplica(replica2NodeName, replica2NodeHostPort, new NoopReplicationGroupListener());
+
+        String databaseName = "test";
+
+        DatabaseConfig dbConfig = createDatabase(master, databaseName);
+
+        // close replicas
+        replica1.close();
+        replica2.close();
+
+        Environment e = master.getEnvironment();
+        master.getOpenDatabase(databaseName);
+        try
+        {
+            master.openDatabases(dbConfig, "test2");
+            fail("Opening of new database without quorum should fail");
+        }
+        catch(InsufficientReplicasException ex)
+        {
+            master.handleDatabaseException(null, ex);
+        }
+
+        EnumSet<State> states = EnumSet.of(State.MASTER, State.REPLICA);
+        replica1 = createReplica(replica1NodeName, replica1NodeHostPort, new TestStateChangeListener(states), new NoopReplicationGroupListener());
+        replica2 = createReplica(replica2NodeName, replica2NodeHostPort, new TestStateChangeListener(states), new NoopReplicationGroupListener());
+
+        // Need to poll to await the remote node updating itself
+        long timeout = System.currentTimeMillis() + 5000;
+        while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout)
+        {
+            Thread.sleep(200);
+        }
+
+        assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(),
+                State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) );
+
+        Environment e2 = master.getEnvironment();
+        assertNotSame("Environment has not been restarted", e2, e);
+    }
+
     public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception
     {
         final CountDownLatch masterLatch = new CountDownLatch(1);
@@ -244,7 +546,7 @@ public class ReplicatedEnvironmentFacade
             }
         };
 
-        addNode(State.MASTER, stateChangeListener);
+        addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
         assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
 
         int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
@@ -252,8 +554,8 @@ public class ReplicatedEnvironmentFacade
         int replica2Port = getNextAvailable(replica1Port + 1);
         String node2NodeHostPort = "localhost:" + replica2Port;
 
-        ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
-        ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort);
+        ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
+        ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort, new NoopReplicationGroupListener());
 
         // close replicas
         replica1.close();
@@ -266,15 +568,6 @@ public class ReplicatedEnvironmentFacade
         assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get());
     }
 
-    public void testCloseStateTransitions() throws Exception
-    {
-        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster();
-
-        assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState());
-        replicatedEnvironmentFacade.close();
-        assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState());
-    }
-
     public void testTransferMasterToSelf() throws Exception
     {
         final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1);
@@ -295,12 +588,12 @@ public class ReplicatedEnvironmentFacade
                 }
             }
         };
-        ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener);
+        ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
         assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
 
         int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
         String node1NodeHostPort = "localhost:" + replica1Port;
-        ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+        ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
         assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
 
         int replica2Port = getNextAvailable(replica1Port + 1);
@@ -323,7 +616,7 @@ public class ReplicatedEnvironmentFacade
                 }
             }
         };
-        ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener);
+        ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener());
         assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
         assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
 
@@ -353,12 +646,12 @@ public class ReplicatedEnvironmentFacade
                 }
             }
         };
-        ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener);
+        ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
         assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
 
         int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
         String node1NodeHostPort = "localhost:" + replica1Port;
-        ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+        ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
         assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
 
         int replica2Port = getNextAvailable(replica1Port + 1);
@@ -382,7 +675,7 @@ public class ReplicatedEnvironmentFacade
             }
         };
         String thirdNodeName = TEST_NODE_NAME + "_2";
-        ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener);
+        ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener());
         assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
         assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
 
@@ -394,34 +687,56 @@ public class ReplicatedEnvironmentFacade
 
     private ReplicatedEnvironmentFacade createMaster() throws Exception
     {
+        return createMaster(new NoopReplicationGroupListener());
+    }
+
+    private ReplicatedEnvironmentFacade createMaster(ReplicationGroupListener replicationGroupListener) throws Exception
+    {
         TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
-        ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener);
+        ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener, replicationGroupListener);
         assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
         return env;
     }
 
-    private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort) throws Exception
+    private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort, ReplicationGroupListener replicationGroupListener) throws Exception
     {
         TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA);
-        ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener);
+        return createReplica(nodeName, nodeHostPort, testStateChangeListener, replicationGroupListener);
+    }
+
+    private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort,
+            TestStateChangeListener testStateChangeListener, ReplicationGroupListener replicationGroupListener)
+            throws InterruptedException
+    {
+        ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, replicationGroupListener);
         boolean awaitForStateChange = testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS);
         assertTrue("Replica " + nodeName + " did not go into desired state; current actual state is " + testStateChangeListener.getCurrentActualState(), awaitForStateChange);
         return replicaEnvironmentFacade;
     }
 
     private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary,
-            State desiredState, StateChangeListener stateChangeListener)
+            State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
     {
         ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary);
         ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config, null);
         ref.setStateChangeListener(stateChangeListener);
+        ref.setReplicationGroupListener(replicationGroupListener);
         _nodes.put(nodeName, ref);
         return ref;
     }
 
-    private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener)
+    private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
+    {
+        return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener, replicationGroupListener);
+    }
+
+    private DatabaseConfig createDatabase(ReplicatedEnvironmentFacade environmentFacade, String databaseName)
     {
-        return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener);
+        DatabaseConfig dbConfig = new DatabaseConfig();
+        dbConfig.setTransactional(true);
+        dbConfig.setAllowCreate(true);
+        environmentFacade.openDatabases(dbConfig,  databaseName);
+        return dbConfig;
     }
 
     private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary)
@@ -444,4 +759,29 @@ public class ReplicatedEnvironmentFacade
         when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath());
         return node;
     }
+
+    class NoopReplicationGroupListener implements ReplicationGroupListener
+    {
+
+        @Override
+        public void onReplicationNodeAddedToGroup(ReplicationNode node)
+        {
+        }
+
+        @Override
+        public void onReplicationNodeRecovered(ReplicationNode node)
+        {
+        }
+
+        @Override
+        public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+        {
+        }
+
+        @Override
+        public void onNodeState(ReplicationNode node, NodeState nodeState)
+        {
+        }
+
+    }
 }

Added: qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java?rev=1591281&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java (added)
+++ qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java Wed Apr 30 13:08:49 2014
@@ -0,0 +1,176 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.systest.rest.QpidRestTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.util.FileUtils;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+
+public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase
+{
+    private static final String NODE1 = "node1";
+    private static final String NODE2 = "node2";
+    private static final String NODE3 = "node3";
+
+    private int _node1HaPort;
+    private int _node2HaPort;
+    private int _node3HaPort;
+
+    private String _hostName;
+    private File _storeBaseDir;
+    private String _baseNodeRestUrl;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000");
+
+        super.setUp();
+        _hostName = getTestName();
+        _baseNodeRestUrl = "/rest/virtualhostnode/";
+
+        _storeBaseDir = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis());
+
+        _node1HaPort = findFreePort();
+        _node2HaPort = getNextAvailable(_node1HaPort + 1);
+        _node3HaPort = getNextAvailable(_node2HaPort + 1);
+
+    }
+
+    @Override
+    public void tearDown() throws Exception
+    {
+        try
+        {
+            super.tearDown();
+        }
+        finally
+        {
+            if (_storeBaseDir != null)
+            {
+                FileUtils.delete(_storeBaseDir, true);
+            }
+        }
+    }
+
+    @Override
+    protected void customizeConfiguration() throws IOException
+    {
+        super.customizeConfiguration();
+        TestBrokerConfiguration config = getBrokerConfiguration();
+        config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST);
+        config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST);
+    }
+
+    public void testCreate3NodesCluster() throws Exception
+    {
+        createHANode(NODE1, _node1HaPort, _node1HaPort);
+        assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1);
+        createHANode(NODE2, _node2HaPort, _node1HaPort);
+        assertNode(NODE2, _node2HaPort, _node1HaPort, NODE1);
+        createHANode(NODE3, _node3HaPort, _node1HaPort);
+        assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1);
+        assertRemoteNodes(NODE1, NODE2, NODE3);
+    }
+
+    private void createHANode(String nodeName, int nodePort, int helperPort) throws IOException, JsonGenerationException, JsonMappingException
+    {
+        Map<String, Object> nodeData = new HashMap<String, Object>();
+        nodeData.put(BDBHAVirtualHostNode.NAME, nodeName);
+        nodeData.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+        nodeData.put(BDBHAVirtualHostNode.STORE_PATH, _storeBaseDir.getPath() + File.separator + nodeName);
+        nodeData.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName);
+        nodeData.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + nodePort);
+        nodeData.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + helperPort);
+
+        int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName, "PUT", nodeData);
+        assertEquals("Unexpected response code for virtual host node " + nodeName + " creation request", 201, responseCode);
+    }
+
+    private void assertNode(String nodeName, int nodePort, int nodeHelperPort, String masterNode) throws Exception
+    {
+        boolean isMaster = nodeName.equals(masterNode);
+        waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, isMaster? "MASTER" : "REPLICA");
+
+        Map<String, Object> nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + nodeName);
+        assertEquals("Unexpected name", nodeName, nodeData.get(BDBHAVirtualHostNode.NAME));
+        assertEquals("Unexpected type", "BDB_HA", nodeData.get(BDBHAVirtualHostNode.TYPE));
+        assertEquals("Unexpected path", new File(_storeBaseDir, nodeName).getPath(), nodeData.get(BDBHAVirtualHostNode.STORE_PATH));
+        assertEquals("Unexpected address", "localhost:" + nodePort, nodeData.get(BDBHAVirtualHostNode.ADDRESS));
+        assertEquals("Unexpected helper address", "localhost:" + nodeHelperPort, nodeData.get(BDBHAVirtualHostNode.HELPER_ADDRESS));
+        assertEquals("Unexpected group name", _hostName, nodeData.get(BDBHAVirtualHostNode.GROUP_NAME));;
+
+        if (isMaster)
+        {
+            assertEquals("Unexpected role", "MASTER", nodeData.get(BDBHAVirtualHostNode.ROLE));
+            Map<String, Object> hostData = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/" + masterNode + "/" + _hostName);
+            assertEquals("Unexpected host name", _hostName, hostData.get(VirtualHost.NAME));
+        }
+        else
+        {
+            assertEquals("Unexpected role", "REPLICA", nodeData.get(BDBHAVirtualHostNode.ROLE));
+        }
+    }
+
+    private void assertRemoteNodes(String masterNode, String... replicaNodes) throws Exception
+    {
+        List<String> clusterNodes = new ArrayList<String>(Arrays.asList(replicaNodes));
+        clusterNodes.add(masterNode);
+
+        for (String clusterNodeName : clusterNodes)
+        {
+            List<String> remotes = new ArrayList<String>(clusterNodes);
+            remotes.remove(clusterNodeName);
+            for (String remote : remotes)
+            {
+                String remoteUrl = "/rest/replicationnode/" + clusterNodeName + "/" + remote;
+                waitForAttributeChanged(remoteUrl, BDBHARemoteReplicationNode.ROLE, remote.equals(masterNode) ? "MASTER" : "REPLICA");
+            }
+        }
+    }
+
+    private void waitForAttributeChanged(String url, String attributeName, Object newValue) throws Exception
+    {
+        List<Map<String, Object>> nodeAttributes = getRestTestHelper().getJsonAsList(url);
+        long limit = System.currentTimeMillis() + 5000;
+        while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName))))
+        {
+            Thread.sleep(100l);
+            nodeAttributes = getRestTestHelper().getJsonAsList(url);
+        }
+        assertEquals("Unexpected attribute " + attributeName, newValue, nodeAttributes.get(0).get(attributeName));
+    }
+}

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java?rev=1591281&r1=1591280&r2=1591281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java Wed Apr 30 13:08:49 2014
@@ -72,6 +72,7 @@ public final class BrokerModel extends M
         addRelationship(Broker.class, Plugin.class);
 
         addRelationship(VirtualHostNode.class, VirtualHost.class);
+        addRelationship(VirtualHostNode.class, RemoteReplicationNode.class);
 
         addRelationship(VirtualHost.class, Exchange.class);
         addRelationship(VirtualHost.class, Queue.class);

Copied: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java (from r1591247, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java?p2=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java&p1=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java&r1=1591247&r2=1591281&rev=1591281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java Wed Apr 30 13:08:49 2014
@@ -20,17 +20,10 @@
  */
 package org.apache.qpid.server.model;
 
-import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ManagedObject;
 
-@ManagedObject(category=true, managesChildren=false)
-public interface VirtualHostNode<X extends VirtualHostNode<X>> extends ConfiguredObject<X>
+@ManagedObject(category=true, managesChildren=false, creatable=false)
+public interface RemoteReplicationNode<X extends RemoteReplicationNode<X>> extends ConfiguredObject<X>
 {
-    public static final String IS_MESSAGE_STORE_PROVIDER = "messageStoreProvider";
-
-    @ManagedAttribute (automate = true, defaultValue = "false")
-    boolean isMessageStoreProvider();
-
-    VirtualHost<?,?,?> getVirtualHost();
-
-    DurableConfigurationStore getConfigurationStore();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java?rev=1591281&r1=1591280&r2=1591281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java Wed Apr 30 13:08:49 2014
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.model;
 
+import java.util.Collection;
+
 import org.apache.qpid.server.store.DurableConfigurationStore;
 
 @ManagedObject(category=true, managesChildren=false)
@@ -33,4 +35,7 @@ public interface VirtualHostNode<X exten
     VirtualHost<?,?,?> getVirtualHost();
 
     DurableConfigurationStore getConfigurationStore();
+
+    @SuppressWarnings("rawtypes")
+    Collection<? extends RemoteReplicationNode> getRemoteReplicationNodes();
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org