You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/21 17:57:19 UTC
svn commit: r1560094 [1/2] - in /qpid/branches/java-broker-bdb-ha/qpid/java:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/
bdbstore/src/main/java/org/apache/q...
Author: kwall
Date: Tue Jan 21 16:57:19 2014
New Revision: 1560094
URL: http://svn.apache.org/r1560094
Log:
QPID-5409: Add functionality to automatically detect that the master is isolated from the majority and restart the environment.
Added:
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java
- copied, changed from r1559960, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestStateChangeListener.java
- copied, changed from r1559960, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/resources/log4j.properties
Removed:
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeTestCase.java
Modified:
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java Tue Jan 21 16:57:19 2014
@@ -221,8 +221,6 @@ public class BDBHAVirtualHost extends Ab
private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
{
- // TODO shutdown the executor
- private final Executor _executor = Executors.newSingleThreadExecutor();
@Override
public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
@@ -231,20 +229,21 @@ public class BDBHAVirtualHost extends Ab
if (LOGGER.isInfoEnabled())
{
- LOGGER.info("Received BDB event indicating transition to state " + state);
+ LOGGER.info("Received BDB event indicating transition to state " + state
+ + " when current message store state is " + _messageStore._stateManager.getState());
}
switch (state)
{
case MASTER:
- activateStoreAsync();
+ activate();
break;
case REPLICA:
- passivateStoreAsync();
+ passivate();
break;
case DETACHED:
LOGGER.error("BDB replicated node in detached state, therefore passivating.");
- passivateStoreAsync();
+ passivate();
break;
case UNKNOWN:
LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)");
@@ -255,102 +254,33 @@ public class BDBHAVirtualHost extends Ab
}
}
- /**
- * Calls {@link MessageStore#activate()}.
- *
- * <p/>
- *
- * This is done a background thread, in line with
- * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because
- * activate may execute transactions, which can't complete until
- * {@link StateChangeListener#stateChange(StateChangeEvent)} has returned.
- */
- private void activateStoreAsync()
+ private void activate()
{
- String threadName = "BDBHANodeActivationThread-" + getName();
- executeStateChangeAsync(new Callable<Void>()
+ try
{
- @Override
- public Void call() throws Exception
- {
- try
- {
- _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
- _messageStore.activate();
- }
- catch (Exception e)
- {
- LOGGER.error("Failed to activate on hearing MASTER change event", e);
- }
- return null;
- }
- }, threadName);
- }
-
- private void passivateStoreAsync()
- {
- String threadName = "BDBHANodePassivationThread-" + getName();
- executeStateChangeAsync(new Callable<Void>()
+ _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
+ _messageStore.activate();
+ }
+ catch (Exception e)
{
-
- @Override
- public Void call() throws Exception
- {
- try
- {
- if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED))
- {
- LOGGER.debug("Store becoming passive");
- _messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
- }
- }
- catch (Exception e)
- {
- LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e);
- }
- return null;
- }
- }, threadName);
+ LOGGER.error("Failed to activate on hearing MASTER change event", e);
+ }
}
- private void executeStateChangeAsync(final Callable<Void> callable, final String threadName)
+ private void passivate()
{
- final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger();
-
- _executor.execute(new Runnable()
+ try
{
-
- @Override
- public void run()
+ if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED))
{
- final String originalThreadName = Thread.currentThread().getName();
- Thread.currentThread().setName(threadName);
- try
- {
- CurrentActor.set(new AbstractActor(_rootLogger)
- {
- @Override
- public String getLogMessage()
- {
- return threadName;
- }
- });
-
- try
- {
- callable.call();
- }
- catch (Exception e)
- {
- LOGGER.error("Exception during state change", e);
- }
- }
- finally
- {
- Thread.currentThread().setName(originalThreadName);
- }
+ _messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
}
- });
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e);
+ }
}
+
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Tue Jan 21 16:57:19 2014
@@ -169,7 +169,7 @@ public class BDBMessageStore implements
try
{
new Upgrader(_environmentFacade.getEnvironment(), _virtualHost.getName()).upgradeIfNecessary();
- _environmentFacade.openDatabases(DATABASE_NAMES, dbConfig);
+ _environmentFacade.openDatabases(dbConfig, DATABASE_NAMES);
_totalStoreSize = getSizeOnDisk();
}
catch(DatabaseException e)
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java Tue Jan 21 16:57:19 2014
@@ -49,7 +49,7 @@ public interface EnvironmentFacade
AMQStoreException handleDatabaseException(String contextMessage, DatabaseException e);
- void openDatabases(String[] databaseNames, DatabaseConfig dbConfig) throws AMQStoreException;
+ void openDatabases(DatabaseConfig dbConfig, String... databaseNames);
void close();
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java Tue Jan 21 16:57:19 2014
@@ -54,6 +54,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.replication.ReplicationGroupListener;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.berkeleydb.replication.DatabasePinger;
import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory;
import org.apache.qpid.server.util.DaemonThreadFactory;
@@ -129,10 +130,11 @@ public class ReplicatedEnvironmentFacade
public static final String TYPE = "BDB-HA";
- // TODO: get rid of these names
+ // TODO: JMX will change to observe the model, at that point these names will disappear
public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
+ private final String _prettyGroupNodeName;
private final String _groupName;
private final String _nodeName;
private final String _nodeHostPort;
@@ -147,23 +149,23 @@ public class ReplicatedEnvironmentFacade
private final ExecutorService _restartEnvironmentExecutor;
private final ScheduledExecutorService _groupChangeExecutor;
private final AtomicReference<State> _state = new AtomicReference<State>(State.INITIAL);
- private final ConcurrentMap<String, Database> _databases = new ConcurrentHashMap<String, Database>();
+ private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>();
private final ConcurrentMap<String, RemoteReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<String, RemoteReplicationNode>();
private final RemoteReplicationNodeFactory _remoteReplicationNodeFactory;
+ private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
+ private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
private volatile CommitThreadWrapper _commitThreadWrapper;
- private volatile StateChangeListener _stateChangeListener;
private volatile ReplicatedEnvironment _environment;
- private ReplicationGroupListener _replicationGroupListener;
private long _joinTime;
private String _lastKnownReplicationTransactionId;
@SuppressWarnings("unchecked")
- public ReplicatedEnvironmentFacade(String name, String environmentPath,
+ public ReplicatedEnvironmentFacade(String virtualHostName, String environmentPath,
org.apache.qpid.server.model.ReplicationNode replicationNode,
RemoteReplicationNodeFactory remoteReplicationNodeFactory)
{
- _name = name;
+ _name = virtualHostName;
_environmentPath = environmentPath;
_groupName = (String)replicationNode.getAttribute(GROUP_NAME);
_nodeName = replicationNode.getName();
@@ -174,15 +176,49 @@ public class ReplicatedEnvironmentFacade
_coalescingSync = (Boolean)replicationNode.getAttribute(COALESCING_SYNC);
_environmentParameters = (Map<String, String>)replicationNode.getAttribute(PARAMETERS);
_replicationEnvironmentParameters = (Map<String, String>)replicationNode.getAttribute(REPLICATION_PARAMETERS);
+ _prettyGroupNodeName = _groupName + ":" + _nodeName;
- _restartEnvironmentExecutor = Executors.newFixedThreadPool(1, new DaemonThreadFactory("Environment-Restarter:" + _groupName));
- _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _groupName));
+ _restartEnvironmentExecutor = Executors.newFixedThreadPool(1, new DaemonThreadFactory("Environment-Starter:" + _prettyGroupNodeName));
+ _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
_remoteReplicationNodeFactory = remoteReplicationNodeFactory;
_state.set(State.OPENING);
_groupChangeExecutor.scheduleWithFixedDelay(new GroupChangeLearner(), 0, GROUP_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
_groupChangeExecutor.schedule(new RemoteNodeStateLearner(), _remoteReplicationNodeFactory.getRemoteNodeMonitorInterval(), TimeUnit.MILLISECONDS);
- _environment = createEnvironment();
+
+ // create environment in a separate thread to avoid renaming of the current thread by JE
+ Future<ReplicatedEnvironment> environmentFuture = _restartEnvironmentExecutor.submit(new Callable<ReplicatedEnvironment>(){
+ @Override
+ public ReplicatedEnvironment call() throws Exception
+ {
+ String originalThreadName = Thread.currentThread().getName();
+ try
+ {
+ return createEnvironment();
+ }
+ finally
+ {
+ Thread.currentThread().setName(originalThreadName);
+ }
+ }});
+
+ // TODO: evaluate the future timeout from JE ENVIRONMENT_SETUP
+ try
+ {
+ _environment = environmentFuture.get(15 * 2, TimeUnit.MINUTES);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException("Unexpected exception on environment creation", e.getCause());
+ }
+ catch (TimeoutException e)
+ {
+ throw new RuntimeException("JE environment has not been created in due time");
+ }
populateExistingRemoteReplicationNodes();
_commitThreadWrapper = startCommitThread(_name, _environment);
}
@@ -221,9 +257,9 @@ public class ReplicatedEnvironmentFacade
{
try
{
- LOGGER.debug("Closing replicated environment facade");
- _restartEnvironmentExecutor.shutdownNow();
- _groupChangeExecutor.shutdownNow();
+ LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName);
+ _restartEnvironmentExecutor.shutdown();
+ _groupChangeExecutor.shutdown();
stopCommitThread();
closeDatabases();
closeEnvironment();
@@ -273,23 +309,59 @@ public class ReplicatedEnvironmentFacade
}
@Override
- public void openDatabases(String[] databaseNames, DatabaseConfig dbConfig) throws AMQStoreException
+ public void openDatabases(DatabaseConfig dbConfig, String... databaseNames)
{
+ if (_state.get() != State.OPEN)
+ {
+ throw new IllegalStateException("Environment facade is not in opened state");
+ }
+
+ if (!_environment.isValid())
+ {
+ throw new IllegalStateException("Environment is not valid");
+ }
+
+ if (_environment.getState() != ReplicatedEnvironment.State.MASTER)
+ {
+ throw new IllegalStateException("Databases can only be opened on Master node");
+ }
+
+ for (String databaseName : databaseNames)
+ {
+ _databases.put(databaseName, new DatabaseHolder(dbConfig));
+ }
for (String databaseName : databaseNames)
{
- Database database = _environment.openDatabase(null, databaseName, dbConfig);
- _databases.put(databaseName, database);
+ DatabaseHolder holder = _databases.get(databaseName);
+ openDatabaseInternally(databaseName, holder);
}
}
+ private void openDatabaseInternally(String databaseName, DatabaseHolder holder)
+ {
+ LOGGER.debug("Opening database " + databaseName + " on " + _prettyGroupNodeName);
+ Database database = _environment.openDatabase(null, databaseName, holder.getConfig());
+ holder.setDatabase(database);
+ }
+
@Override
public Database getOpenDatabase(String name)
{
+ if (_state.get() != State.OPEN)
+ {
+ throw new IllegalStateException("Environment facade is not in opened state");
+ }
+
if (!_environment.isValid())
{
throw new IllegalStateException("Environment is not valid");
}
- Database database = _databases.get(name);
+ DatabaseHolder databaseHolder = _databases.get(name);
+ if (databaseHolder == null)
+ {
+ throw new IllegalArgumentException("Database with name '" + name + "' has never been requested to be opened");
+ }
+ Database database = databaseHolder.getDatabase();
if (database == null)
{
throw new IllegalArgumentException("Database with name '" + name + "' has not been opened");
@@ -298,32 +370,67 @@ public class ReplicatedEnvironmentFacade
}
@Override
- public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+ public void stateChange(final StateChangeEvent stateChangeEvent)
+ {
+ _groupChangeExecutor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ stateChanged(stateChangeEvent);
+ }
+ });
+ }
+
+ private void stateChanged(StateChangeEvent stateChangeEvent)
{
ReplicatedEnvironment.State state = stateChangeEvent.getState();
- LOGGER.info("The node state is " + state);
+ LOGGER.info("The node '" + _prettyGroupNodeName + "' state is " + state);
if (state == ReplicatedEnvironment.State.REPLICA || state == ReplicatedEnvironment.State.MASTER)
{
if (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN))
{
- LOGGER.info("The environment facade is in open state");
+ LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName);
_joinTime = System.currentTimeMillis();
}
}
- if (_state.get() != State.CLOSING && _state.get() != State.CLOSED)
+
+ if (state == ReplicatedEnvironment.State.MASTER)
{
- StateChangeListener listener = _stateChangeListener;
+ reopenDatabases();
+ _commitThreadWrapper = startCommitThread(_name, _environment);
+ StateChangeListener listener = _stateChangeListener.get();
+ LOGGER.debug("Application state change listener " + listener);
if (listener != null)
{
listener.stateChange(stateChangeEvent);
}
}
+ else
+ {
+ if (_state.get() != State.CLOSING && _state.get() != State.CLOSED)
+ {
+ StateChangeListener listener = _stateChangeListener.get();
+ if (listener != null)
+ {
+ listener.stateChange(stateChangeEvent);
+ }
+ }
+ }
}
- public void setStateChangeListener(StateChangeListener listener)
+ private void reopenDatabases()
{
- _stateChangeListener = listener;
- _environment.setStateChangeListener(this);
+ DatabaseConfig pingDbConfig = new DatabaseConfig();
+ pingDbConfig.setTransactional(true);
+ pingDbConfig.setAllowCreate(true);
+
+ _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig));
+
+ for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
+ {
+ openDatabaseInternally(entry.getKey(), entry.getValue());
+ }
}
public String getName()
@@ -420,7 +527,7 @@ public class ReplicatedEnvironmentFacade
if (LOGGER.isInfoEnabled())
{
- LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group");
+ LOGGER.info("Node " + _prettyGroupNodeName + " successfully set as designated primary for group");
}
}
@@ -483,19 +590,27 @@ public class ReplicatedEnvironmentFacade
return _state.get();
}
- /**
- * Sets the replication group listener. Whenever a new listener is set, the listener
- * will hear {@link ReplicationGroupListener#onReplicationNodeRecovered(org.apache.qpid.server.model.ReplicationNode)
- * for every existing remote node.
- *
- * @param replicationGroupListener listener
- */
public void setReplicationGroupListener(ReplicationGroupListener replicationGroupListener)
{
- _replicationGroupListener = replicationGroupListener;
- if (_replicationGroupListener != null)
+ if (_replicationGroupListener.compareAndSet(null, replicationGroupListener))
+ {
+ notifyExistingRemoteReplicationNodes(replicationGroupListener);
+ }
+ else
+ {
+ throw new IllegalStateException("ReplicationGroupListener is already set on " + _prettyGroupNodeName);
+ }
+ }
+
+ public void setStateChangeListener(StateChangeListener stateChangeListener)
+ {
+ if (_stateChangeListener.compareAndSet(null, stateChangeListener))
{
- notifyExistingRemoteReplicationNodes(_replicationGroupListener);
+ _environment.setStateChangeListener(this);
+ }
+ else
+ {
+ throw new IllegalStateException("StateChangeListener is already set on " + _prettyGroupNodeName);
}
}
@@ -604,22 +719,16 @@ public class ReplicatedEnvironmentFacade
stopCommitThread(dbe);
- Set<String> databaseNames = new HashSet<String>(_databases.keySet());
closeEnvironmentSafely();
_environment = createEnvironment();
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- // TODO Alex and I think this should be removed.
- openDatabases(databaseNames.toArray(new String[databaseNames.size()]), dbConfig);
-
- _commitThreadWrapper = startCommitThread(_name, _environment);
-
- _environment.setStateChangeListener(this);
+ if (_stateChangeListener.get() != null)
+ {
+ _environment.setStateChangeListener(this);
+ }
LOGGER.info("Environment is restarted");
-
}
private void closeEnvironmentSafely()
@@ -652,21 +761,32 @@ public class ReplicatedEnvironmentFacade
private void closeDatabases()
{
RuntimeException firstThrownException = null;
- for (Database database : _databases.values())
+ LOGGER.debug("Closing databases " + _databases);
+ for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
{
- try
- {
- database.close();
- }
- catch(RuntimeException e)
+ DatabaseHolder databaseHolder = entry.getValue();
+ Database database = databaseHolder.getDatabase();
+ if (database != null)
{
- if (firstThrownException == null)
+ try
+ {
+ LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName);
+ database.close();
+ }
+ catch(RuntimeException e)
+ {
+ LOGGER.error("Failed to close database on " + _prettyGroupNodeName, e);
+ if (firstThrownException == null)
+ {
+ firstThrownException = e;
+ }
+ }
+ finally
{
- firstThrownException = e;
+ databaseHolder.setDatabase(null);
}
}
}
- _databases.clear();
if (firstThrownException != null)
{
throw firstThrownException;
@@ -756,7 +876,7 @@ public class ReplicatedEnvironmentFacade
}
ReplicatedEnvironment env = _environment;
- ReplicationGroupListener replicationGroupListener = _replicationGroupListener;
+ ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get();
if (env != null && env.isValid())
{
ReplicationGroup group = env.getGroup();
@@ -814,6 +934,7 @@ public class ReplicatedEnvironmentFacade
//TODO: move the class into external class
private class RemoteNodeStateLearner implements Callable<Void>
{
+ private Map<String, String> _previousGroupState = Collections.emptyMap();
@Override
public Void call()
{
@@ -821,9 +942,8 @@ public class ReplicatedEnvironmentFacade
try
{
Set<Future<Void>> futures = new HashSet<Future<Void>>();
- for (Map.Entry<String, RemoteReplicationNode> entry : _remoteReplicationNodes.entrySet())
+ for (final RemoteReplicationNode node : _remoteReplicationNodes.values())
{
- final RemoteReplicationNode node = entry.getValue();
Future<Void> future = _groupChangeExecutor.submit(new Callable<Void>()
{
@Override
@@ -856,6 +976,21 @@ public class ReplicatedEnvironmentFacade
future.cancel(true);
}
}
+
+ if (ReplicatedEnvironment.State.MASTER == _environment.getState())
+ {
+ Map<String, String> currentGroupState = new HashMap<String, String>();
+ for (final RemoteReplicationNode node : _remoteReplicationNodes.values())
+ {
+ currentGroupState.put(node.getName(), (String)node.getAttribute(org.apache.qpid.server.model.ReplicationNode.ROLE));
+ }
+ boolean stateChanged = !_previousGroupState.equals(currentGroupState);
+ _previousGroupState = currentGroupState;
+ if (stateChanged && State.OPEN == _state.get())
+ {
+ new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this);
+ }
+ }
}
finally
{
@@ -876,7 +1011,7 @@ public class ReplicatedEnvironmentFacade
public static enum State
{
- INITIAL,
+ INITIAL, // TODO unused remove
OPENING,
OPEN,
RESTARTING,
@@ -884,4 +1019,36 @@ public class ReplicatedEnvironmentFacade
CLOSED
}
+ private static class DatabaseHolder
+ {
+ private final DatabaseConfig _config;
+ private Database _database;
+
+ public DatabaseHolder(DatabaseConfig config)
+ {
+ _config = config;
+ }
+
+ public Database getDatabase()
+ {
+ return _database;
+ }
+
+ public void setDatabase(Database database)
+ {
+ _database = database;
+ }
+
+ public DatabaseConfig getConfig()
+ {
+ return _config;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DatabaseHolder [_config=" + _config + ", _database=" + _database + "]";
+ }
+
+ }
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java Tue Jan 21 16:57:19 2014
@@ -25,9 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.ReplicationNode;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.berkeleydb.replication.LocalReplicationNode;
import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
@@ -38,6 +36,7 @@ import com.sleepycat.je.Durability.SyncP
public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
{
+
@Override
public EnvironmentFacade createEnvironmentFacade(String name, String storeLocation, VirtualHost virtualHost)
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Tue Jan 21 16:57:19 2014
@@ -234,7 +234,7 @@ public class StandardEnvironmentFacade i
}
@Override
- public void openDatabases(String[] databaseNames, DatabaseConfig dbConfig)
+ public void openDatabases(DatabaseConfig dbConfig, String... databaseNames)
{
for (String databaseName : databaseNames)
{
Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java?rev=1560094&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java Tue Jan 21 16:57:19 2014
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Transaction;
+
+public class DatabasePinger
+{
+ public static final String PING_DATABASE_NAME = "PINGDB";
+ private static final int ID = 0;
+
+
+ public void pingDb(EnvironmentFacade facade)
+ {
+ try
+ {
+ final Database db = facade.getOpenDatabase(PING_DATABASE_NAME);
+
+ DatabaseEntry key = new DatabaseEntry();
+ IntegerBinding.intToEntry(ID, key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ LongBinding.longToEntry(System.currentTimeMillis(), value);
+ Transaction txn = null;
+ try
+ {
+ txn = facade.getEnvironment().beginTransaction(null, null);
+ db.put(txn, key, value);
+ txn.commit();
+ txn = null;
+ }
+ finally
+ {
+ try
+ {
+ if (txn != null)
+ {
+ txn.abort();
+ }
+ }
+ finally
+ {
+ db.close();
+ }
+ }
+ }
+ catch (DatabaseException de)
+ {
+ facade.handleDatabaseException("DatabaseException from DatabasePinger ", de);
+ }
+ }
+}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java Tue Jan 21 16:57:19 2014
@@ -223,12 +223,12 @@ public class RemoteReplicationNode exten
catch (IOException e)
{
_role = com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN.name();
- LOGGER.warn("Cannot connect to node " + _replicationNode.getName() + " from " + _groupName, e);
+ //LOGGER.warn("Cannot connect to node " + _replicationNode.getName() + " from " + _groupName, e);
}
catch (ServiceConnectFailedException e)
{
_role = com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN.name();
- LOGGER.warn("Cannot retrieve the node details for node " + _replicationNode.getName() + " from " + _groupName, e);
+ //LOGGER.warn("Cannot retrieve the node details for node " + _replicationNode.getName() + " from " + _groupName, e);
}
if (!_role.equals(oldRole))
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java Tue Jan 21 16:57:19 2014
@@ -21,9 +21,11 @@
package org.apache.qpid.server.store.berkeleydb.upgrade;
import com.sleepycat.je.Cursor;
+
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
@@ -38,6 +40,8 @@ import com.sleepycat.je.OperationStatus;
public class Upgrader
{
+ private static final Logger LOGGER = Logger.getLogger(Upgrader.class);
+
static final String VERSION_DB_NAME = "DB_VERSION";
private Environment _environment;
@@ -63,6 +67,7 @@ public class Upgrader
if(versionDb.count() == 0L)
{
+
int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion();
DatabaseEntry key = new DatabaseEntry();
IntegerBinding.intToEntry(sourceVersion, key);
@@ -73,6 +78,12 @@ public class Upgrader
}
int version = getSourceVersion(versionDb);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Source message store version is " + version);
+ }
+
if(version > BDBMessageStore.VERSION)
{
throw new AMQStoreException("Database version " + version
@@ -178,7 +189,7 @@ public class Upgrader
private int identifyOldStoreVersion() throws DatabaseException
{
- int version = 0;
+ int version = BDBMessageStore.VERSION;
for (String databaseName : _environment.getDatabaseNames())
{
if (databaseName.contains("_v"))
Copied: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java (from r1559960, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java&r1=1559960&r2=1560094&rev=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java Tue Jan 21 16:57:19 2014
@@ -20,15 +20,23 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import java.util.Collections;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.replication.ReplicationGroupListener;
-public class StandardEnvironmentFacadeTest extends EnvironmentFacadeTestCase
+class NoopReplicationGroupListener implements ReplicationGroupListener
{
+ @Override
+ public void onReplicationNodeRecovered(ReplicationNode node)
+ {
+ }
@Override
- EnvironmentFacade createEnvironmentFacade()
+ public void onReplicationNodeAddedToGroup(ReplicationNode node)
{
- return new StandardEnvironmentFacade(getName(), _storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
}
-}
+ @Override
+ public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+ {
+ }
+}
\ No newline at end of file
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java Tue Jan 21 16:57:19 2014
@@ -28,23 +28,17 @@ import static org.apache.qpid.server.mod
import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
import static org.apache.qpid.server.model.ReplicationNode.NAME;
import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -56,51 +50,31 @@ import org.apache.qpid.server.replicatio
import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
-import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
-import com.sleepycat.je.Transaction;
import com.sleepycat.je.rep.InsufficientReplicasException;
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
import com.sleepycat.je.rep.ReplicationConfig;
import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
-public class ReplicatedEnvironmentFacadeTest extends EnvironmentFacadeTestCase
+public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
-
- private static class NoopReplicationGroupListener implements ReplicationGroupListener
- {
- @Override
- public void onReplicationNodeRecovered(ReplicationNode node)
- {
- }
-
- @Override
- public void onReplicationNodeAddedToGroup(ReplicationNode node)
- {
- }
-
- @Override
- public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
- {
- }
- }
-
+ protected File _storePath;
private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
- private static final TimeUnit WAIT_STATE_CHANGE_TIME_UNIT = TimeUnit.SECONDS;
+ private static final int LISTENER_TIMEOUT = 5;
private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
private static final String TEST_GROUP_NAME = "testGroupName";
private static final String TEST_NODE_NAME = "testNodeName";
private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT;
private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT;
private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString();
- private static final boolean TEST_DESIGNATED_PRIMARY = true;
+ private static final boolean TEST_DESIGNATED_PRIMARY = false;
private static final boolean TEST_COALESCING_SYNC = true;
private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
private VirtualHost _virtualHost = mock(VirtualHost.class);
@@ -110,6 +84,8 @@ public class ReplicatedEnvironmentFacade
{
super.setUp();
+ _storePath = TestFileUtils.createTestDirectory("bdb", true);
+
when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL)).thenReturn(100L);
when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT)).thenReturn(100L);
}
@@ -126,58 +102,122 @@ public class ReplicatedEnvironmentFacade
}
finally
{
- super.tearDown();
+ try
+ {
+ if (_storePath != null)
+ {
+ FileUtils.delete(_storePath, true);
+ }
+ }
+ finally
+ {
+ super.tearDown();
+ }
}
}
+ public void testEnvironmentFacade() throws Exception
+ {
+ EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster();
+ assertNotNull("Environment should not be null", ef);
+ Environment e = ef.getEnvironment();
+ assertTrue("Environment is not valid", e.isValid());
+ }
+
+ public void testClose() throws Exception
+ {
+ EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster();
+ ef.close();
+ Environment e = ef.getEnvironment();
+
+ assertNull("Environment should be null after facade close", e);
+ }
+
+ public void testOpenDatabases() throws Exception
+ {
+ EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ ef.openDatabases(dbConfig, "test1", "test2");
+ Database test1 = ef.getOpenDatabase("test1");
+ Database test2 = ef.getOpenDatabase("test2");
+
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName());
+ }
- public void testGetName()
+ public void testGetOpenDatabaseForNonExistingDatabase() throws Exception
{
- assertEquals("Unexpected name", getName(), getEnvironmentFacade().getName());
+ EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ ef.openDatabases(dbConfig, "test1");
+ Database test1 = ef.getOpenDatabase("test1");
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ try
+ {
+ ef.getOpenDatabase("test2");
+ fail("An exception should be thrown for the non existing database");
+ }
+ catch(IllegalArgumentException e)
+ {
+ assertEquals("Unexpected exception message", "Database with name 'test2' has never been requested to be opened", e.getMessage());
+ }
}
- public void testGetGroupName()
+
+ public void testGetName() throws Exception
{
- assertEquals("Unexpected group name", TEST_GROUP_NAME, getEnvironmentFacade().getGroupName());
+ assertEquals("Unexpected name", getName(), ((ReplicatedEnvironmentFacade) createMaster()).getName());
}
- public void testGetNodeName()
+ public void testGetGroupName() throws Exception
{
- assertEquals("Unexpected group name", TEST_NODE_NAME, getEnvironmentFacade().getNodeName());
+ assertEquals("Unexpected group name", TEST_GROUP_NAME, ((ReplicatedEnvironmentFacade) createMaster()).getGroupName());
}
- public void testGetNodeHostPort()
+ public void testGetNodeName() throws Exception
{
- assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, getEnvironmentFacade().getHostPort());
+ assertEquals("Unexpected group name", TEST_NODE_NAME, ((ReplicatedEnvironmentFacade) createMaster()).getNodeName());
}
- public void testGetHelperHostPort()
+ public void testGetNodeHostPort() throws Exception
{
- assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, getEnvironmentFacade().getHelperHostPort());
+ assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, ((ReplicatedEnvironmentFacade) createMaster()).getHostPort());
}
- public void testGetDurability()
+ public void testGetHelperHostPort() throws Exception
{
- assertEquals("Unexpected durability", TEST_DURABILITY.toString(), getEnvironmentFacade().getDurability());
+ assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, ((ReplicatedEnvironmentFacade) createMaster()).getHelperHostPort());
}
- public void testIsCoalescingSync()
+ public void testGetDurability() throws Exception
{
- assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, getEnvironmentFacade().isCoalescingSync());
+ assertEquals("Unexpected durability", TEST_DURABILITY.toString(), ((ReplicatedEnvironmentFacade) createMaster()).getDurability());
}
- public void testGetNodeState()
+ public void testIsCoalescingSync() throws Exception
{
- assertEquals("Unexpected state", State.MASTER.name(), getEnvironmentFacade().getNodeState());
+ assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, ((ReplicatedEnvironmentFacade) createMaster()).isCoalescingSync());
}
- public void testIsDesignatedPrimary()
+ public void testGetNodeState() throws Exception
{
- assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, getEnvironmentFacade().isDesignatedPrimary());
+ assertEquals("Unexpected state", State.MASTER.name(), ((ReplicatedEnvironmentFacade) createMaster()).getNodeState());
}
- public void testGetGroupMembers()
+ public void testIsDesignatedPrimary() throws Exception
{
- List<Map<String, String>> groupMembers = getEnvironmentFacade().getGroupMembers();
+ ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster();
+ assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
+ master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY);
+ assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
+ }
+
+ public void testGetGroupMembers() throws Exception
+ {
+ List<Map<String, String>> groupMembers = ((ReplicatedEnvironmentFacade) createMaster()).getGroupMembers();
Map<String, String> expectedMember = new HashMap<String, String>();
expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
@@ -187,29 +227,36 @@ public class ReplicatedEnvironmentFacade
public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception
{
- getEnvironmentFacade();
+ ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster();
String nodeName2 = TEST_NODE_NAME + "_2";
String host = "localhost";
int port = getNextAvailable(TEST_NODE_PORT + 1);
String node2NodeHostPort = host + ":" + port;
- ReplicatedEnvironmentFacade replicatedEnvironmentFacade2 = joinReplica(nodeName2, node2NodeHostPort);
- List<Map<String, String>> groupMembers = replicatedEnvironmentFacade2.getGroupMembers();
+ final AtomicInteger invocationCount = new AtomicInteger();
+ final CountDownLatch nodeRecoveryLatch = new CountDownLatch(1);
+ ReplicationGroupListener listener = new NoopReplicationGroupListener()
+ {
+ @Override
+ public void onReplicationNodeRecovered(ReplicationNode node)
+ {
+ nodeRecoveryLatch.countDown();
+ invocationCount.incrementAndGet();
+ }
+ };
+
+ addReplica(nodeName2, node2NodeHostPort, listener);
+
+ List<Map<String, String>> groupMembers = master.getGroupMembers();
assertEquals("Unexpected number of nodes", 2, groupMembers.size());
- ReplicationGroupListener listener = mock(ReplicationGroupListener.class);
- replicatedEnvironmentFacade2.setReplicationGroupListener(listener);
- verify(listener).onReplicationNodeRecovered(any(RemoteReplicationNode.class));
+ assertTrue("Listener not fired within timeout", nodeRecoveryLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+ assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
}
public void testReplicationGroupListenerHearsNodeAdded() throws Exception
{
- ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getEnvironmentFacade();
-
- List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
- assertEquals("Unexpected number of nodes at start of test", 1, initialGroupMembers.size());
-
- final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
final AtomicInteger invocationCount = new AtomicInteger();
ReplicationGroupListener listener = new NoopReplicationGroupListener()
{
@@ -217,16 +264,22 @@ public class ReplicatedEnvironmentFacade
public void onReplicationNodeAddedToGroup(ReplicationNode node)
{
invocationCount.getAndIncrement();
- latch.countDown();
+ nodeAddedLatch.countDown();
}
};
- replicatedEnvironmentFacade.setReplicationGroupListener(listener);
+
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
+ assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
+ assertEquals("Unexpected number of nodes at start of test", 1, initialGroupMembers.size());
String node2Name = TEST_NODE_NAME + "_2";
String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
- joinReplica(node2Name, node2NodeHostPort);
+ addReplica(node2Name, node2NodeHostPort);
- assertTrue("Listener not fired within timeout", latch.await(5, TimeUnit.SECONDS));
+ assertTrue("Listener not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
List<Map<String, String>> groupMembers = replicatedEnvironmentFacade.getGroupMembers();
assertEquals("Unexpected number of nodes", 2, groupMembers.size());
@@ -236,14 +289,6 @@ public class ReplicatedEnvironmentFacade
public void testReplicationGroupListenerHearsNodeRemoved() throws Exception
{
- ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getEnvironmentFacade();
- String node2Name = TEST_NODE_NAME + "_2";
- String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
- joinReplica(node2Name, node2NodeHostPort);
-
- List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
- assertEquals("Unexpected number of nodes at start of test", 2, initialGroupMembers.size());
-
final CountDownLatch nodeDeletedLatch = new CountDownLatch(1);
final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
final AtomicInteger invocationCount = new AtomicInteger();
@@ -269,15 +314,24 @@ public class ReplicatedEnvironmentFacade
}
};
- replicatedEnvironmentFacade.setReplicationGroupListener(listener);
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
+ assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ String node2Name = TEST_NODE_NAME + "_2";
+ String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
+ addReplica(node2Name, node2NodeHostPort);
+
+ List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
+ assertEquals("Unexpected number of nodes at start of test", 2, initialGroupMembers.size());
// Need to await the listener hearing the addition of the node to the model.
- assertTrue("Node add not fired within timeout", nodeAddedLatch.await(5, TimeUnit.SECONDS));
+ assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
// Now remove the node and ensure we hear the event
replicatedEnvironmentFacade.removeNodeFromGroup(node2Name);
- assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(5, TimeUnit.SECONDS));
+ assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
List<Map<String, String>> groupMembers = replicatedEnvironmentFacade.getGroupMembers();
assertEquals("Unexpected number of nodes after node removal", 1, groupMembers.size());
@@ -300,17 +354,18 @@ public class ReplicatedEnvironmentFacade
}
};
- ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getEnvironmentFacade();
- replicatedEnvironmentFacade.setReplicationGroupListener(listener);
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
+ assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
String node2Name = TEST_NODE_NAME + "_2";
String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
- joinReplica(node2Name, node2NodeHostPort);
+ addReplica(node2Name, node2NodeHostPort);
List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
assertEquals("Unexpected number of nodes at start of test", 2, initialGroupMembers.size());
- assertTrue("Node add not fired within timeout", nodeAddedLatch.await(5, TimeUnit.SECONDS));
+ assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
RemoteReplicationNode remoteNode = (RemoteReplicationNode)nodeRef.get();
assertEquals("Unexpcted node name", node2Name, remoteNode.getName());
@@ -329,298 +384,198 @@ public class ReplicatedEnvironmentFacade
public void testRemoveNodeFromGroup() throws Exception
{
- ReplicatedEnvironmentFacade environmentFacade = getEnvironmentFacade();
- String nodeName = TEST_NODE_NAME + "_2";
- ReplicatedEnvironmentFacade ref2 = joinReplica(nodeName, "localhost:" + getNextAvailable(TEST_NODE_PORT + 1));
+ ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) createMaster();
+
+ String node2Name = TEST_NODE_NAME + "_2";
+ String node2NodeHostPort = "localhost:" + getNextAvailable(TEST_NODE_PORT + 1);
+ ReplicatedEnvironmentFacade ref2 = addReplica(node2Name, node2NodeHostPort);
+
List<Map<String, String>> groupMembers = environmentFacade.getGroupMembers();
assertEquals("Unexpected group members count", 2, groupMembers.size());
ref2.close();
- environmentFacade.removeNodeFromGroup(nodeName);
+ environmentFacade.removeNodeFromGroup(node2Name);
groupMembers = environmentFacade.getGroupMembers();
assertEquals("Unexpected group members count", 1, groupMembers.size());
}
- public void testSetDesignatedPrimary() throws AMQStoreException
+ public void testSetDesignatedPrimary() throws Exception
{
- ReplicatedEnvironmentFacade environmentFacade = getEnvironmentFacade();
+ ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) createMaster();
environmentFacade.setDesignatedPrimary(false);
assertFalse("Unexpected designated primary", environmentFacade.isDesignatedPrimary());
}
- public void testGetNodePriority()
+ public void testGetNodePriority() throws Exception
{
- assertEquals("Unexpected node priority", 1, getEnvironmentFacade().getPriority());
+ assertEquals("Unexpected node priority", 1, ((ReplicatedEnvironmentFacade) createMaster()).getPriority());
}
- public void testGetElectableGroupSizeOverride()
+ public void testGetElectableGroupSizeOverride() throws Exception
{
- assertEquals("Unexpected Electable Group Size Override", 0, getEnvironmentFacade().getElectableGroupSizeOverride());
+ assertEquals("Unexpected Electable Group Size Override", 0, ((ReplicatedEnvironmentFacade) createMaster()).getElectableGroupSizeOverride());
}
public void testEnvironmentRestartOnInsufficientReplicas() throws Exception
{
- ReplicatedEnvironmentFacade[] nodes = startClusterSequentially(3);
- ReplicatedEnvironmentFacade environmentFacade = nodes[0];
+ ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster();
+
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String replica1NodeName = TEST_NODE_NAME + "_1";
+ String replica1NodeHostPort = "localhost:" + replica1Port;
+ ReplicatedEnvironmentFacade replica1 = addReplica(replica1NodeName, replica1NodeHostPort);
+
+ int replica2Port = getNextAvailable(replica1Port + 1);
+ String replica2NodeName = TEST_NODE_NAME + "_2";
+ String replica2NodeHostPort = "localhost:" + replica2Port;
+ ReplicatedEnvironmentFacade replica2 = addReplica(replica2NodeName, replica2NodeHostPort);
String databaseName = "test";
- DatabaseConfig dbConfig = createDatabase(environmentFacade, databaseName);
+
+ DatabaseConfig dbConfig = createDatabase(master, databaseName);
// close replicas
- nodes[1].close();
- nodes[2].close();
+ replica1.close();
+ replica2.close();
- final CountDownLatch nodeAwaitLatch = new CountDownLatch(1);
- Environment e = environmentFacade.getEnvironment();
- Database db = environmentFacade.getOpenDatabase(databaseName);
+ Environment e = master.getEnvironment();
+ Database db = master.getOpenDatabase(databaseName);
try
{
- environmentFacade.openDatabases(new String[] { "test2" }, dbConfig);
+ master.openDatabases(dbConfig, "test2");
fail("Opening of new database without quorum should fail");
}
catch(InsufficientReplicasException ex)
{
- environmentFacade.handleDatabaseException(null, ex);
+ master.handleDatabaseException(null, ex);
}
- // restore quorum
- nodes[1] = joinReplica(TEST_NODE_NAME + "_1", nodes[1].getHostPort());
- nodes[2] = joinReplica(TEST_NODE_NAME + "_2", nodes[2].getHostPort());
+ replica1 = addReplica(replica1NodeName, replica1NodeHostPort);
+ replica2 = addReplica(replica2NodeName, replica2NodeHostPort);
- environmentFacade.setStateChangeListener(new StateChangeListener()
+ // Need to poll to await the remote node updating itself
+ long timeout = System.currentTimeMillis() + 5000;
+ while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout)
{
- @Override
- public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
- {
- if (stateChangeEvent.getState() == State.MASTER || stateChangeEvent.getState() == State.REPLICA)
- {
- nodeAwaitLatch.countDown();
- }
- }
- });
+ Thread.sleep(200);
+ }
- assertTrue("The node could not rejoin the cluster",
- nodeAwaitLatch.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT));
+ assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(),
+ State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) );
- Environment e2 = environmentFacade.getEnvironment();
+ Environment e2 = master.getEnvironment();
assertNotSame("Environment has not been restarted", e2, e);
- Database db1 = environmentFacade.getOpenDatabase(databaseName);
+ Database db1 = master.getOpenDatabase(databaseName);
assertNotSame("Database should be the re-created", db1, db);
}
- public void testEnvironmentIsRestartOnlyOnceOnInsufficientReplicas() throws Exception
+ public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception // Closes both replicas and expects exactly one MASTER and one UNKNOWN event on the first node.
{
- ReplicatedEnvironmentFacade[] nodes = startClusterSequentially(3);
- final ReplicatedEnvironmentFacade environmentFacade = nodes[0];
-
- int numberOfThreads = 100;
-
- // restart counter
- final AtomicInteger numberOfTimesElected = new AtomicInteger();
- environmentFacade.setStateChangeListener(new StateChangeListener()
+ final CountDownLatch masterLatch = new CountDownLatch(1); // released by the first MASTER event
+ final AtomicInteger masterStateChangeCount = new AtomicInteger();
+ final CountDownLatch unknownLatch = new CountDownLatch(1); // released by the first UNKNOWN event
+ final AtomicInteger unknownStateChangeCount = new AtomicInteger();
+ StateChangeListener stateChangeListener = new StateChangeListener()
{
@Override
public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
{
if (stateChangeEvent.getState() == State.MASTER)
{
- numberOfTimesElected.incrementAndGet();
+ masterStateChangeCount.incrementAndGet(); // counted before the latch is released so the waiting thread sees the increment
+ masterLatch.countDown();
+ }
+ else if (stateChangeEvent.getState() == State.UNKNOWN)
+ {
+ unknownStateChangeCount.incrementAndGet();
+ unknownLatch.countDown();
}
}
- });
+ };
- String databaseName = "test";
- createDatabase(environmentFacade, databaseName);
- final CountDownLatch latch = new CountDownLatch(numberOfThreads);
+ addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener()); // the counting listener is handed over at node creation
+ assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
- final Database db = environmentFacade.getOpenDatabase(databaseName);
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); // replica ports are searched for above the first node's port
+ String node1NodeHostPort = "localhost:" + replica1Port;
+ int replica2Port = getNextAvailable(replica1Port + 1);
+ String node2NodeHostPort = "localhost:" + replica2Port;
- // close replicas
- nodes[1].close();
- nodes[2].close();
+ ReplicatedEnvironmentFacade replica1 = addReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); // addReplica asserts the REPLICA state was reached before returning
+ ReplicatedEnvironmentFacade replica2 = addReplica(TEST_NODE_NAME + "_2", node2NodeHostPort);
- // perform transactions in separate threads in order to provoke InsufficientReplicasException
- ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
- try
- {
- List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
- for (int i = 0; i < numberOfThreads; i++)
- {
- final int index = i;
- tasks.add(new Callable<Void>(){
-
- @Override
- public Void call() throws Exception
- {
- try
- {
- Transaction tx = environmentFacade.getEnvironment().beginTransaction(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- IntegerBinding.intToEntry(index, key);
- IntegerBinding.intToEntry(index, data);
- db.put(tx, key, data);
- tx.commit();
- }
- catch(DatabaseException e)
- {
- _environmentFacade.handleDatabaseException("Exception", e);
- }
- finally
- {
- latch.countDown();
- }
- return null;
- }});
- }
- service.invokeAll(tasks);
- assertTrue("Not all tasks have been executed",
- latch.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT));
- }
- finally
- {
- service.shutdown();
- }
+ // close replicas
+ replica1.close();
+ replica2.close();
- // restore quorum
- nodes[1] = joinReplica(TEST_NODE_NAME + "_1", nodes[1].getHostPort());
- nodes[2] = joinReplica(TEST_NODE_NAME + "_2", nodes[2].getHostPort());
+ assertTrue("Environment should be recreated and go into unknown state",
+ unknownLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); // NOTE(review): the removed code paired this constant with WAIT_STATE_CHANGE_TIME_UNIT; confirm it is expressed in seconds
- long start = System.currentTimeMillis();
- while(environmentFacade.getFacadeState() != ReplicatedEnvironmentFacade.State.OPEN && System.currentTimeMillis() - start < 10000l)
- {
- Thread.sleep(1000l);
- }
- assertEquals("EnvironmentFacade should be in open state", ReplicatedEnvironmentFacade.State.OPEN, environmentFacade.getFacadeState());
-
- // it should be elected twice: once on first start-up and second time after environment restart
- assertEquals("Elected master unexpected number of times", 2, numberOfTimesElected.get());
+ assertEquals("Node made master an unexpected number of times", 1, masterStateChangeCount.get()); // counters are read once, right after the first UNKNOWN event; later events are not observed
+ assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get());
+
+ // TODO: restart other nodes
+ // TODO: check state of node 1 is either MASTER or REPLICA (not implemented in this revision)
}
- public void testFacadeStateTransitions() throws InterruptedException
+ public void testCloseStateTransitions() throws Exception // Checks the facade reports OPEN once it is master and CLOSED after close().
{
- String nodeName = "node1";
- final String nodePath = createNodeWorkingFolder(nodeName);
- ReplicatedEnvironmentFacade ref = null;
- try
- {
- ref = createReplicatedEnvironmentFacade(nodePath, nodeName, TEST_NODE_HOST_PORT, false);
- assertEquals("Unexpected state " + ref.getFacadeState(), ReplicatedEnvironmentFacade.State.OPENING, ref.getFacadeState());
-
- final CountDownLatch nodeAwaitLatch = new CountDownLatch(1);
- ref.setStateChangeListener(new StateChangeListener()
- {
- @Override
- public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
- {
- if (stateChangeEvent.getState() == State.MASTER)
- {
- nodeAwaitLatch.countDown();
- }
- }
- });
- assertTrue("Node did not join the cluster", nodeAwaitLatch.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT));
- assertEquals("Unexpected state " + ref.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, ref.getFacadeState());
- ref.close();
- assertEquals("Unexpected state " + ref.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, ref.getFacadeState());
- }
- finally
- {
- if (ref != null)
- {
- ref.close();
- }
- }
- }
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster(); // createMaster() asserts the MASTER state change was seen before returning
- @Override
- EnvironmentFacade createEnvironmentFacade()
- {
- try
- {
- return startNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, State.MASTER);
- }
- catch (InterruptedException e)
- {
- Thread.interrupted();
- throw new RuntimeException(e);
- }
+ assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState());
+ replicatedEnvironmentFacade.close(); // the CLOSED state is asserted immediately after close() returns, with no waiting
+ assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState());
}
- @Override
- ReplicatedEnvironmentFacade getEnvironmentFacade()
+ private ReplicatedEnvironmentFacade createMaster() throws Exception // Adds the default test node and returns it once the MASTER state change has been observed.
{
- return (ReplicatedEnvironmentFacade) super.getEnvironmentFacade();
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); // listener is constructed with the state to wait for
+ ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
+ assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); // fails the calling test if the wait returns false
+ return env;
}
- private ReplicatedEnvironmentFacade joinReplica(final String nodeName, final String hostPort) throws InterruptedException
+ private ReplicatedEnvironmentFacade addReplica(String nodeName, String nodeHostPort) throws Exception // Convenience overload: adds a replica with a NoopReplicationGroupListener.
{
- return startNode(nodeName, hostPort, false, State.REPLICA);
+ return addReplica(nodeName, nodeHostPort, new NoopReplicationGroupListener()); // delegates to the three-argument overload
}
- private ReplicatedEnvironmentFacade startNode(String nodeName, String nodeHostPort, boolean designatedPrimary, State targetState)
- throws InterruptedException
+ private ReplicatedEnvironmentFacade addReplica(String nodeName, String nodeHostPort, ReplicationGroupListener replicationGroupListener) // Adds a node and returns it once the REPLICA state change has been observed.
+ throws Exception
{
- final String nodePath = createNodeWorkingFolder(nodeName);
- final CountDownLatch _nodeAwaitLatch = new CountDownLatch(1);
- ReplicatedEnvironmentFacade ref = join(nodeName, nodePath, nodeHostPort, designatedPrimary, _nodeAwaitLatch, targetState);
- assertTrue("Node did not join the cluster", _nodeAwaitLatch.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT));
- return ref;
+ TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA);
+ ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, replicationGroupListener); // NOTE(review): replicas are created with the same TEST_DESIGNATED_PRIMARY value as the master; the removed code passed false here
+ assertTrue("Replica " + nodeName + " was not started", testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+ return replicaEnvironmentFacade;
}
private String createNodeWorkingFolder(String nodeName) // Returns the absolute path of the node's directory under _storePath, creating it if absent.
{
File nodeLocation = new File(_storePath, nodeName);
- nodeLocation.mkdirs();
- final String nodePath = nodeLocation.getAbsolutePath();
- return nodePath;
- }
-
- private ReplicatedEnvironmentFacade join(String nodeName, String nodePath, String nodeHostPort, boolean designatedPrimary,
- final CountDownLatch nodeAwaitLatch, final State expectedState)
- {
- ReplicatedEnvironmentFacade ref = createReplicatedEnvironmentFacade(nodePath, nodeName, nodeHostPort, designatedPrimary);
-
- if (expectedState == State.REPLICA)
+ if (!nodeLocation.exists()) // an already existing directory is left as it is
{
- _nodes.put(nodeName, ref);
+ nodeLocation.mkdirs(); // NOTE(review): the boolean result is ignored, so a failed creation is not reported here
}
- ref.setStateChangeListener(new StateChangeListener()
- {
- @Override
- public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
- {
- if (stateChangeEvent.getState() == expectedState)
- {
- nodeAwaitLatch.countDown();
- }
- }
- });
- return ref;
+ final String nodePath = nodeLocation.getAbsolutePath();
+ return nodePath;
}
- private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodePath, String nodeName, String nodeHostPort,
- boolean designatedPrimary)
+ private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary, // Creates a facade for the named node, attaches both listeners and records it in _nodes.
+ State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener) // NOTE(review): desiredState is not read anywhere in this body
{
+ final String nodePath = createNodeWorkingFolder(nodeName);
ReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary);
- return new ReplicatedEnvironmentFacade(getName(), nodePath, node, _remoteReplicationNodeFactory);
+ ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(getName(), nodePath, node, _remoteReplicationNodeFactory);
+ ref.setReplicationGroupListener(replicationGroupListener); // listeners are set after construction; presumably events fired before this point are not delivered to them - TODO confirm
+ ref.setStateChangeListener(stateChangeListener);
+ _nodes.put(nodeName, ref); // keyed by node name, so adding a node with an existing name replaces the earlier entry
+ return ref;
}
- private ReplicatedEnvironmentFacade[] startClusterSequentially(int nodeNumber) throws InterruptedException
+ private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener groupChangeListener) // Convenience overload using the default test node name, host:port and designated-primary flag.
{
- // master
- ReplicatedEnvironmentFacade environmentFacade = getEnvironmentFacade();
- ReplicatedEnvironmentFacade[] nodes = new ReplicatedEnvironmentFacade[nodeNumber];
- nodes[0] = environmentFacade;
-
- int nodePort = TEST_NODE_PORT;
- for (int i = 1; i < nodeNumber; i++)
- {
- nodePort = getNextAvailable(nodePort + 1);
- nodes[i] = joinReplica(TEST_NODE_NAME + "_" + i, "localhost:" + nodePort);
- }
- return nodes;
+ return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener, groupChangeListener); // delegates to the six-argument overload
}
private DatabaseConfig createDatabase(ReplicatedEnvironmentFacade environmentFacade, String databaseName) throws AMQStoreException
@@ -628,7 +583,7 @@ public class ReplicatedEnvironmentFacade
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
dbConfig.setAllowCreate(true);
- environmentFacade.openDatabases(new String[] { databaseName }, dbConfig);
+ environmentFacade.openDatabases(dbConfig, databaseName);
return dbConfig;
}
@@ -650,5 +605,4 @@ public class ReplicatedEnvironmentFacade
when(node.getAttribute(REPLICATION_PARAMETERS)).thenReturn(repConfig);
return node;
}
-
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java Tue Jan 21 16:57:19 2014
@@ -20,12 +20,107 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import java.io.File;
import java.util.Collections;
-public class StandardEnvironmentFacadeTest extends EnvironmentFacadeTestCase
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Environment;
+
+public class StandardEnvironmentFacadeTest extends QpidTestCase
{
+ protected File _storePath;
+ protected EnvironmentFacade _environmentFacade;
+
+ protected void setUp() throws Exception // Prepares the store directory used by each test.
+ {
+ super.setUp();
+ _storePath = TestFileUtils.createTestDirectory("bdb", true); // removed again in tearDown()
+ }
+
+ protected void tearDown() throws Exception // Closes the cached facade, if one was created, and deletes the store directory.
+ {
+ try
+ {
+ super.tearDown(); // NOTE(review): runs before the facade is closed; if this throws, the close() below is skipped
+ if (_environmentFacade != null) // only assigned by getEnvironmentFacade() in the visible code
+ {
+ _environmentFacade.close();
+ }
+ }
+ finally
+ {
+ if (_storePath != null)
+ {
+ FileUtils.delete(_storePath, true); // in finally, so deletion is attempted even when the close above throws
+ }
+ }
+ }
+
+ public void testEnvironmentFacade() throws Exception // Checks a freshly created facade exposes a non-null, valid Environment.
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ assertNotNull("Environment should not be null", ef); // NOTE(review): the message says "Environment" but the value checked is the facade
+ Environment e = ef.getEnvironment();
+ assertTrue("Environment is not valid", e.isValid());
+ }
+
+ public void testClose() throws Exception // Checks getEnvironment() returns null once the facade has been closed.
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ ef.close(); // tearDown() calls close() on this same cached facade a second time
+ Environment e = ef.getEnvironment();
+
+ assertNull("Environment should be null after facade close", e);
+ }
+
+ public void testOpenDatabases() throws Exception // Opens two databases in one call and checks each can be fetched by name.
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true); // the store directory is new for each test, so the databases have to be created
+ ef.openDatabases(dbConfig, "test1", "test2"); // config first, then the database names
+ Database test1 = ef.getOpenDatabase("test1");
+ Database test2 = ef.getOpenDatabase("test2");
+
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName());
+ }
+
+ public void testGetOpenDatabaseForNonExistingDatabase() throws Exception // Checks getOpenDatabase() rejects a name that was never opened.
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ ef.openDatabases(dbConfig, "test1"); // only "test1" is opened; "test2" below is deliberately left out
+ Database test1 = ef.getOpenDatabase("test1");
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ try
+ {
+ ef.getOpenDatabase("test2");
+ fail("An exception should be thrown for the non existing database");
+ }
+ catch(IllegalArgumentException e)
+ {
+ assertEquals("Unexpected exception message", "Database with name 'test2' has not been opened", e.getMessage()); // pins the exact message text, not just the exception type
+ }
+ }
+
+ EnvironmentFacade getEnvironmentFacade() throws Exception // Returns the test's facade, creating it on first use.
+ {
+ if (_environmentFacade == null) // created at most once per test instance, then reused
+ {
+ _environmentFacade = createEnvironmentFacade();
+ }
+ return _environmentFacade;
+ }
- @Override
EnvironmentFacade createEnvironmentFacade()
{
return new StandardEnvironmentFacade(getName(), _storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org