You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/08/25 21:48:16 UTC
svn commit: r1697761 - in /qpid/java/trunk/bdbstore/src:
main/java/org/apache/qpid/server/store/berkeleydb/
main/java/org/apache/qpid/server/store/berkeleydb/replication/
main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/
test/java/org/apache...
Author: orudyy
Date: Tue Aug 25 19:48:16 2015
New Revision: 1697761
URL: http://svn.apache.org/r1697761
Log:
QPID-6710: [Java Broker] Propagate coalescing committer exceptions occurring on flushing data to disk into committing threads
Refactor EnvironmentFacade implementations to encapsulate underlying Environment
(work by Lorenz Quack <qu...@gmail.com> and Alex Rudyy <or...@apache.org>)
Removed:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactoryTest.java
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Tue Aug 25 19:48:16 2015
@@ -109,15 +109,15 @@ public abstract class AbstractBDBMessage
{
try
{
- new Upgrader(getEnvironmentFacade().getEnvironment(), getParent()).upgradeIfNecessary();
+ getEnvironmentFacade().upgradeIfNecessary(getParent());
+
+ // TODO this relies on the fact that the VH will call upgrade just before putting the VH into service.
+ _totalStoreSize = getSizeOnDisk();
}
catch(RuntimeException e)
{
throw getEnvironmentFacade().handleDatabaseException("Cannot upgrade store", e);
}
-
- // TODO this relies on the fact that the VH will call upgrade just before putting the VH into service.
- _totalStoreSize = getSizeOnDisk();
}
@Override
@@ -228,7 +228,7 @@ public abstract class AbstractBDBMessage
tx = null;
try
{
- tx = getEnvironmentFacade().getEnvironment().beginTransaction(null, null);
+ tx = getEnvironmentFacade().beginTransaction(null);
//remove the message meta data from the store
DatabaseEntry key = new DatabaseEntry();
@@ -865,12 +865,12 @@ public abstract class AbstractBDBMessage
private void reduceSizeOnDisk()
{
- BDBUtils.runCleaner(getEnvironmentFacade().getEnvironment());
+ getEnvironmentFacade().reduceSizeOnDisk();
}
private long getSizeOnDisk()
{
- return getEnvironmentFacade().getEnvironment().getStats(null).getTotalLogSize();
+ return getEnvironmentFacade().getTotalLogSize();
}
private Database getMessageContentDb()
@@ -1225,8 +1225,7 @@ public abstract class AbstractBDBMessage
Transaction txn;
try
{
- txn = getEnvironmentFacade().getEnvironment().beginTransaction(
- null, null);
+ txn = getEnvironmentFacade().beginTransaction(null);
}
catch (RuntimeException e)
{
@@ -1304,7 +1303,7 @@ public abstract class AbstractBDBMessage
{
try
{
- _txn = getEnvironmentFacade().getEnvironment().beginTransaction(null, null);
+ _txn = getEnvironmentFacade().beginTransaction(null);
}
catch(RuntimeException e)
{
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java Tue Aug 25 19:48:16 2015
@@ -128,7 +128,7 @@ public class BDBConfigurationStore imple
{
try
{
- new Upgrader(_environmentFacade.getEnvironment(), _parent).upgradeIfNecessary();
+ _environmentFacade.upgradeIfNecessary(_parent);
if(_overwrite)
{
clearConfigurationRecords();
@@ -263,7 +263,7 @@ public class BDBConfigurationStore imple
com.sleepycat.je.Transaction txn = null;
try
{
- txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ txn = _environmentFacade.beginTransaction(null);
storeConfiguredObjectEntry(txn, configuredObject);
txn.commit();
txn = null;
@@ -290,7 +290,7 @@ public class BDBConfigurationStore imple
com.sleepycat.je.Transaction txn = null;
try
{
- txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ txn = _environmentFacade.beginTransaction(null);
Collection<UUID> removed = new ArrayList<UUID>(objects.length);
for(ConfiguredObjectRecord record : objects)
@@ -326,7 +326,7 @@ public class BDBConfigurationStore imple
com.sleepycat.je.Transaction txn = null;
try
{
- txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ txn = _environmentFacade.beginTransaction(null);
for(ConfiguredObjectRecord record : records)
{
update(createIfNecessary, record, txn);
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Tue Aug 25 19:48:16 2015
@@ -93,7 +93,7 @@ public class BDBMessageStore extends Abs
_environmentFacade.close();
_environmentFacade = null;
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw new StoreException("Exception occurred on message store close", e);
}
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java Tue Aug 25 19:48:16 2015
@@ -76,6 +76,11 @@ public class BDBUtils
public synchronized static void runCleaner(final Environment environment)
{
+ if (environment == null || !environment.isValid())
+ {
+ return;
+ }
+
boolean cleanerWasRunning = Boolean.parseBoolean(environment.getConfig().getConfigParam(EnvironmentConfig.ENV_RUN_CLEANER));
try
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java Tue Aug 25 19:48:16 2015
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.Atomi
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
+import org.apache.qpid.server.store.StoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -124,12 +125,6 @@ public class CoalescingCommiter implemen
}
waitForCompletion();
-
- if (_databaseException != null)
- {
- throw _databaseException;
- }
-
}
public synchronized boolean isComplete()
@@ -163,6 +158,11 @@ public class CoalescingCommiter implemen
long duration = System.currentTimeMillis() - startTime;
LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
}
+
+ if (_databaseException != null)
+ {
+ throw _databaseException;
+ }
}
public synchronized void waitForCompletion(long timeout) throws TimeoutException
@@ -197,6 +197,11 @@ public class CoalescingCommiter implemen
long duration = System.currentTimeMillis() - startTime;
LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
}
+
+ if (_databaseException != null)
+ {
+ throw _databaseException;
+ }
}
}
@@ -266,11 +271,7 @@ public class CoalescingCommiter implemen
startTime = System.currentTimeMillis();
}
- Environment environment = _environmentFacade.getEnvironment();
- if (environment != null && environment.isValid())
- {
- environment.flushLog(true);
- }
+ _environmentFacade.flushLog();
if(LOGGER.isDebugEnabled())
{
@@ -289,7 +290,7 @@ public class CoalescingCommiter implemen
}
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
try
{
@@ -313,7 +314,7 @@ public class CoalescingCommiter implemen
{
_environmentFacade.close();
}
- catch (DatabaseException ex)
+ catch (Exception ex)
{
LOGGER.error("Exception closing store environment", ex);
}
@@ -344,22 +345,22 @@ public class CoalescingCommiter implemen
public void close()
{
- RuntimeException e = new RuntimeException("Commit thread has been closed, transaction aborted");
synchronized (_lock)
{
_stopped.set(true);
- Environment environment = _environmentFacade.getEnvironment();
BDBCommitFutureResult commit;
- if (environment != null && environment.isValid())
+
+ try
{
- environment.flushLog(true);
+ _environmentFacade.flushLog();
while ((commit = _jobQueue.poll()) != null)
{
commit.complete();
}
}
- else
+ catch(RuntimeException flushException)
{
+ RuntimeException e = new RuntimeException("Commit thread has been closed, transaction aborted");
int abortedCommits = 0;
while ((commit = _jobQueue.poll()) != null)
{
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java Tue Aug 25 19:48:16 2015
@@ -33,6 +33,8 @@ import com.sleepycat.je.Sequence;
import com.sleepycat.je.SequenceConfig;
import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.util.FutureResult;
public interface EnvironmentFacade
@@ -45,22 +47,25 @@ public interface EnvironmentFacade
put(EnvironmentConfig.STATS_COLLECT, "false");
}});
- Environment getEnvironment();
+ void upgradeIfNecessary(ConfiguredObject<?> parent);
Database openDatabase(String databaseName, DatabaseConfig databaseConfig);
Database clearDatabase(String databaseName, DatabaseConfig databaseConfig);
Sequence openSequence(Database database, DatabaseEntry sequenceKey, SequenceConfig sequenceConfig);
- Transaction beginTransaction();
+ Transaction beginTransaction(TransactionConfig transactionConfig);
FutureResult commit(com.sleepycat.je.Transaction tx, boolean sync);
RuntimeException handleDatabaseException(String contextMessage, RuntimeException e);
- String getStoreLocation();
-
void closeDatabase(String name);
void close();
+ long getTotalLogSize();
+
+ void reduceSizeOnDisk();
+
+ void flushLog();
}
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Tue Aug 25 19:48:16 2015
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
@@ -35,6 +36,9 @@ import com.sleepycat.je.EnvironmentConfi
import com.sleepycat.je.Sequence;
import com.sleepycat.je.SequenceConfig;
import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,8 +53,8 @@ public class StandardEnvironmentFacade i
private final String _storePath;
private final ConcurrentMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
+ private final AtomicReference<Environment> _environment;
- private Environment _environment;
private final Committer _committer;
private final File _environmentPath;
@@ -105,7 +109,7 @@ public class StandardEnvironmentFacade i
boolean success = false;
try
{
- _environment = new Environment(_environmentPath, envConfig);
+ _environment = new AtomicReference<>(new Environment(_environmentPath, envConfig));
success = true;
}
finally
@@ -122,9 +126,9 @@ public class StandardEnvironmentFacade i
@Override
- public Transaction beginTransaction()
+ public Transaction beginTransaction(TransactionConfig transactionConfig)
{
- return _environment.beginTransaction(null, null);
+ return getEnvironment().beginTransaction(null, transactionConfig);
}
@Override
@@ -168,6 +172,31 @@ public class StandardEnvironmentFacade i
}
}
+ @Override
+ public long getTotalLogSize()
+ {
+ return getEnvironment().getStats(null).getTotalLogSize();
+ }
+
+ @Override
+ public void reduceSizeOnDisk()
+ {
+ BDBUtils.runCleaner(getEnvironment());
+ }
+
+ @Override
+ public void flushLog()
+ {
+ try
+ {
+ getEnvironment().flushLog(true);
+ }
+ catch (RuntimeException e)
+ {
+ throw handleDatabaseException("Exception whilst syncing data to disk", e);
+ }
+ }
+
private void closeSequences()
{
RuntimeException firstThrownException = null;
@@ -216,9 +245,10 @@ public class StandardEnvironmentFacade i
private void closeEnvironmentSafely()
{
- if (_environment != null)
+ Environment environment = _environment.getAndSet(null);
+ if (environment != null)
{
- if (_environment.isValid())
+ if (environment.isValid())
{
try
{
@@ -231,7 +261,7 @@ public class StandardEnvironmentFacade i
}
try
{
- _environment.close();
+ environment.close();
}
catch (DatabaseException ex)
{
@@ -241,34 +271,45 @@ public class StandardEnvironmentFacade i
{
LOGGER.error("Exception closing store environment", ex);
}
- finally
- {
- _environment = null;
- }
}
}
+ private Environment getEnvironment()
+ {
+ final Environment environment = _environment.get();
+ if (environment == null)
+ {
+ throw new IllegalStateException("Environment is null.");
+ }
+ else if (!environment.isValid())
+ {
+ throw new IllegalStateException("Environment is invalid.");
+ }
+ return environment;
+ }
+
@Override
- public Environment getEnvironment()
+ public void upgradeIfNecessary(ConfiguredObject<?> parent)
{
- return _environment;
+ Upgrader upgrader = new Upgrader(getEnvironment(), parent);
+ upgrader.upgradeIfNecessary();
}
private void closeEnvironment()
{
- if (_environment != null)
+ Environment environment = _environment.getAndSet(null);
+ if (environment != null)
{
// Clean the log before closing. This makes sure it doesn't contain
// redundant data. Closing without doing this means the cleaner may
// not get a chance to finish.
try
{
- BDBUtils.runCleaner(_environment);
+ BDBUtils.runCleaner(environment);
}
finally
{
- _environment.close();
- _environment = null;
+ environment.close();
}
}
}
@@ -276,7 +317,8 @@ public class StandardEnvironmentFacade i
@Override
public RuntimeException handleDatabaseException(String contextMessage, RuntimeException e)
{
- if (_environment != null && !_environment.isValid())
+ Environment environment = _environment.get();
+ if (environment != null && !environment.isValid())
{
closeEnvironmentSafely();
}
@@ -293,7 +335,7 @@ public class StandardEnvironmentFacade i
Database cachedHandle = _cachedDatabases.get(name);
if (cachedHandle == null)
{
- Database handle = _environment.openDatabase(null, name, databaseConfig);
+ Database handle = getEnvironment().openDatabase(null, name, databaseConfig);
Database existingHandle = _cachedDatabases.putIfAbsent(name, handle);
if (existingHandle == null)
{
@@ -313,7 +355,7 @@ public class StandardEnvironmentFacade i
public Database clearDatabase(String name, DatabaseConfig databaseConfig)
{
closeDatabase(name);
- _environment.removeDatabase(null, name);
+ getEnvironment().removeDatabase(null, name);
return openDatabase(name, databaseConfig);
}
@@ -359,10 +401,4 @@ public class StandardEnvironmentFacade i
cachedHandle.close();
}
}
-
- @Override
- public String getStoreLocation()
- {
- return _storePath;
- }
}
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java Tue Aug 25 19:48:16 2015
@@ -72,7 +72,7 @@ public class DatabasePinger
Transaction txn = null;
try
{
- txn = facade.getEnvironment().beginTransaction(null, _pingTransactionConfig);
+ txn = facade.beginTransaction(_pingTransactionConfig);
db.put(txn, key, value);
txn.commit();
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Tue Aug 25 19:48:16 2015
@@ -69,6 +69,8 @@ import com.sleepycat.je.rep.utilint.Serv
import com.sleepycat.je.rep.vlsn.VLSNRange;
import com.sleepycat.je.utilint.PropUtil;
import com.sleepycat.je.utilint.VLSN;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -191,12 +193,12 @@ public class ReplicatedEnvironmentFacade
private final Durability _defaultDurability;
private final ConcurrentMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
- private final Set<String> _permittedNodes = new CopyOnWriteArraySet<String>();
+ private final AtomicReference<ReplicatedEnvironment> _environment = new AtomicReference<>();
+ private final Set<String> _permittedNodes = new CopyOnWriteArraySet<String>();
private volatile Durability _realMessageStoreDurability = null;
private volatile Durability _messageStoreDurability;
private volatile CoalescingCommiter _coalescingCommiter = null;
- private volatile ReplicatedEnvironment _environment;
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
private volatile long _envSetupTimeoutMillis;
@@ -240,7 +242,14 @@ public class ReplicatedEnvironmentFacade
boolean success = false;
try
{
- createEnvironment(true);
+ createEnvironment(true, new Runnable(){
+ @Override
+ public void run()
+ {
+ populateExistingRemoteReplicationNodes();
+ _groupChangeExecutor.submit(new RemoteNodeStateLearner());
+ }
+ });
success = true;
}
finally
@@ -250,28 +259,12 @@ public class ReplicatedEnvironmentFacade
EnvHomeRegistry.getInstance().deregisterHome(_environmentDirectory);
}
}
- populateExistingRemoteReplicationNodes();
- _groupChangeExecutor.submit(new RemoteNodeStateLearner());
}
@Override
- public Transaction beginTransaction()
+ public Transaction beginTransaction(TransactionConfig transactionConfig)
{
- if (_messageStoreDurability == null)
- {
- throw new IllegalStateException("Message store durability is not set");
- }
-
- try
- {
- TransactionConfig transactionConfig = new TransactionConfig();
- transactionConfig.setDurability(getRealMessageStoreDurability());
- return _environment.beginTransaction(null, transactionConfig);
- }
- catch(DatabaseException e)
- {
- throw handleDatabaseException("Failure to start transaction", e);
- }
+ return getEnvironment().beginTransaction(null, transactionConfig);
}
@Override
@@ -496,15 +489,12 @@ public class ReplicatedEnvironmentFacade
throw new ConnectionScopedRuntimeException("Environment facade is not in opened state");
}
- if (!_environment.isValid())
- {
- throw new ConnectionScopedRuntimeException("Environment is not valid");
- }
+ ReplicatedEnvironment environment = getEnvironment();
Database cachedHandle = _cachedDatabases.get(name);
if (cachedHandle == null)
{
- Database handle = _environment.openDatabase(null, name, databaseConfig);
+ Database handle = environment.openDatabase(null, name, databaseConfig);
Database existingHandle = _cachedDatabases.putIfAbsent(name, handle);
if (existingHandle == null)
{
@@ -533,7 +523,7 @@ public class ReplicatedEnvironmentFacade
public Database clearDatabase(String name, DatabaseConfig databaseConfig)
{
closeDatabase(name);
- _environment.removeDatabase(null, name);
+ getEnvironment().removeDatabase(null, name);
return openDatabase(name, databaseConfig);
}
@@ -587,12 +577,6 @@ public class ReplicatedEnvironmentFacade
}
@Override
- public String getStoreLocation()
- {
- return _environmentDirectory.getAbsolutePath();
- }
-
- @Override
public void stateChange(final StateChangeEvent stateChangeEvent)
{
if (LOGGER.isInfoEnabled())
@@ -623,6 +607,36 @@ public class ReplicatedEnvironmentFacade
}
}
+ @Override
+ public long getTotalLogSize()
+ {
+ return getEnvironment().getStats(null).getTotalLogSize();
+ }
+
+ @Override
+ public void reduceSizeOnDisk()
+ {
+ BDBUtils.runCleaner(getEnvironment());
+ }
+
+ @Override
+ public void flushLog()
+ {
+ try
+ {
+ getEnvironment().flushLog(true);
+ }
+ catch (RuntimeException e)
+ {
+ throw handleDatabaseException("Exception whilst syncing data to disk", e);
+ }
+ }
+
+ public Set<ReplicationNode> getNodes()
+ {
+ return getEnvironment().getGroup().getNodes();
+ }
+
private void stateChanged(StateChangeEvent stateChangeEvent)
{
if (LOGGER.isDebugEnabled())
@@ -697,7 +711,7 @@ public class ReplicatedEnvironmentFacade
{
return ReplicatedEnvironment.State.UNKNOWN.name();
}
- ReplicatedEnvironment.State state = _environment.getState();
+ ReplicatedEnvironment.State state = getEnvironment().getState();
return state.toString();
}
@@ -707,7 +721,7 @@ public class ReplicatedEnvironmentFacade
{
throw new IllegalStateException("Environment facade is not opened");
}
- return _environment.getRepMutableConfig().getDesignatedPrimary();
+ return getEnvironment().getRepMutableConfig().getDesignatedPrimary();
}
public Future<Void> setDesignatedPrimary(final boolean isPrimary)
@@ -732,9 +746,10 @@ public class ReplicatedEnvironmentFacade
{
try
{
- final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+ ReplicatedEnvironment environment = getEnvironment();
+ final ReplicationMutableConfig oldConfig = environment.getRepMutableConfig();
final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary);
- _environment.setRepMutableConfig(newConfig);
+ environment.setRepMutableConfig(newConfig);
if (LOGGER.isInfoEnabled())
{
@@ -743,7 +758,7 @@ public class ReplicatedEnvironmentFacade
}
catch (Exception e)
{
- LOGGER.error("Cannot set designated primary to " + isPrimary + " on node " + _prettyGroupNodeName, e);
+ throw new ConnectionScopedRuntimeException("Cannot set designated primary to " + isPrimary + " on node " + _prettyGroupNodeName, e);
}
}
@@ -753,7 +768,7 @@ public class ReplicatedEnvironmentFacade
{
throw new IllegalStateException("Environment facade is not opened");
}
- ReplicationMutableConfig repConfig = _environment.getRepMutableConfig();
+ ReplicationMutableConfig repConfig = getEnvironment().getRepMutableConfig();
return repConfig.getNodePriority();
}
@@ -779,9 +794,10 @@ public class ReplicatedEnvironmentFacade
{
try
{
- final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+ final ReplicatedEnvironment environment = getEnvironment();
+ final ReplicationMutableConfig oldConfig = environment.getRepMutableConfig();
final ReplicationMutableConfig newConfig = oldConfig.setNodePriority(priority);
- _environment.setRepMutableConfig(newConfig);
+ environment.setRepMutableConfig(newConfig);
if (LOGGER.isDebugEnabled())
{
@@ -790,7 +806,7 @@ public class ReplicatedEnvironmentFacade
}
catch (Exception e)
{
- LOGGER.error("Cannot set priority to " + priority + " on node " + _prettyGroupNodeName, e);
+ throw new ConnectionScopedRuntimeException("Cannot set priority to " + priority + " on node " + _prettyGroupNodeName, e);
}
}
@@ -800,7 +816,7 @@ public class ReplicatedEnvironmentFacade
{
throw new IllegalStateException("Environment facade is not opened");
}
- ReplicationMutableConfig repConfig = _environment.getRepMutableConfig();
+ ReplicationMutableConfig repConfig = getEnvironment().getRepMutableConfig();
return repConfig.getElectableGroupSizeOverride();
}
@@ -826,9 +842,10 @@ public class ReplicatedEnvironmentFacade
{
try
{
- final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+ final ReplicatedEnvironment environment = getEnvironment();
+ final ReplicationMutableConfig oldConfig = environment.getRepMutableConfig();
final ReplicationMutableConfig newConfig = oldConfig.setElectableGroupSizeOverride(electableGroupOverride);
- _environment.setRepMutableConfig(newConfig);
+ environment.setRepMutableConfig(newConfig);
if (LOGGER.isDebugEnabled())
{
@@ -837,7 +854,7 @@ public class ReplicatedEnvironmentFacade
}
catch (Exception e)
{
- LOGGER.error("Cannot set electable group size to " + electableGroupOverride + " on node " + _prettyGroupNodeName, e);
+ throw new ConnectionScopedRuntimeException("Cannot set electable group size to " + electableGroupOverride + " on node " + _prettyGroupNodeName, e);
}
}
@@ -849,6 +866,7 @@ public class ReplicatedEnvironmentFacade
public Future<Void> transferMasterAsynchronously(final String nodeName)
{
+ // TODO: Should this be executed in the EnvironmentJobExecutor?
return _groupChangeExecutor.submit(new Callable<Void>()
{
@Override
@@ -889,7 +907,7 @@ public class ReplicatedEnvironmentFacade
{
if (_state.get() == State.OPEN)
{
- VLSNRange range = RepInternal.getRepImpl(_environment).getVLSNIndex().getRange();
+ VLSNRange range = RepInternal.getRepImpl(getEnvironment()).getVLSNIndex().getRange();
VLSN lastTxnEnd = range.getLastTxnEnd();
return lastTxnEnd.getSequence();
}
@@ -902,17 +920,34 @@ public class ReplicatedEnvironmentFacade
private ReplicationGroupAdmin createReplicationGroupAdmin()
{
final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
- helpers.addAll(_environment.getRepConfig().getHelperSockets());
+ final ReplicationConfig repConfig = getEnvironment().getRepConfig();
- final ReplicationConfig repConfig = _environment.getRepConfig();
+ helpers.addAll(repConfig.getHelperSockets());
helpers.add(HostPortPair.getSocket(HostPortPair.getString(repConfig.getNodeHostname(), repConfig.getNodePort())));
return new ReplicationGroupAdmin(_configuration.getGroupName(), helpers);
}
- public ReplicatedEnvironment getEnvironment()
+ private ReplicatedEnvironment getEnvironment()
+ {
+ final ReplicatedEnvironment environment = _environment.get();
+ if (environment == null)
+ {
+ throw new IllegalStateException("Environment is null.");
+ }
+ else if (!environment.isValid())
+ {
+ throw new IllegalStateException("Environment is invalid.");
+ }
+ return environment;
+
+ }
+
+ @Override
+ public void upgradeIfNecessary(ConfiguredObject<?> parent)
{
- return _environment;
+ Upgrader upgrader = new Upgrader(getEnvironment(), parent);
+ upgrader.upgradeIfNecessary();
}
public State getFacadeState()
@@ -924,7 +959,11 @@ public class ReplicatedEnvironmentFacade
{
if (_stateChangeListener.compareAndSet(null, stateChangeListener))
{
- _environment.setStateChangeListener(this);
+ final ReplicatedEnvironment environment = _environment.get();
+ if (environment != null)
+ {
+ environment.setStateChangeListener(this);
+ }
}
else
{
@@ -942,37 +981,37 @@ public class ReplicatedEnvironmentFacade
// Clean the log before closing. This makes sure it doesn't contain
// redundant data. Closing without doing this means the cleaner may not
// get a chance to finish.
- try
- {
- if (_environment.isValid())
- {
- BDBUtils.runCleaner(_environment);
- }
- }
- finally
+ ReplicatedEnvironment environment = _environment.getAndSet(null);
+ if (environment != null)
{
- // Try closing the environment but swallow EnvironmentFailureException
- // if the environment becomes invalid while closing.
- // This can be caused by potential race between facade close and DatabasePinger open.
try
{
- _environment.close();
+ if (environment.isValid())
+ {
+ BDBUtils.runCleaner(environment);
+ }
}
- catch (EnvironmentFailureException efe)
+ finally
{
- if (!_environment.isValid())
+ // Try closing the environment but swallow EnvironmentFailureException
+ // if the environment becomes invalid while closing.
+ // This can be caused by potential race between facade close and DatabasePinger open.
+ try
{
- LOGGER.debug("Environment became invalid on close, so ignore", efe);
+ environment.close();
}
- else
+ catch (EnvironmentFailureException efe)
{
- throw efe;
+ if (!environment.isValid())
+ {
+ LOGGER.debug("Environment became invalid on close, so ignore", efe);
+ }
+ else
+ {
+ throw efe;
+ }
}
}
- finally
- {
- _environment = null;
- }
}
}
@@ -997,11 +1036,11 @@ public class ReplicatedEnvironmentFacade
closeEnvironmentOnRestart();
- createEnvironment(false);
+ createEnvironment(false, null);
if (stateChangeListener != null)
{
- _environment.setStateChangeListener(this);
+ getEnvironment().setStateChangeListener(this);
}
LOGGER.info("Environment is restarted");
@@ -1009,7 +1048,7 @@ public class ReplicatedEnvironmentFacade
private void closeEnvironmentOnRestart()
{
- ReplicatedEnvironment environment = _environment;
+ ReplicatedEnvironment environment = _environment.getAndSet(null);
if (environment != null)
{
try
@@ -1088,7 +1127,7 @@ public class ReplicatedEnvironmentFacade
}
}
- private void createEnvironment(boolean createEnvironmentInSeparateThread)
+ private void createEnvironment(boolean createEnvironmentInSeparateThread, Runnable postCreationAction)
{
String groupName = _configuration.getGroupName();
String helperHostPort = _configuration.getHelperHostPort();
@@ -1157,24 +1196,26 @@ public class ReplicatedEnvironmentFacade
if (createEnvironmentInSeparateThread)
{
- createEnvironmentInSeparateThread(_environmentDirectory, envConfig, replicationConfig);
+ createEnvironmentInSeparateThread(_environmentDirectory, envConfig, replicationConfig, postCreationAction);
}
else
{
- createEnvironment(_environmentDirectory, envConfig, replicationConfig);
+ createEnvironment(_environmentDirectory, envConfig, replicationConfig, postCreationAction);
}
}
private void createEnvironmentInSeparateThread(final File environmentPathFile, final EnvironmentConfig envConfig,
- final ReplicationConfig replicationConfig)
+ final ReplicationConfig replicationConfig, final Runnable postCreationAction)
{
- Future<Void> environmentFuture = _environmentJobExecutor.submit(new Callable<Void>(){
+ Future<Void> environmentFuture = _environmentJobExecutor.submit(new Callable<Void>()
+ {
@Override
public Void call() throws Exception
{
- createEnvironment(environmentPathFile, envConfig, replicationConfig);
+ createEnvironment(environmentPathFile, envConfig, replicationConfig, postCreationAction);
return null;
- }});
+ }
+ });
final long setUpTimeOutMillis = extractEnvSetupTimeoutMillis(replicationConfig);
final long initialTimeOutMillis = Math.max(setUpTimeOutMillis / 4, 1000);
@@ -1217,13 +1258,13 @@ public class ReplicatedEnvironmentFacade
}
private void createEnvironment(File environmentPathFile, EnvironmentConfig envConfig,
- final ReplicationConfig replicationConfig)
+ final ReplicationConfig replicationConfig, Runnable action)
{
String originalThreadName = Thread.currentThread().getName();
try
{
_envSetupTimeoutMillis = extractEnvSetupTimeoutMillis(replicationConfig);
- _environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
+ _environment.set(new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig));
}
catch (final InsufficientLogException ile)
{
@@ -1233,13 +1274,26 @@ public class ReplicatedEnvironmentFacade
config.setRetainLogFiles(false);
restore.execute(ile, config);
LOGGER.warn("Network restore complete.");
- _environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
+ _environment.set(new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig));
}
finally
{
Thread.currentThread().setName(originalThreadName);
}
+ if (action != null)
+ {
+ action.run();
+ }
+
+ if (_stateChangeListener.get() != null)
+ {
+ final ReplicatedEnvironment environment = _environment.get();
+ if (environment != null)
+ {
+ environment.setStateChangeListener(this);
+ }
+ }
if (LOGGER.isInfoEnabled())
{
LOGGER.info("Environment is created for node " + _prettyGroupNodeName);
@@ -1257,7 +1311,7 @@ public class ReplicatedEnvironmentFacade
{
throw new IllegalStateException("Environment facade is not opened");
}
- return _environment.getGroup().getElectableNodes().size();
+ return getEnvironment().getGroup().getElectableNodes().size();
}
public boolean isMaster()
@@ -1420,7 +1474,7 @@ public class ReplicatedEnvironmentFacade
if (!permittedNodes.isEmpty())
{
byte[] data = permittedNodeListToBytes(permittedNodes);
- _environment.registerAppStateMonitor(new EnvironmentStateHolder(data));
+ getEnvironment().registerAppStateMonitor(new EnvironmentStateHolder(data));
}
}
@@ -1477,19 +1531,27 @@ public class ReplicatedEnvironmentFacade
private void populateExistingRemoteReplicationNodes()
{
- ReplicationGroup group = _environment.getGroup();
- Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes());
- String localNodeName = getNodeName();
-
- for (ReplicationNode replicationNode : nodes)
+ try
{
- String discoveredNodeName = replicationNode.getName();
- if (!discoveredNodeName.equals(localNodeName))
+ ReplicationGroup group = getEnvironment().getGroup();
+ Set<ReplicationNode> nodes = new HashSet<>(group.getElectableNodes());
+ String localNodeName = getNodeName();
+
+ for (ReplicationNode replicationNode : nodes)
{
- _remoteReplicationNodes.put(replicationNode.getName(), replicationNode);
+ String discoveredNodeName = replicationNode.getName();
+ if (!discoveredNodeName.equals(localNodeName))
+ {
+ _remoteReplicationNodes.put(replicationNode.getName(), replicationNode);
+ }
}
}
- }
+ catch (RuntimeException e)
+ {
+ // should never happen
+ handleDatabaseException("Exception on discovery of existing nodes", e);
+ }
+ }
private void notifyExistingRemoteReplicationNodes(ReplicationGroupListener listener)
{
@@ -1544,13 +1606,14 @@ public class ReplicatedEnvironmentFacade
{
continueMonitoring = detectGroupChangesAndNotify();
}
- catch(DatabaseException e)
+ catch(RuntimeException e)
{
handleDatabaseException("Exception on replication group check", e);
}
if (continueMonitoring)
{
+ // TODO: this code block does not seem to handle exceptions correctly.
boolean currentDesignatedPrimary = isDesignatedPrimary();
int currentElectableGroupSizeOverride = getElectableGroupSizeOverride();
@@ -1588,7 +1651,7 @@ public class ReplicatedEnvironmentFacade
}
boolean shouldContinue = true;
String groupName = _configuration.getGroupName();
- ReplicatedEnvironment env = _environment;
+ ReplicatedEnvironment env = _environment.get();
ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get();
if (env != null)
{
@@ -1747,7 +1810,7 @@ public class ReplicatedEnvironmentFacade
final boolean currentDesignatedPrimary,
final int currentElectableGroupSizeOverride)
{
- if (ReplicatedEnvironment.State.MASTER == _environment.getState())
+ if (ReplicatedEnvironment.State.MASTER == getEnvironment().getState())
{
Map<String, ReplicatedEnvironment.State> currentGroupState = new HashMap<>();
for (Map.Entry<ReplicationNode, NodeState> entry : nodeStates.entrySet())
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Tue Aug 25 19:48:16 2015
@@ -56,6 +56,7 @@ import com.sleepycat.je.rep.StateChangeE
import com.sleepycat.je.rep.StateChangeListener;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
import com.sleepycat.je.rep.utilint.HostPortPair;
+import org.apache.qpid.server.store.StoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -349,7 +350,7 @@ public class BDBHAVirtualHostNodeImpl ex
try
{
- Set<ReplicationNode> remoteNodes = environmentFacade.getEnvironment().getGroup().getNodes();
+ Set<ReplicationNode> remoteNodes = environmentFacade.getNodes();
for (ReplicationNode node : remoteNodes)
{
String nodeAddress = node.getHostName() + ":" + node.getPort();
@@ -423,7 +424,14 @@ public class BDBHAVirtualHostNodeImpl ex
ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
if (environmentFacade != null && _environmentFacade.compareAndSet(environmentFacade, null))
{
- environmentFacade.close();
+ try
+ {
+ environmentFacade.close();
+ }
+ catch (RuntimeException e)
+ {
+ throw new StoreException("Exception occurred on environment facade close", e);
+ }
}
}
Modified: qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java (original)
+++ qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java Tue Aug 25 19:48:16 2015
@@ -50,6 +50,7 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.berkeleydb.replication.DatabasePinger;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost;
import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
@@ -102,6 +103,17 @@ public class BDBHAVirtualHostNodeTest ex
String repStreamTimeout = "2 h";
Map<String,String> context = (Map<String,String>)attributes.get(BDBHAVirtualHostNode.CONTEXT);
context.put(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout);
+ context.put(EnvironmentConfig.ENV_IS_TRANSACTIONAL, "false");
+ try
+ {
+ _helper.createHaVHN(attributes);
+ fail("Exception was not thrown.");
+ }
+ catch (RuntimeException e)
+ {
+ assertTrue("Unexpected Exception being thrown.", e.getCause() instanceof IllegalArgumentException);
+ }
+ context.put(EnvironmentConfig.ENV_IS_TRANSACTIONAL, "true");
BDBHAVirtualHostNode<?> node = _helper.createHaVHN(attributes);
node.start();
@@ -113,16 +125,14 @@ public class BDBHAVirtualHostNodeTest ex
assertNotNull(store);
BDBConfigurationStore bdbConfigurationStore = (BDBConfigurationStore) store;
- ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbConfigurationStore.getEnvironmentFacade().getEnvironment();
- ReplicationConfig replicationConfig = environment.getRepConfig();
+ ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) bdbConfigurationStore.getEnvironmentFacade();
- assertEquals(nodeName, environment.getNodeName());
- assertEquals(groupName, environment.getGroup().getName());
- assertEquals(helperAddress, replicationConfig.getNodeHostPort());
- assertEquals(helperAddress, replicationConfig.getHelperHosts());
+ assertEquals(nodeName, environmentFacade.getNodeName());
+ assertEquals(groupName, environmentFacade.getGroupName());
+ assertEquals(helperAddress, environmentFacade.getHostPort());
+ assertEquals(helperAddress, environmentFacade.getHelperHostPort());
- assertEquals("SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString());
- assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
+ assertEquals("SYNC,NO_SYNC,SIMPLE_MAJORITY", environmentFacade.getMessageStoreDurability().toString());
_helper.awaitForVirtualhost(node, 30000);
VirtualHost<?, ?, ?> virtualHost = node.getVirtualHost();
@@ -153,20 +163,17 @@ public class BDBHAVirtualHostNodeTest ex
Map<String, Object> attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
BDBHAVirtualHostNode<?> node = _helper.createAndStartHaVHN(attributes);
- BDBConfigurationStore bdbConfigurationStore = (BDBConfigurationStore) node.getConfigurationStore();
- ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbConfigurationStore.getEnvironmentFacade().getEnvironment();
-
- assertEquals("Unexpected node priority value before mutation", 1, environment.getRepMutableConfig().getNodePriority());
- assertFalse("Unexpected designated primary value before mutation", environment.getRepMutableConfig().getDesignatedPrimary());
- assertEquals("Unexpected electable group override value before mutation", 0, environment.getRepMutableConfig().getElectableGroupSizeOverride());
+ assertEquals("Unexpected node priority value before mutation", 1, node.getPriority());
+ assertFalse("Unexpected designated primary value before mutation", node.isDesignatedPrimary());
+ assertEquals("Unexpected electable group override value before mutation", 0, node.getQuorumOverride());
node.setAttribute(BDBHAVirtualHostNode.PRIORITY, 1, 2);
node.setAttribute(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, false, true);
node.setAttribute(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 0, 1);
- assertEquals("Unexpected node priority value after mutation", 2, environment.getRepMutableConfig().getNodePriority());
- assertTrue("Unexpected designated primary value after mutation", environment.getRepMutableConfig().getDesignatedPrimary());
- assertEquals("Unexpected electable group override value after mutation", 1, environment.getRepMutableConfig().getElectableGroupSizeOverride());
+ assertEquals("Unexpected node priority value after mutation", 2, node.getPriority());
+ assertTrue("Unexpected designated primary value after mutation", node.isDesignatedPrimary());
+ assertEquals("Unexpected electable group override value after mutation", 1, node.getQuorumOverride());
assertNotNull("Join time should be set", node.getJoinTime());
assertNotNull("Last known replication transaction id should be set", node.getLastKnownReplicationTransactionId());
Modified: qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactoryTest.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactoryTest.java (original)
+++ qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactoryTest.java Tue Aug 25 19:48:16 2015
@@ -26,10 +26,9 @@ import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
import java.io.File;
-import java.util.HashMap;
+import java.util.Collections;
-
-import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.FileBasedSettings;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -40,7 +39,6 @@ import org.mockito.stubbing.Answer;
public class StandardEnvironmentFacadeFactoryTest extends QpidTestCase
{
- private HashMap<String, String> _jeProperties;
private File _path;
private ConfiguredObject<?> _parent;
@@ -48,9 +46,7 @@ public class StandardEnvironmentFacadeFa
public void setUp()throws Exception
{
super.setUp();
- _jeProperties= new HashMap<>();
- _jeProperties.put("je.log.memOnly", "true");
- _jeProperties.put("je.maxMemoryPercent", "5");
+
_path = TestFileUtils.createTestDirectory(".je.test", true);
// make mock object implementing FileBasedSettings
@@ -84,27 +80,19 @@ public class StandardEnvironmentFacadeFa
public void testCreateEnvironmentFacade()
{
when(_parent.getName()).thenReturn(getTestName());
- when(_parent.getContextKeys(any(boolean.class))).thenReturn(_jeProperties.keySet());
- for (String key : _jeProperties.keySet())
- {
- when(_parent.getContextValue(String.class, key)).thenReturn(_jeProperties.get(key));
- }
+ when(_parent.getContextKeys(any(boolean.class))).thenReturn(Collections.singleton(EnvironmentConfig.ENV_IS_TRANSACTIONAL));
+ when(_parent.getContextValue(String.class, EnvironmentConfig.ENV_IS_TRANSACTIONAL)).thenReturn("false");
StandardEnvironmentFacadeFactory factory = new StandardEnvironmentFacadeFactory();
EnvironmentFacade facade = factory.createEnvironmentFacade(_parent);
try
{
- assertNotNull("Facade should not be null", facade);
- Environment environment = facade.getEnvironment();
- for (String key : _jeProperties.keySet())
- {
- when(_parent.getContextValue(String.class, key)).thenReturn(_jeProperties.get(key));
- assertEquals("Unexpected environment setting", _jeProperties.get(key), environment.getConfig().getConfigParam(key));
- }
+ facade.beginTransaction(null);
+ fail("Context variables were not picked up on environment creation");
}
- finally
+ catch(UnsupportedOperationException e)
{
- facade.getEnvironment().close();
+ //pass
}
}
Modified: qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java (original)
+++ qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java Tue Aug 25 19:48:16 2015
@@ -27,6 +27,8 @@ import java.io.File;
import java.util.Collections;
import java.util.Map;
+import com.sleepycat.je.Transaction;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -65,14 +67,6 @@ public class StandardEnvironmentFacadeTe
}
}
- public void testEnvironmentFacade() throws Exception
- {
- EnvironmentFacade ef = createEnvironmentFacade();
- assertNotNull("Environment should not be null", ef);
- Environment e = ef.getEnvironment();
- assertTrue("Environment is not valid", e.isValid());
- }
-
public void testSecondEnvironmentFacadeUsingSamePathRejected() throws Exception
{
EnvironmentFacade ef = createEnvironmentFacade();
@@ -97,21 +91,27 @@ public class StandardEnvironmentFacadeTe
{
EnvironmentFacade ef = createEnvironmentFacade();
ef.close();
- Environment e = ef.getEnvironment();
-
- assertNull("Environment should be null after facade close", e);
}
public void testOverrideJeParameter() throws Exception
{
- String statCollectVarName = EnvironmentConfig.STATS_COLLECT;
-
+ // verify that transactions can be created by default
EnvironmentFacade ef = createEnvironmentFacade();
- assertEquals("false", ef.getEnvironment().getMutableConfig().getConfigParam(statCollectVarName));
+ Transaction t = ef.beginTransaction(null);
+ t.commit();
ef.close();
- ef = createEnvironmentFacade(Collections.singletonMap(statCollectVarName, "true"));
- assertEquals("true", ef.getEnvironment().getMutableConfig().getConfigParam(statCollectVarName));
+ // customize the environment to be non-transactional
+ ef = createEnvironmentFacade(Collections.singletonMap(EnvironmentConfig.ENV_IS_TRANSACTIONAL, "false"));
+ try
+ {
+ ef.beginTransaction(null);
+ fail("Overridden settings were not picked up on environment creation");
+ }
+ catch(UnsupportedOperationException e)
+ {
+ // pass
+ }
ef.close();
}
Modified: qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java Tue Aug 25 19:48:16 2015
@@ -48,9 +48,9 @@ import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Durability;
-import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.ReplicaWriteException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
@@ -126,21 +126,12 @@ public class ReplicatedEnvironmentFacade
_portHelper.waitUntilAllocatedPortsAreFree();
}
- public void testEnvironmentFacade() throws Exception
- {
- EnvironmentFacade ef = createMaster();
- assertNotNull("Environment should not be null", ef);
- Environment e = ef.getEnvironment();
- assertTrue("Environment is not valid", e.isValid());
- }
public void testClose() throws Exception
{
- EnvironmentFacade ef = createMaster();
+ ReplicatedEnvironmentFacade ef = createMaster();
ef.close();
- Environment e = ef.getEnvironment();
-
- assertNull("Environment should be null after facade close", e);
+ assertEquals("Unexpected state after close", ReplicatedEnvironmentFacade.State.CLOSED, ef.getFacadeState());
}
public void testOpenDatabaseReusesCachedHandle() throws Exception
@@ -675,7 +666,9 @@ public class ReplicatedEnvironmentFacade
Transaction txn = null;
try
{
- txn = facade.beginTransaction();
+ TransactionConfig transactionConfig = new TransactionConfig();
+ transactionConfig.setDurability(facade.getRealMessageStoreDurability());
+ txn = facade.beginTransaction(transactionConfig);
assertNotNull("Transaction is not created", txn);
txn.commit();
txn = null;
@@ -771,9 +764,7 @@ public class ReplicatedEnvironmentFacade
node1.setDesignatedPrimary(true);
- Transaction txn = node1.beginTransaction();
- Database db = node1.getEnvironment().openDatabase(txn, "mydb", createConfig);
- txn.commit();
+ Database db = node1.openDatabase("mydb", createConfig);
// Put a record (that will be replicated)
putRecord(node1, db, 1, "value1");
@@ -788,6 +779,7 @@ public class ReplicatedEnvironmentFacade
// Stop node1
node1.close();
+ LOGGER.debug("RESTARTING " + replicaName);
// Restart the node2, making it primary so it becomes master
TestStateChangeListener node2StateChangeListener = new TestStateChangeListener(State.MASTER);
node2 = addNode(replicaName, replicaNodeHostPort, true, node2StateChangeListener, new NoopReplicationGroupListener());
@@ -795,13 +787,12 @@ public class ReplicatedEnvironmentFacade
assertTrue(replicaName + " did not go into desired state; current actual state is "
+ node2StateChangeListener.getCurrentActualState(), awaitForStateChange);
- txn = node2.beginTransaction();
- db = node2.getEnvironment().openDatabase(txn, "mydb", DatabaseConfig.DEFAULT);
- txn.commit();
+ db = node2.openDatabase("mydb", DatabaseConfig.DEFAULT);
// Do a transaction on node2. The two environments will have diverged
putRecord(node2, db, 3, "diverged");
+ LOGGER.debug("RESTARTING " + TEST_NODE_NAME);
// Now restart node1 and ensure that it realises it needs to rollback before it can rejoin.
TestStateChangeListener node1StateChangeListener = new TestStateChangeListener(State.REPLICA);
final CountDownLatch _replicaRolledback = new CountDownLatch(1);
@@ -810,10 +801,11 @@ public class ReplicatedEnvironmentFacade
@Override
public void onNodeRolledback()
{
+ LOGGER.debug("onNodeRolledback in " + TEST_NODE_NAME);
_replicaRolledback.countDown();
}
});
- assertTrue("Node 1 did not go into desired state and remained in state " + node1.getNodeState(),
+ assertTrue("Node 1 did not go into desired state",
node1StateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
assertTrue("Node 1 did not experience rollback within timeout",
_replicaRolledback.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
@@ -822,6 +814,7 @@ public class ReplicatedEnvironmentFacade
putRecord(node2, db, 4, "value4");
db.close();
+ LOGGER.debug("CLOSING");
node1.close();
node2.close();
}
@@ -850,7 +843,7 @@ public class ReplicatedEnvironmentFacade
@Override
public Transaction call() throws Exception
{
- return replica.getEnvironment().beginTransaction(null, null);
+ return replica.beginTransaction(null);
}
});
Transaction transaction = future.get(5, TimeUnit.SECONDS);
@@ -894,7 +887,9 @@ public class ReplicatedEnvironmentFacade
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry data = new DatabaseEntry();
- Transaction txn = master.beginTransaction();
+ TransactionConfig transactionConfig = new TransactionConfig();
+ transactionConfig.setDurability(master.getRealMessageStoreDurability());
+ Transaction txn = master.beginTransaction(transactionConfig);
IntegerBinding.intToEntry(keyValue, key);
StringBinding.stringToEntry(dataValue, data);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org