You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/04/30 15:08:49 UTC
svn commit: r1591281 [1/2] - in /qpid/trunk/qpid/java:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/
bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/
bdbstore/src/test/java/org/apache/qpid/server/s...
Author: orudyy
Date: Wed Apr 30 13:08:49 2014
New Revision: 1591281
URL: http://svn.apache.org/r1591281
Log:
QPID-5715,QPID-5412: Add remote replication nodes
Added:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java
- copied, changed from r1591247, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1591281&r1=1591280&r2=1591281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Wed Apr 30 13:08:49 2014
@@ -24,6 +24,7 @@ import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -67,6 +68,7 @@ import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.RepInternal;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationGroup;
import com.sleepycat.je.rep.ReplicationMutableConfig;
import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.RestartRequiredException;
@@ -147,6 +149,8 @@ public class ReplicatedEnvironmentFacade
private final ScheduledExecutorService _groupChangeExecutor;
private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING);
private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>();
+ private final ConcurrentMap<String, ReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<String, ReplicationNode>();
+ private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
private final AtomicBoolean _initialised;
private final EnvironmentFacadeTask[] _initialisationTasks;
@@ -178,10 +182,11 @@ public class ReplicatedEnvironmentFacade
// we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread
_environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
_groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
- _groupChangeExecutor.schedule(new RemoteNodeStateLearner(), 100, TimeUnit.MILLISECONDS); // TODO make configurable
// create environment in a separate thread to avoid renaming of the current thread by JE
_environment = createEnvironment(true);
+ populateExistingRemoteReplicationNodes();
+ _groupChangeExecutor.submit(new RemoteNodeStateLearner());
}
@Override
@@ -805,6 +810,19 @@ public class ReplicatedEnvironmentFacade
LOGGER.warn("Ignoring an exception whilst closing databases", e);
}
}
+ else
+ {
+ // reset database holders for invalid environments
+ for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
+ {
+ DatabaseHolder databaseHolder = entry.getValue();
+ Database database = databaseHolder.getDatabase();
+ if (database != null)
+ {
+ databaseHolder.setDatabase(null);
+ }
+ }
+ }
environment.close();
}
catch (EnvironmentFailureException efe)
@@ -1004,7 +1022,7 @@ public class ReplicatedEnvironmentFacade
}
}
- public NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
+ NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
{
if (repNode == null)
{
@@ -1023,84 +1041,229 @@ public class ReplicatedEnvironmentFacade
return _environment.getGroup().getElectableNodes().size();
}
+ public void setReplicationGroupListener(ReplicationGroupListener replicationGroupListener)
+ {
+ if (_replicationGroupListener.compareAndSet(null, replicationGroupListener))
+ {
+ notifyExistingRemoteReplicationNodes(replicationGroupListener);
+ }
+ else
+ {
+ throw new IllegalStateException("ReplicationGroupListener is already set on " + _prettyGroupNodeName);
+ }
+ }
+
+ private void populateExistingRemoteReplicationNodes()
+ {
+ ReplicationGroup group = _environment.getGroup();
+ Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes());
+ String localNodeName = getNodeName();
+
+ for (ReplicationNode replicationNode : nodes)
+ {
+ String discoveredNodeName = replicationNode.getName();
+ if (!discoveredNodeName.equals(localNodeName))
+ {
+ _remoteReplicationNodes.put(replicationNode.getName(), replicationNode);
+ }
+ }
+ }
+
+ private void notifyExistingRemoteReplicationNodes(ReplicationGroupListener listener)
+ {
+ for (ReplicationNode value : _remoteReplicationNodes.values())
+ {
+ listener.onReplicationNodeRecovered(value);
+ }
+ }
+
private class RemoteNodeStateLearner implements Callable<Void>
{
private Map<String, ReplicatedEnvironment.State> _previousGroupState = Collections.emptyMap();
+
@Override
public Void call()
{
- final Map<String, ReplicatedEnvironment.State> currentGroupState = new HashMap<String, ReplicatedEnvironment.State>();
try
{
- Set<Future<Void>> futures = new HashSet<Future<Void>>();
+ if (_state.get() == State.OPEN)
+ {
+ try
+ {
+ detectGroupChangesAndNotify();
+ }
+ catch(DatabaseException e)
+ {
+ handleDatabaseException("Exception on replication group check", e);
+ }
+
+ Map<ReplicationNode, NodeState> nodeStates = discoverNodeStates(_remoteReplicationNodes.values());
+
+ executeDabasePingerOnNodeChangesIfMaster(nodeStates);
- for (final ReplicationNode node : _environment.getGroup().getElectableNodes())
+ notifyGroupListenerAboutNodeStates(nodeStates);
+ }
+
+ }
+ finally
+ {
+ State state = _state.get();
+ if (state != State.CLOSED && state != State.CLOSING)
{
- Future<Void> future = _groupChangeExecutor.submit(new Callable<Void>()
+ _groupChangeExecutor.schedule(this, REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+ }
+ return null;
+ }
+
+ private void detectGroupChangesAndNotify()
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Checking for changes in the group " + _configuration.getGroupName() + " on node " + _configuration.getName());
+ }
+
+ String groupName = _configuration.getGroupName();
+ ReplicatedEnvironment env = _environment;
+ ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get();
+ if (env != null)
+ {
+ ReplicationGroup group = env.getGroup();
+ Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes());
+ String localNodeName = getNodeName();
+
+ Map<String, ReplicationNode> removalMap = new HashMap<String, ReplicationNode>(_remoteReplicationNodes);
+ for (ReplicationNode replicationNode : nodes)
+ {
+ String discoveredNodeName = replicationNode.getName();
+ if (!discoveredNodeName.equals(localNodeName))
{
- @Override
- public Void call()
+ if (!_remoteReplicationNodes.containsKey(discoveredNodeName))
{
- DbPing ping = new DbPing(node, _configuration.getGroupName(), REMOTE_NODE_MONITOR_INTERVAL);
- ReplicatedEnvironment.State nodeState;
- try
- {
- nodeState = ping.getNodeState().getNodeState();
- }
- catch (IOException e)
+ if (LOGGER.isDebugEnabled())
{
- nodeState = ReplicatedEnvironment.State.UNKNOWN;
+ LOGGER.debug("Remote replication node added '" + replicationNode + "' to '" + groupName + "'");
}
- catch (ServiceConnectFailedException e)
+
+ _remoteReplicationNodes.put(discoveredNodeName, replicationNode);
+
+ if (replicationGroupListener != null)
{
- nodeState = ReplicatedEnvironment.State.UNKNOWN;
+ replicationGroupListener.onReplicationNodeAddedToGroup(replicationNode);
}
-
- currentGroupState.put(node.getName(), nodeState);
- return null;
}
- });
- futures.add(future);
+ else
+ {
+ removalMap.remove(discoveredNodeName);
+ }
+ }
}
- for (Future<Void> future : futures)
+ if (!removalMap.isEmpty())
{
- try
+ for (Map.Entry<String, ReplicationNode> replicationNodeEntry : removalMap.entrySet())
{
- future.get(REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- catch (ExecutionException e)
- {
- LOGGER.warn("Cannot update node state for group " + _configuration.getGroupName(), e.getCause());
- }
- catch (TimeoutException e)
- {
- LOGGER.warn("Timeout whilst updating node state for group " + _configuration.getGroupName());
- future.cancel(true);
+ String replicationNodeName = replicationNodeEntry.getKey();
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Remote replication node removed '" + replicationNodeName + "' from '" + groupName + "'");
+ }
+ _remoteReplicationNodes.remove(replicationNodeName);
+ if (replicationGroupListener != null)
+ {
+ replicationGroupListener.onReplicationNodeRemovedFromGroup(replicationNodeEntry.getValue());
+ }
}
}
+ }
+ }
+
+ private Map<ReplicationNode, NodeState> discoverNodeStates(Collection<ReplicationNode> electableNodes)
+ {
+ final Map<ReplicationNode, NodeState> nodeStates = new HashMap<ReplicationNode, NodeState>();
+ Set<Future<Void>> futures = new HashSet<Future<Void>>();
- if (ReplicatedEnvironment.State.MASTER == _environment.getState())
+ for (final ReplicationNode node : electableNodes)
+ {
+ Future<Void> future = _groupChangeExecutor.submit(new Callable<Void>()
{
- boolean stateChanged = !_previousGroupState.equals(currentGroupState);
- _previousGroupState = currentGroupState;
- if (stateChanged && State.OPEN == _state.get())
+ @Override
+ public Void call()
{
- new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this);
+ NodeState nodeStateObject = null;
+ try
+ {
+ nodeStateObject = getRemoteNodeState(node);
+ }
+ catch (IOException | ServiceConnectFailedException e )
+ {
+ // Cannot discover node states. The node state should be treated as UNKNOWN
+ }
+
+ nodeStates.put(node, nodeStateObject);
+ return null;
}
+ });
+ futures.add(future);
+ }
+
+ for (Future<Void> future : futures)
+ {
+ try
+ {
+ future.get(REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ LOGGER.warn("Cannot update node state for group " + _configuration.getGroupName(), e.getCause());
+ }
+ catch (TimeoutException e)
+ {
+ LOGGER.warn("Timeout whilst updating node state for group " + _configuration.getGroupName());
+ future.cancel(true);
}
}
- finally
+ return nodeStates;
+ }
+
+ private void executeDabasePingerOnNodeChangesIfMaster(final Map<ReplicationNode, NodeState> nodeStates)
+ {
+ if (ReplicatedEnvironment.State.MASTER == _environment.getState())
{
- _groupChangeExecutor.schedule(this, REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+ Map<String, ReplicatedEnvironment.State> currentGroupState = new HashMap<String, ReplicatedEnvironment.State>();
+ for (Map.Entry<ReplicationNode, NodeState> entry : nodeStates.entrySet())
+ {
+ ReplicationNode node = entry.getKey();
+ NodeState nodeState = entry.getValue();
+ ReplicatedEnvironment.State state = nodeState == null? ReplicatedEnvironment.State.UNKNOWN : nodeState.getNodeState();
+ currentGroupState.put(node.getName(), state);
+ }
+ boolean stateChanged = !_previousGroupState.equals(currentGroupState);
+ _previousGroupState = currentGroupState;
+ if (stateChanged && State.OPEN == _state.get())
+ {
+ new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this);
+ }
+ }
+ }
+
+ private void notifyGroupListenerAboutNodeStates(final Map<ReplicationNode, NodeState> nodeStates)
+ {
+ ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get();
+ if (replicationGroupListener != null)
+ {
+ for (Map.Entry<ReplicationNode, NodeState> entry : nodeStates.entrySet())
+ {
+ replicationGroupListener.onNodeState(entry.getKey(), entry.getValue());
+ }
}
- return null;
}
}
+
public static enum State
{
OPENING,
Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java?rev=1591281&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java Wed Apr 30 13:08:49 2014
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.ReplicationNode;
+
+public interface ReplicationGroupListener
+{
+ /**
+ * Fired when a remote replication node is added to a group. This event happens
+ * exactly once just after a new replication node is created.
+ */
+ void onReplicationNodeAddedToGroup(ReplicationNode node);
+
+ /**
+ * Fired exactly once for each existing remote node. Used to inform the application
+ * on any existing nodes as it starts up for the first time.
+ */
+ void onReplicationNodeRecovered(ReplicationNode node);
+
+ /**
+ * Fired when a remote replication node is (permanently) removed from group. This event
+ * happens exactly once just after the existing replication node is deleted.
+ */
+ void onReplicationNodeRemovedFromGroup(ReplicationNode node);
+
+ /**
+ * Invoked to notify listener on node state update
+ */
+ void onNodeState(ReplicationNode node, NodeState nodeState);
+
+}
Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java?rev=1591281&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java Wed Apr 30 13:08:49 2014
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode.berkeleydb;
+
+import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.RemoteReplicationNode;
+
+@ManagedObject(category=false, managesChildren=false, creatable=false)
+public interface BDBHARemoteReplicationNode<X extends BDBHARemoteReplicationNode<X>> extends RemoteReplicationNode<X>
+{
+ String GROUP_NAME = "groupName";
+ String ADDRESS = "address";
+ String ROLE = "role";
+ String LAST_KNOWN_REPLICATION_TRANSACTION_ID = "lastKnownReplicationTransactionId";
+ String JOIN_TIME = "joinTime";
+
+ @ManagedAttribute(derived = true)
+ String getGroupName();
+
+ @ManagedAttribute(derived = true)
+ String getAddress();
+
+ @ManagedAttribute(automate = true)
+ String getRole();
+
+ @ManagedAttribute(derived = true)
+ long getJoinTime();
+
+ @ManagedAttribute(derived = true)
+ long getLastKnownReplicationTransactionId();
+
+}
Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java?rev=1591281&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java Wed Apr 30 13:08:49 2014
@@ -0,0 +1,201 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.virtualhostnode.berkeleydb;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+
+import com.sleepycat.je.rep.MasterStateException;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+
+public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDBHARemoteReplicationNodeImpl> implements BDBHARemoteReplicationNode<BDBHARemoteReplicationNodeImpl>
+{
+ private static final Logger LOGGER = Logger.getLogger(BDBHARemoteReplicationNodeImpl.class);
+
+ private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
+ private final String _address;
+
+ private volatile long _joinTime;
+ private volatile long _lastTransactionId;
+
+ @ManagedAttributeField(afterSet="afterSetRole")
+ private volatile String _role;
+
+ private final AtomicReference<State> _state;
+
+ public BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl virtualHostNode, Map<String, Object> attributes, ReplicatedEnvironmentFacade replicatedEnvironmentFacade)
+ {
+ super(parentsMap(virtualHostNode), attributes);
+ _address = (String)attributes.get(ADDRESS);
+ _replicatedEnvironmentFacade = replicatedEnvironmentFacade;
+ _state = new AtomicReference<State>(State.ACTIVE);
+ }
+
+ @Override
+ public State getState()
+ {
+ return _state.get();
+ }
+
+ @Override
+ public String getGroupName()
+ {
+ return _replicatedEnvironmentFacade.getGroupName();
+ }
+
+ @Override
+ public String getAddress()
+ {
+ return _address;
+ }
+
+ @Override
+ public String getRole()
+ {
+ return _role;
+ }
+
+ @Override
+ public long getJoinTime()
+ {
+ return _joinTime;
+ }
+
+ @Override
+ public long getLastKnownReplicationTransactionId()
+ {
+ return _lastTransactionId;
+ }
+
+ public void delete()
+ {
+ this.deleted();
+ }
+
+ protected void afterSetRole()
+ {
+ try
+ {
+ String nodeName = getName();
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Trying to transfer master to " + nodeName);
+ }
+
+ _replicatedEnvironmentFacade.transferMasterAsynchronously(nodeName);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("The mastership has been transfered to " + nodeName);
+ }
+ }
+ catch(Exception e)
+ {
+ throw new IllegalConfigurationException("Cannot transfer mastership to " + getName(), e);
+ }
+ }
+
+ @Override
+ protected boolean setState(State currentState, State desiredState)
+ {
+ if (desiredState == State.DELETED)
+ {
+ String nodeName = getName();
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleting node '" + nodeName + "' from group '" + getGroupName() + "'");
+ }
+
+ try
+ {
+ _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName);
+ _state.set(State.DELETED);
+ delete();
+ return true;
+ }
+ catch(MasterStateException e)
+ {
+ throw new IllegalStateTransitionException("Node '" + nodeName + "' cannot be deleted when role is a master");
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateTransitionException("Unexpected exception on node '" + nodeName + "' deletion", e);
+ }
+ }
+ return false;
+ }
+
+ @Override
+ protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
+ {
+ super.validateChange(proxyForValidation, changedAttributes);
+ if (changedAttributes.contains(ROLE))
+ {
+ String currentRole = getRole();
+ if (!ReplicatedEnvironment.State.REPLICA.name().equals(currentRole))
+ {
+ throw new IllegalArgumentException("Cannot transfer mastership when not a replica");
+ }
+ if (!ReplicatedEnvironment.State.MASTER.name().equals(((BDBHARemoteReplicationNode<?>)proxyForValidation).getRole()))
+ {
+ throw new IllegalArgumentException("Changing role to other value then " + ReplicatedEnvironment.State.MASTER.name() + " is unsupported");
+ }
+ }
+
+ if (changedAttributes.contains(JOIN_TIME))
+ {
+ throw new IllegalArgumentException("Cannot change derived attribute " + JOIN_TIME);
+ }
+
+ if (changedAttributes.contains(LAST_KNOWN_REPLICATION_TRANSACTION_ID))
+ {
+ throw new IllegalArgumentException("Cannot change derived attribute " + LAST_KNOWN_REPLICATION_TRANSACTION_ID);
+ }
+ }
+
+ void setRole(String role)
+ {
+ _role = role;
+ }
+
+ void setJoinTime(long joinTime)
+ {
+ _joinTime = joinTime;
+ }
+
+ void setLastTransactionId(long lastTransactionId)
+ {
+ _lastTransactionId = lastTransactionId;
+ }
+
+}
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1591281&r1=1591280&r2=1591281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Wed Apr 30 13:08:49 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.virtualhostnode.berkeleydb;
import java.security.PrivilegedAction;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -30,12 +31,14 @@ import java.util.concurrent.atomic.Atomi
import javax.security.auth.Subject;
+import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
-import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.log4j.Logger;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
@@ -43,6 +46,7 @@ import org.apache.qpid.server.model.Conf
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
+import org.apache.qpid.server.model.RemoteReplicationNode;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
@@ -54,12 +58,14 @@ import org.apache.qpid.server.store.berk
import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicationGroupListener;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostState;
import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
@ManagedObject( category = false, type = "BDB_HA" )
-public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtualHostNodeImpl> implements BDBHAVirtualHostNode<BDBHAVirtualHostNodeImpl>
+public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtualHostNodeImpl> implements
+ BDBHAVirtualHostNode<BDBHAVirtualHostNodeImpl>
{
/**
* Length of time we synchronously await the a JE mutation to complete. It is not considered an error if we exceed this timeout, although a
@@ -183,7 +189,7 @@ public class BDBHAVirtualHostNodeImpl ex
@Override
public String getRole()
{
- ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
if (environmentFacade != null)
{
return environmentFacade.getNodeState();
@@ -194,7 +200,7 @@ public class BDBHAVirtualHostNodeImpl ex
@Override
public Long getLastKnownReplicationTransactionId()
{
- ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
if (environmentFacade != null)
{
return environmentFacade.getLastKnownReplicationTransactionId();
@@ -205,7 +211,7 @@ public class BDBHAVirtualHostNodeImpl ex
@Override
public Long getJoinTime()
{
- ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
if (environmentFacade != null)
{
return environmentFacade.getJoinTime();
@@ -219,6 +225,14 @@ public class BDBHAVirtualHostNodeImpl ex
return _replicatedEnvironmentConfiguration;
}
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Collection<? extends RemoteReplicationNode> getRemoteReplicationNodes()
+ {
+ Collection<RemoteReplicationNode> remoteNodes = getChildren(RemoteReplicationNode.class);
+ return (Collection<? extends RemoteReplicationNode>)remoteNodes;
+ }
+
@Override
public String toString()
{
@@ -255,6 +269,12 @@ public class BDBHAVirtualHostNodeImpl ex
return (BDBMessageStore) super.getConfigurationStore();
}
+ protected ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade()
+ {
+ return _environmentFacade.get();
+ }
+
+ @Override
protected DurableConfigurationStore createConfigurationStore()
{
return new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory());
@@ -276,8 +296,11 @@ public class BDBHAVirtualHostNodeImpl ex
getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.STORE_LOCATION(getStorePath()));
ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) getConfigurationStore().getEnvironmentFacade();
- environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
- _environmentFacade.set(environmentFacade);
+ if (_environmentFacade.compareAndSet(null, environmentFacade))
+ {
+ environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
+ environmentFacade.setReplicationGroupListener(new RemoteNodesDiscoverer());
+ }
}
@Override
@@ -289,7 +312,7 @@ public class BDBHAVirtualHostNodeImpl ex
}
finally
{
- ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
if (_environmentFacade.compareAndSet(environmentFacade, null))
{
environmentFacade.close();
@@ -421,7 +444,7 @@ public class BDBHAVirtualHostNodeImpl ex
@SuppressWarnings("unused")
private void postSetPriority()
{
- ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
if (environmentFacade != null)
{
try
@@ -451,7 +474,7 @@ public class BDBHAVirtualHostNodeImpl ex
@SuppressWarnings("unused")
private void postSetDesignatedPrimary()
{
- ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
if (environmentFacade != null)
{
try
@@ -481,7 +504,7 @@ public class BDBHAVirtualHostNodeImpl ex
@SuppressWarnings("unused")
private void postSetQuorumOverride()
{
- ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
if (environmentFacade != null)
{
try
@@ -511,7 +534,7 @@ public class BDBHAVirtualHostNodeImpl ex
@SuppressWarnings("unused")
private void preSetRole()
{
- ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
if (environmentFacade != null)
{
String currentRole = environmentFacade.getNodeState();
@@ -531,7 +554,7 @@ public class BDBHAVirtualHostNodeImpl ex
@SuppressWarnings("unused")
private void postSetRole()
{
- ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
if (environmentFacade != null)
{
try
@@ -561,6 +584,63 @@ public class BDBHAVirtualHostNodeImpl ex
}
}
+ private class RemoteNodesDiscoverer implements ReplicationGroupListener
+ {
+ @Override
+ public void onReplicationNodeAddedToGroup(ReplicationNode node)
+ {
+ BDBHARemoteReplicationNodeImpl remoteNode = new BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl.this, nodeToAttributes(node), getReplicatedEnvironmentFacade());
+ remoteNode.create();
+ childAdded(remoteNode);
+ }
+
+ @Override
+ public void onReplicationNodeRecovered(ReplicationNode node)
+ {
+ BDBHARemoteReplicationNodeImpl remoteNode = new BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl.this, nodeToAttributes(node), getReplicatedEnvironmentFacade());
+ remoteNode.open();
+ }
+
+ @Override
+ public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+ {
+ BDBHARemoteReplicationNodeImpl remoteNode = getChildByName(BDBHARemoteReplicationNodeImpl.class, node.getName());
+ if (remoteNode != null)
+ {
+ remoteNode.delete();
+ childRemoved(remoteNode);
+ }
+ }
+
+ @Override
+ public void onNodeState(ReplicationNode node, NodeState nodeState)
+ {
+ BDBHARemoteReplicationNodeImpl remoteNode = getChildByName(BDBHARemoteReplicationNodeImpl.class, node.getName());
+ if (remoteNode != null)
+ {
+ if (nodeState == null)
+ {
+ remoteNode.setRole(ReplicatedEnvironment.State.UNKNOWN.name());
+ }
+ else
+ {
+ remoteNode.setRole(nodeState.getNodeState().name());
+ remoteNode.setJoinTime(nodeState.getJoinTime());
+ remoteNode.setLastTransactionId(nodeState.getKnownMasterTxnEndVLSN());
+ }
+ }
+ }
+
+ private Map<String, Object> nodeToAttributes(ReplicationNode replicationNode)
+ {
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(ConfiguredObject.NAME, replicationNode.getName());
+ attributes.put(ConfiguredObject.DURABLE, false);
+ attributes.put(BDBHARemoteReplicationNode.ADDRESS, replicationNode.getHostName() + ":" + replicationNode.getPort());
+ return attributes;
+ }
+ }
+
private class ReplicaVirtualHost extends BDBHAVirtualHost
{
ReplicaVirtualHost(Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java?rev=1591281&r1=1591280&r2=1591281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java Wed Apr 30 13:08:49 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.ber
import static org.mockito.Mockito.when;
import java.io.File;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -39,11 +40,13 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.RemoteReplicationNode;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -159,8 +162,11 @@ public class BDBHAVirtualHostNodeTest ex
@Override
public void childAdded(ConfiguredObject object, ConfiguredObject child)
{
- child.addChangeListener(this);
- virtualHostAddedLatch.countDown();
+ if (child instanceof VirtualHost)
+ {
+ child.addChangeListener(this);
+ virtualHostAddedLatch.countDown();
+ }
}
@Override
@@ -314,7 +320,100 @@ public class BDBHAVirtualHostNodeTest ex
while(!"MASTER".equals(replica.getRole()))
{
Thread.sleep(100);
- if (awaitMastershipCount > 20)
+ if (awaitMastershipCount > 50)
+ {
+ fail("Replica did not assume master role");
+ }
+ awaitMastershipCount++;
+ }
+ }
+
+
+ public void testTransferMasterToReplica() throws Exception
+ {
+ int node1PortNumber = findFreePort();
+ String helperAddress = "localhost:" + node1PortNumber;
+ String groupName = "group";
+
+ Map<String, Object> node1Attributes = new HashMap<String, Object>();
+ node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node1Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node1Attributes.put(BDBHAVirtualHostNode.NAME, "node1");
+ node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress);
+ node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1");
+
+ BDBHAVirtualHostNode<?> node1 = createHaVHN(node1Attributes);
+ assertEquals("Failed to activate node", State.ACTIVE, node1.setDesiredState(node1.getState(), State.ACTIVE));
+
+ final CountDownLatch remoteNodeLatch = new CountDownLatch(2);
+ node1.addChangeListener(new ConfigurationChangeListener()
+ {
+ @Override
+ public void stateChanged(ConfiguredObject object, State oldState, State newState)
+ {
+ }
+
+ @Override
+ public void childRemoved(ConfiguredObject object, ConfiguredObject child)
+ {
+ }
+
+ @Override
+ public void childAdded(ConfiguredObject object, ConfiguredObject child)
+ {
+ if (child instanceof RemoteReplicationNode)
+ {
+ remoteNodeLatch.countDown();
+ }
+ }
+
+ @Override
+ public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue,
+ Object newAttributeValue)
+ {
+ }
+ });
+
+ int node2PortNumber = getNextAvailable(node1PortNumber+1);
+
+ Map<String, Object> node2Attributes = new HashMap<String, Object>();
+ node2Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node2Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node2Attributes.put(BDBHAVirtualHostNode.NAME, "node2");
+ node2Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node2Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node2PortNumber);
+ node2Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2");
+
+ BDBHAVirtualHostNode<?> node2 = createHaVHN(node2Attributes);
+ assertEquals("Failed to activate node2", State.ACTIVE, node2.setDesiredState(node2.getState(), State.ACTIVE));
+
+ int node3PortNumber = getNextAvailable(node2PortNumber+1);
+ Map<String, Object> node3Attributes = new HashMap<String, Object>();
+ node3Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node3Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node3Attributes.put(BDBHAVirtualHostNode.NAME, "node3");
+ node3Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node3Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node3PortNumber);
+ node3Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node3Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "3");
+ BDBHAVirtualHostNode<?> node3 = createHaVHN(node3Attributes);
+ assertEquals("Failed to activate node3", State.ACTIVE, node3.setDesiredState(node3.getState(), State.ACTIVE));
+
+ assertTrue("Replication nodes have not been seen during 5s", remoteNodeLatch.await(5, TimeUnit.SECONDS));
+
+ Collection<? extends RemoteReplicationNode> remoteNodes = node1.getRemoteReplicationNodes();
+ RemoteReplicationNode replicaRemoteNode = remoteNodes.iterator().next();
+ replicaRemoteNode.setAttribute(BDBHARemoteReplicationNode.ROLE, "REPLICA", "MASTER");
+
+ BDBHAVirtualHostNode<?> replica = replicaRemoteNode.getName().equals(node2.getName())? node2 : node3;
+ int awaitMastershipCount = 0;
+ while(!"MASTER".equals(replica.getRole()))
+ {
+ Thread.sleep(100);
+ if (awaitMastershipCount > 50)
{
fail("Replica did not assume master role");
}
Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1591281&r1=1591280&r2=1591281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java Wed Apr 30 13:08:49 2014
@@ -24,12 +24,14 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -40,15 +42,17 @@ import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
+import com.sleepycat.je.rep.InsufficientReplicasException;
+import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
-
private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
private static final int LISTENER_TIMEOUT = 5;
private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
@@ -220,6 +224,304 @@ public class ReplicatedEnvironmentFacade
assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride());
}
+ public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception
+ {
+ ReplicatedEnvironmentFacade master = createMaster();
+ String nodeName2 = TEST_NODE_NAME + "_2";
+ String host = "localhost";
+ int port = getNextAvailable(TEST_NODE_PORT + 1);
+ String node2NodeHostPort = host + ":" + port;
+
+ final AtomicInteger invocationCount = new AtomicInteger();
+ final CountDownLatch nodeRecoveryLatch = new CountDownLatch(1);
+ ReplicationGroupListener listener = new NoopReplicationGroupListener()
+ {
+ @Override
+ public void onReplicationNodeRecovered(ReplicationNode node)
+ {
+ nodeRecoveryLatch.countDown();
+ invocationCount.incrementAndGet();
+ }
+ };
+
+ createReplica(nodeName2, node2NodeHostPort, listener);
+
+ assertEquals("Unexpected number of nodes", 2, master.getNumberOfElectableGroupMembers());
+
+ assertTrue("Listener not fired within timeout", nodeRecoveryLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+ assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
+ }
+
+ public void testReplicationGroupListenerHearsNodeAdded() throws Exception
+ {
+ final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
+ final AtomicInteger invocationCount = new AtomicInteger();
+ ReplicationGroupListener listener = new NoopReplicationGroupListener()
+ {
+ @Override
+ public void onReplicationNodeAddedToGroup(ReplicationNode node)
+ {
+ invocationCount.getAndIncrement();
+ nodeAddedLatch.countDown();
+ }
+ };
+
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
+ assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ assertEquals("Unexpected number of nodes at start of test", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
+
+ String node2Name = TEST_NODE_NAME + "_2";
+ String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
+ createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
+
+ assertTrue("Listener not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ assertEquals("Unexpected number of nodes", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
+
+ assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
+ }
+
+ public void testReplicationGroupListenerHearsNodeRemoved() throws Exception
+ {
+ final CountDownLatch nodeDeletedLatch = new CountDownLatch(1);
+ final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
+ final AtomicInteger invocationCount = new AtomicInteger();
+ ReplicationGroupListener listener = new NoopReplicationGroupListener()
+ {
+ @Override
+ public void onReplicationNodeRecovered(ReplicationNode node)
+ {
+ nodeAddedLatch.countDown();
+ }
+
+ @Override
+ public void onReplicationNodeAddedToGroup(ReplicationNode node)
+ {
+ nodeAddedLatch.countDown();
+ }
+
+ @Override
+ public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+ {
+ invocationCount.getAndIncrement();
+ nodeDeletedLatch.countDown();
+ }
+ };
+
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
+ assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ String node2Name = TEST_NODE_NAME + "_2";
+ String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
+ createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
+
+ assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
+
+ // Need to await the listener hearing the addition of the node to the model.
+ assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ // Now remove the node and ensure we hear the event
+ replicatedEnvironmentFacade.removeNodeFromGroup(node2Name);
+
+ assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ assertEquals("Unexpected number of nodes after node removal", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
+
+ assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
+ }
+
+ public void testMasterHearsRemoteNodeRoles() throws Exception
+ {
+ final String node2Name = TEST_NODE_NAME + "_2";
+ final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
+ final AtomicReference<ReplicationNode> nodeRef = new AtomicReference<ReplicationNode>();
+ final CountDownLatch stateLatch = new CountDownLatch(1);
+ final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>();
+ ReplicationGroupListener listener = new NoopReplicationGroupListener()
+ {
+ @Override
+ public void onReplicationNodeAddedToGroup(ReplicationNode node)
+ {
+ nodeRef.set(node);
+ nodeAddedLatch.countDown();
+ }
+
+ @Override
+ public void onNodeState(ReplicationNode node, NodeState nodeState)
+ {
+ if (node2Name.equals(node.getName()))
+ {
+ stateRef.set(nodeState);
+ stateLatch.countDown();
+ }
+ }
+ };
+
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
+ assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
+ createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
+
+ assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
+
+ assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ ReplicationNode remoteNode = (ReplicationNode)nodeRef.get();
+ assertEquals("Unexpcted node name", node2Name, remoteNode.getName());
+
+ assertTrue("Node state not fired within timeout", stateLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+ assertEquals("Unexpcted node state", State.REPLICA, stateRef.get().getNodeState());
+ }
+
+ public void testRemoveNodeFromGroup() throws Exception
+ {
+ ReplicatedEnvironmentFacade environmentFacade = createMaster();
+
+ String node2Name = TEST_NODE_NAME + "_2";
+ String node2NodeHostPort = "localhost:" + getNextAvailable(TEST_NODE_PORT + 1);
+ ReplicatedEnvironmentFacade ref2 = createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
+
+ assertEquals("Unexpected group members count", 2, environmentFacade.getNumberOfElectableGroupMembers());
+ ref2.close();
+
+ environmentFacade.removeNodeFromGroup(node2Name);
+ assertEquals("Unexpected group members count", 1, environmentFacade.getNumberOfElectableGroupMembers());
+ }
+
+
+ public void testEnvironmentFacadeDetectsRemovalOfRemoteNode() throws Exception
+ {
+ final String replicaName = TEST_NODE_NAME + "_1";
+ final CountDownLatch nodeRemovedLatch = new CountDownLatch(1);
+ final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
+ final AtomicReference<ReplicationNode> addedNodeRef = new AtomicReference<ReplicationNode>();
+ final AtomicReference<ReplicationNode> removedNodeRef = new AtomicReference<ReplicationNode>();
+ final CountDownLatch stateLatch = new CountDownLatch(1);
+ final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>();
+
+ ReplicationGroupListener listener = new NoopReplicationGroupListener()
+ {
+ @Override
+ public void onReplicationNodeAddedToGroup(ReplicationNode node)
+ {
+ if (addedNodeRef.compareAndSet(null, node))
+ {
+ nodeAddedLatch.countDown();
+ }
+ }
+
+ @Override
+ public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+ {
+ removedNodeRef.set(node);
+ nodeRemovedLatch.countDown();
+ }
+
+ @Override
+ public void onNodeState(ReplicationNode node, NodeState nodeState)
+ {
+ if (replicaName.equals(node.getName()))
+ {
+ stateRef.set(nodeState);
+ stateLatch.countDown();
+ }
+ }
+ };
+
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+ final ReplicatedEnvironmentFacade masterEnvironment = addNode(State.MASTER, stateChangeListener, listener);
+ assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ masterEnvironment.setDesignatedPrimary(true);
+
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String node1NodeHostPort = "localhost:" + replica1Port;
+
+ ReplicatedEnvironmentFacade replica = createReplica(replicaName, node1NodeHostPort, new NoopReplicationGroupListener());
+
+ assertTrue("Node should be added", nodeAddedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
+
+ ReplicationNode node = addedNodeRef.get();
+ assertEquals("Unexpected node name", replicaName, node.getName());
+
+ assertTrue("Node state was not heared", stateLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
+ assertEquals("Unexpected node role", State.REPLICA, stateRef.get().getNodeState());
+ assertEquals("Unexpected node name", replicaName, stateRef.get().getNodeName());
+
+ replica.close();
+ masterEnvironment.removeNodeFromGroup(node.getName());
+
+ assertTrue("Node deleting is undetected by the environment facade", nodeRemovedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
+ assertEquals("Unexpected node is deleted", node, removedNodeRef.get());
+ }
+
+ public void testCloseStateTransitions() throws Exception
+ {
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster();
+
+ assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState());
+ replicatedEnvironmentFacade.close();
+ assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState());
+ }
+
+ public void testEnvironmentRestartOnInsufficientReplicas() throws Exception
+ {
+
+ ReplicatedEnvironmentFacade master = createMaster();
+
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String replica1NodeName = TEST_NODE_NAME + "_1";
+ String replica1NodeHostPort = "localhost:" + replica1Port;
+ ReplicatedEnvironmentFacade replica1 = createReplica(replica1NodeName, replica1NodeHostPort, new NoopReplicationGroupListener());
+
+ int replica2Port = getNextAvailable(replica1Port + 1);
+ String replica2NodeName = TEST_NODE_NAME + "_2";
+ String replica2NodeHostPort = "localhost:" + replica2Port;
+ ReplicatedEnvironmentFacade replica2 = createReplica(replica2NodeName, replica2NodeHostPort, new NoopReplicationGroupListener());
+
+ String databaseName = "test";
+
+ DatabaseConfig dbConfig = createDatabase(master, databaseName);
+
+ // close replicas
+ replica1.close();
+ replica2.close();
+
+ Environment e = master.getEnvironment();
+ master.getOpenDatabase(databaseName);
+ try
+ {
+ master.openDatabases(dbConfig, "test2");
+ fail("Opening of new database without quorum should fail");
+ }
+ catch(InsufficientReplicasException ex)
+ {
+ master.handleDatabaseException(null, ex);
+ }
+
+ EnumSet<State> states = EnumSet.of(State.MASTER, State.REPLICA);
+ replica1 = createReplica(replica1NodeName, replica1NodeHostPort, new TestStateChangeListener(states), new NoopReplicationGroupListener());
+ replica2 = createReplica(replica2NodeName, replica2NodeHostPort, new TestStateChangeListener(states), new NoopReplicationGroupListener());
+
+ // Need to poll to await the remote node updating itself
+ long timeout = System.currentTimeMillis() + 5000;
+ while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout)
+ {
+ Thread.sleep(200);
+ }
+
+ assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(),
+ State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) );
+
+ Environment e2 = master.getEnvironment();
+ assertNotSame("Environment has not been restarted", e2, e);
+ }
+
public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception
{
final CountDownLatch masterLatch = new CountDownLatch(1);
@@ -244,7 +546,7 @@ public class ReplicatedEnvironmentFacade
}
};
- addNode(State.MASTER, stateChangeListener);
+ addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
@@ -252,8 +554,8 @@ public class ReplicatedEnvironmentFacade
int replica2Port = getNextAvailable(replica1Port + 1);
String node2NodeHostPort = "localhost:" + replica2Port;
- ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
- ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort);
+ ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
+ ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort, new NoopReplicationGroupListener());
// close replicas
replica1.close();
@@ -266,15 +568,6 @@ public class ReplicatedEnvironmentFacade
assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get());
}
- public void testCloseStateTransitions() throws Exception
- {
- ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster();
-
- assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState());
- replicatedEnvironmentFacade.close();
- assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState());
- }
-
public void testTransferMasterToSelf() throws Exception
{
final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1);
@@ -295,12 +588,12 @@ public class ReplicatedEnvironmentFacade
}
}
};
- ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener);
+ ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
String node1NodeHostPort = "localhost:" + replica1Port;
- ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+ ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
int replica2Port = getNextAvailable(replica1Port + 1);
@@ -323,7 +616,7 @@ public class ReplicatedEnvironmentFacade
}
}
};
- ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener);
+ ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
@@ -353,12 +646,12 @@ public class ReplicatedEnvironmentFacade
}
}
};
- ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener);
+ ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
String node1NodeHostPort = "localhost:" + replica1Port;
- ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+ ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
int replica2Port = getNextAvailable(replica1Port + 1);
@@ -382,7 +675,7 @@ public class ReplicatedEnvironmentFacade
}
};
String thirdNodeName = TEST_NODE_NAME + "_2";
- ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener);
+ ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
@@ -394,34 +687,56 @@ public class ReplicatedEnvironmentFacade
private ReplicatedEnvironmentFacade createMaster() throws Exception
{
+ return createMaster(new NoopReplicationGroupListener());
+ }
+
+ private ReplicatedEnvironmentFacade createMaster(ReplicationGroupListener replicationGroupListener) throws Exception
+ {
TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
- ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener);
+ ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener, replicationGroupListener);
assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
return env;
}
- private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort) throws Exception
+ private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort, ReplicationGroupListener replicationGroupListener) throws Exception
{
TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA);
- ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener);
+ return createReplica(nodeName, nodeHostPort, testStateChangeListener, replicationGroupListener);
+ }
+
+ private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort,
+ TestStateChangeListener testStateChangeListener, ReplicationGroupListener replicationGroupListener)
+ throws InterruptedException
+ {
+ ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, replicationGroupListener);
boolean awaitForStateChange = testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS);
assertTrue("Replica " + nodeName + " did not go into desired state; current actual state is " + testStateChangeListener.getCurrentActualState(), awaitForStateChange);
return replicaEnvironmentFacade;
}
private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary,
- State desiredState, StateChangeListener stateChangeListener)
+ State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
{
ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary);
ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config, null);
ref.setStateChangeListener(stateChangeListener);
+ ref.setReplicationGroupListener(replicationGroupListener);
_nodes.put(nodeName, ref);
return ref;
}
- private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener)
+ private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
+ {
+ return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener, replicationGroupListener);
+ }
+
+ private DatabaseConfig createDatabase(ReplicatedEnvironmentFacade environmentFacade, String databaseName)
{
- return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener);
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ environmentFacade.openDatabases(dbConfig, databaseName);
+ return dbConfig;
}
private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary)
@@ -444,4 +759,29 @@ public class ReplicatedEnvironmentFacade
when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath());
return node;
}
+
+ class NoopReplicationGroupListener implements ReplicationGroupListener
+ {
+
+ @Override
+ public void onReplicationNodeAddedToGroup(ReplicationNode node)
+ {
+ }
+
+ @Override
+ public void onReplicationNodeRecovered(ReplicationNode node)
+ {
+ }
+
+ @Override
+ public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+ {
+ }
+
+ @Override
+ public void onNodeState(ReplicationNode node, NodeState nodeState)
+ {
+ }
+
+ }
}
Added: qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java?rev=1591281&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java (added)
+++ qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java Wed Apr 30 13:08:49 2014
@@ -0,0 +1,176 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.systest.rest.QpidRestTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.util.FileUtils;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+
+public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase
+{
+ private static final String NODE1 = "node1";
+ private static final String NODE2 = "node2";
+ private static final String NODE3 = "node3";
+
+ private int _node1HaPort;
+ private int _node2HaPort;
+ private int _node3HaPort;
+
+ private String _hostName;
+ private File _storeBaseDir;
+ private String _baseNodeRestUrl;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000");
+
+ super.setUp();
+ _hostName = getTestName();
+ _baseNodeRestUrl = "/rest/virtualhostnode/";
+
+ _storeBaseDir = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis());
+
+ _node1HaPort = findFreePort();
+ _node2HaPort = getNextAvailable(_node1HaPort + 1);
+ _node3HaPort = getNextAvailable(_node2HaPort + 1);
+
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ if (_storeBaseDir != null)
+ {
+ FileUtils.delete(_storeBaseDir, true);
+ }
+ }
+ }
+
+ @Override
+ protected void customizeConfiguration() throws IOException
+ {
+ super.customizeConfiguration();
+ TestBrokerConfiguration config = getBrokerConfiguration();
+ config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST);
+ config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST);
+ }
+
+ public void testCreate3NodesCluster() throws Exception
+ {
+ createHANode(NODE1, _node1HaPort, _node1HaPort);
+ assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1);
+ createHANode(NODE2, _node2HaPort, _node1HaPort);
+ assertNode(NODE2, _node2HaPort, _node1HaPort, NODE1);
+ createHANode(NODE3, _node3HaPort, _node1HaPort);
+ assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1);
+ assertRemoteNodes(NODE1, NODE2, NODE3);
+ }
+
+ private void createHANode(String nodeName, int nodePort, int helperPort) throws IOException, JsonGenerationException, JsonMappingException
+ {
+ Map<String, Object> nodeData = new HashMap<String, Object>();
+ nodeData.put(BDBHAVirtualHostNode.NAME, nodeName);
+ nodeData.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ nodeData.put(BDBHAVirtualHostNode.STORE_PATH, _storeBaseDir.getPath() + File.separator + nodeName);
+ nodeData.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName);
+ nodeData.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + nodePort);
+ nodeData.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + helperPort);
+
+ int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName, "PUT", nodeData);
+ assertEquals("Unexpected response code for virtual host node " + nodeName + " creation request", 201, responseCode);
+ }
+
+ private void assertNode(String nodeName, int nodePort, int nodeHelperPort, String masterNode) throws Exception
+ {
+ boolean isMaster = nodeName.equals(masterNode);
+ waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, isMaster? "MASTER" : "REPLICA");
+
+ Map<String, Object> nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + nodeName);
+ assertEquals("Unexpected name", nodeName, nodeData.get(BDBHAVirtualHostNode.NAME));
+ assertEquals("Unexpected type", "BDB_HA", nodeData.get(BDBHAVirtualHostNode.TYPE));
+ assertEquals("Unexpected path", new File(_storeBaseDir, nodeName).getPath(), nodeData.get(BDBHAVirtualHostNode.STORE_PATH));
+ assertEquals("Unexpected address", "localhost:" + nodePort, nodeData.get(BDBHAVirtualHostNode.ADDRESS));
+ assertEquals("Unexpected helper address", "localhost:" + nodeHelperPort, nodeData.get(BDBHAVirtualHostNode.HELPER_ADDRESS));
+ assertEquals("Unexpected group name", _hostName, nodeData.get(BDBHAVirtualHostNode.GROUP_NAME));;
+
+ if (isMaster)
+ {
+ assertEquals("Unexpected role", "MASTER", nodeData.get(BDBHAVirtualHostNode.ROLE));
+ Map<String, Object> hostData = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/" + masterNode + "/" + _hostName);
+ assertEquals("Unexpected host name", _hostName, hostData.get(VirtualHost.NAME));
+ }
+ else
+ {
+ assertEquals("Unexpected role", "REPLICA", nodeData.get(BDBHAVirtualHostNode.ROLE));
+ }
+ }
+
+ private void assertRemoteNodes(String masterNode, String... replicaNodes) throws Exception
+ {
+ List<String> clusterNodes = new ArrayList<String>(Arrays.asList(replicaNodes));
+ clusterNodes.add(masterNode);
+
+ for (String clusterNodeName : clusterNodes)
+ {
+ List<String> remotes = new ArrayList<String>(clusterNodes);
+ remotes.remove(clusterNodeName);
+ for (String remote : remotes)
+ {
+ String remoteUrl = "/rest/replicationnode/" + clusterNodeName + "/" + remote;
+ waitForAttributeChanged(remoteUrl, BDBHARemoteReplicationNode.ROLE, remote.equals(masterNode) ? "MASTER" : "REPLICA");
+ }
+ }
+ }
+
+ private void waitForAttributeChanged(String url, String attributeName, Object newValue) throws Exception
+ {
+ List<Map<String, Object>> nodeAttributes = getRestTestHelper().getJsonAsList(url);
+ long limit = System.currentTimeMillis() + 5000;
+ while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName))))
+ {
+ Thread.sleep(100l);
+ nodeAttributes = getRestTestHelper().getJsonAsList(url);
+ }
+ assertEquals("Unexpected attribute " + attributeName, newValue, nodeAttributes.get(0).get(attributeName));
+ }
+}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java?rev=1591281&r1=1591280&r2=1591281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java Wed Apr 30 13:08:49 2014
@@ -72,6 +72,7 @@ public final class BrokerModel extends M
addRelationship(Broker.class, Plugin.class);
addRelationship(VirtualHostNode.class, VirtualHost.class);
+ addRelationship(VirtualHostNode.class, RemoteReplicationNode.class);
addRelationship(VirtualHost.class, Exchange.class);
addRelationship(VirtualHost.class, Queue.class);
Copied: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java (from r1591247, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java?p2=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java&p1=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java&r1=1591247&r2=1591281&rev=1591281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java Wed Apr 30 13:08:49 2014
@@ -20,17 +20,10 @@
*/
package org.apache.qpid.server.model;
-import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ManagedObject;
-@ManagedObject(category=true, managesChildren=false)
-public interface VirtualHostNode<X extends VirtualHostNode<X>> extends ConfiguredObject<X>
+@ManagedObject(category=true, managesChildren=false, creatable=false)
+public interface RemoteReplicationNode<X extends RemoteReplicationNode<X>> extends ConfiguredObject<X>
{
- public static final String IS_MESSAGE_STORE_PROVIDER = "messageStoreProvider";
-
- @ManagedAttribute (automate = true, defaultValue = "false")
- boolean isMessageStoreProvider();
-
- VirtualHost<?,?,?> getVirtualHost();
-
- DurableConfigurationStore getConfigurationStore();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java?rev=1591281&r1=1591280&r2=1591281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java Wed Apr 30 13:08:49 2014
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.model;
+import java.util.Collection;
+
import org.apache.qpid.server.store.DurableConfigurationStore;
@ManagedObject(category=true, managesChildren=false)
@@ -33,4 +35,7 @@ public interface VirtualHostNode<X exten
VirtualHost<?,?,?> getVirtualHost();
DurableConfigurationStore getConfigurationStore();
+
+ @SuppressWarnings("rawtypes")
+ Collection<? extends RemoteReplicationNode> getRemoteReplicationNodes();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org