You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/08/25 21:48:16 UTC

svn commit: r1697761 - in /qpid/java/trunk/bdbstore/src: main/java/org/apache/qpid/server/store/berkeleydb/ main/java/org/apache/qpid/server/store/berkeleydb/replication/ main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/ test/java/org/apache...

Author: orudyy
Date: Tue Aug 25 19:48:16 2015
New Revision: 1697761

URL: http://svn.apache.org/r1697761
Log:
QPID-6710: [Java Broker] Propagate coalescing committer exceptions occuring on flushing data to disk into committing threads
                         Refactor EnvironmentFacade implementations to encapsulate underlying Environment
                         (work by Lorenz Quack <qu...@gmail.com> and Alex Rudyy <or...@apache.org>)

Removed:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
    qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
    qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactoryTest.java
    qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
    qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Tue Aug 25 19:48:16 2015
@@ -109,15 +109,15 @@ public abstract class AbstractBDBMessage
     {
         try
         {
-            new Upgrader(getEnvironmentFacade().getEnvironment(), getParent()).upgradeIfNecessary();
+            getEnvironmentFacade().upgradeIfNecessary(getParent());
+
+            // TODO this relies on the fact that the VH will call upgrade just before putting the VH into service.
+            _totalStoreSize = getSizeOnDisk();
         }
         catch(RuntimeException e)
         {
             throw getEnvironmentFacade().handleDatabaseException("Cannot upgrade store", e);
         }
-
-        // TODO this relies on the fact that the VH will call upgrade just before putting the VH into service.
-        _totalStoreSize = getSizeOnDisk();
     }
 
     @Override
@@ -228,7 +228,7 @@ public abstract class AbstractBDBMessage
                 tx = null;
                 try
                 {
-                    tx = getEnvironmentFacade().getEnvironment().beginTransaction(null, null);
+                    tx = getEnvironmentFacade().beginTransaction(null);
 
                     //remove the message meta data from the store
                     DatabaseEntry key = new DatabaseEntry();
@@ -865,12 +865,12 @@ public abstract class AbstractBDBMessage
 
     private void reduceSizeOnDisk()
     {
-        BDBUtils.runCleaner(getEnvironmentFacade().getEnvironment());
+        getEnvironmentFacade().reduceSizeOnDisk();
     }
 
     private long getSizeOnDisk()
     {
-        return getEnvironmentFacade().getEnvironment().getStats(null).getTotalLogSize();
+        return getEnvironmentFacade().getTotalLogSize();
     }
 
     private Database getMessageContentDb()
@@ -1225,8 +1225,7 @@ public abstract class AbstractBDBMessage
                 Transaction txn;
                 try
                 {
-                    txn = getEnvironmentFacade().getEnvironment().beginTransaction(
-                            null, null);
+                    txn = getEnvironmentFacade().beginTransaction(null);
                 }
                 catch (RuntimeException e)
                 {
@@ -1304,7 +1303,7 @@ public abstract class AbstractBDBMessage
         {
             try
             {
-                _txn = getEnvironmentFacade().getEnvironment().beginTransaction(null, null);
+                _txn = getEnvironmentFacade().beginTransaction(null);
             }
             catch(RuntimeException e)
             {

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java Tue Aug 25 19:48:16 2015
@@ -128,7 +128,7 @@ public class BDBConfigurationStore imple
     {
         try
         {
-            new Upgrader(_environmentFacade.getEnvironment(), _parent).upgradeIfNecessary();
+            _environmentFacade.upgradeIfNecessary(_parent);
             if(_overwrite)
             {
                 clearConfigurationRecords();
@@ -263,7 +263,7 @@ public class BDBConfigurationStore imple
         com.sleepycat.je.Transaction txn = null;
         try
         {
-            txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+            txn = _environmentFacade.beginTransaction(null);
             storeConfiguredObjectEntry(txn, configuredObject);
             txn.commit();
             txn = null;
@@ -290,7 +290,7 @@ public class BDBConfigurationStore imple
         com.sleepycat.je.Transaction txn = null;
         try
         {
-            txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+            txn = _environmentFacade.beginTransaction(null);
 
             Collection<UUID> removed = new ArrayList<UUID>(objects.length);
             for(ConfiguredObjectRecord record : objects)
@@ -326,7 +326,7 @@ public class BDBConfigurationStore imple
         com.sleepycat.je.Transaction txn = null;
         try
         {
-            txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+            txn = _environmentFacade.beginTransaction(null);
             for(ConfiguredObjectRecord record : records)
             {
                 update(createIfNecessary, record, txn);

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Tue Aug 25 19:48:16 2015
@@ -93,7 +93,7 @@ public class BDBMessageStore extends Abs
                     _environmentFacade.close();
                     _environmentFacade = null;
                 }
-                catch (DatabaseException e)
+                catch (RuntimeException e)
                 {
                     throw new StoreException("Exception occurred on message store close", e);
                 }

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java Tue Aug 25 19:48:16 2015
@@ -76,6 +76,11 @@ public class BDBUtils
 
     public synchronized static void runCleaner(final Environment environment)
     {
+        if (environment == null || !environment.isValid())
+        {
+            return;
+        }
+
         boolean cleanerWasRunning = Boolean.parseBoolean(environment.getConfig().getConfigParam(EnvironmentConfig.ENV_RUN_CLEANER));
 
         try

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java Tue Aug 25 19:48:16 2015
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.Atomi
 import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.Environment;
 import com.sleepycat.je.Transaction;
+import org.apache.qpid.server.store.StoreException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -124,12 +125,6 @@ public class CoalescingCommiter implemen
             }
 
             waitForCompletion();
-
-            if (_databaseException != null)
-            {
-                throw _databaseException;
-            }
-
         }
 
         public synchronized boolean isComplete()
@@ -163,6 +158,11 @@ public class CoalescingCommiter implemen
                 long duration = System.currentTimeMillis() - startTime;
                 LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
             }
+
+            if (_databaseException != null)
+            {
+                throw _databaseException;
+            }
         }
 
         public synchronized void waitForCompletion(long timeout) throws TimeoutException
@@ -197,6 +197,11 @@ public class CoalescingCommiter implemen
                 long duration = System.currentTimeMillis() - startTime;
                 LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
             }
+
+            if (_databaseException != null)
+            {
+                throw _databaseException;
+            }
         }
     }
 
@@ -266,11 +271,7 @@ public class CoalescingCommiter implemen
                     startTime = System.currentTimeMillis();
                 }
 
-                Environment environment = _environmentFacade.getEnvironment();
-                if (environment != null && environment.isValid())
-                {
-                    environment.flushLog(true);
-                }
+                _environmentFacade.flushLog();
 
                 if(LOGGER.isDebugEnabled())
                 {
@@ -289,7 +290,7 @@ public class CoalescingCommiter implemen
                 }
 
             }
-            catch (DatabaseException e)
+            catch (RuntimeException e)
             {
                 try
                 {
@@ -313,7 +314,7 @@ public class CoalescingCommiter implemen
                     {
                         _environmentFacade.close();
                     }
-                    catch (DatabaseException ex)
+                    catch (Exception ex)
                     {
                         LOGGER.error("Exception closing store environment", ex);
                     }
@@ -344,22 +345,22 @@ public class CoalescingCommiter implemen
 
         public void close()
         {
-            RuntimeException e = new RuntimeException("Commit thread has been closed, transaction aborted");
             synchronized (_lock)
             {
                 _stopped.set(true);
-                Environment environment = _environmentFacade.getEnvironment();
                 BDBCommitFutureResult commit;
-                if (environment != null && environment.isValid())
+
+                try
                 {
-                    environment.flushLog(true);
+                    _environmentFacade.flushLog();
                     while ((commit = _jobQueue.poll()) != null)
                     {
                         commit.complete();
                     }
                 }
-                else
+                catch(RuntimeException flushException)
                 {
+                    RuntimeException e = new RuntimeException("Commit thread has been closed, transaction aborted");
                     int abortedCommits = 0;
                     while ((commit = _jobQueue.poll()) != null)
                     {

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java Tue Aug 25 19:48:16 2015
@@ -33,6 +33,8 @@ import com.sleepycat.je.Sequence;
 import com.sleepycat.je.SequenceConfig;
 import com.sleepycat.je.Transaction;
 
+import com.sleepycat.je.TransactionConfig;
+import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.util.FutureResult;
 
 public interface EnvironmentFacade
@@ -45,22 +47,25 @@ public interface EnvironmentFacade
         put(EnvironmentConfig.STATS_COLLECT, "false");
     }});
 
-    Environment getEnvironment();
+    void upgradeIfNecessary(ConfiguredObject<?> parent);
 
     Database openDatabase(String databaseName, DatabaseConfig databaseConfig);
     Database clearDatabase(String databaseName, DatabaseConfig databaseConfig);
 
     Sequence openSequence(Database database, DatabaseEntry sequenceKey, SequenceConfig sequenceConfig);
 
-    Transaction beginTransaction();
+    Transaction beginTransaction(TransactionConfig transactionConfig);
 
     FutureResult commit(com.sleepycat.je.Transaction tx, boolean sync);
 
     RuntimeException handleDatabaseException(String contextMessage, RuntimeException e);
 
-    String getStoreLocation();
-
     void closeDatabase(String name);
     void close();
 
+    long getTotalLogSize();
+
+    void reduceSizeOnDisk();
+
+    void flushLog();
 }

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Tue Aug 25 19:48:16 2015
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
@@ -35,6 +36,9 @@ import com.sleepycat.je.EnvironmentConfi
 import com.sleepycat.je.Sequence;
 import com.sleepycat.je.SequenceConfig;
 import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,8 +53,8 @@ public class StandardEnvironmentFacade i
     private final String _storePath;
     private final ConcurrentMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
     private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
+    private final AtomicReference<Environment> _environment;
 
-    private Environment _environment;
     private final Committer _committer;
     private final File _environmentPath;
 
@@ -105,7 +109,7 @@ public class StandardEnvironmentFacade i
         boolean success = false;
         try
         {
-            _environment = new Environment(_environmentPath, envConfig);
+            _environment = new AtomicReference<>(new Environment(_environmentPath, envConfig));
             success = true;
         }
         finally
@@ -122,9 +126,9 @@ public class StandardEnvironmentFacade i
 
 
     @Override
-    public Transaction beginTransaction()
+    public Transaction beginTransaction(TransactionConfig transactionConfig)
     {
-        return _environment.beginTransaction(null, null);
+        return getEnvironment().beginTransaction(null, transactionConfig);
     }
 
     @Override
@@ -168,6 +172,31 @@ public class StandardEnvironmentFacade i
         }
     }
 
+    @Override
+    public long getTotalLogSize()
+    {
+        return getEnvironment().getStats(null).getTotalLogSize();
+    }
+
+    @Override
+    public void reduceSizeOnDisk()
+    {
+        BDBUtils.runCleaner(getEnvironment());
+    }
+
+    @Override
+    public void flushLog()
+    {
+        try
+        {
+            getEnvironment().flushLog(true);
+        }
+        catch (RuntimeException e)
+        {
+            throw handleDatabaseException("Exception whilst syncing data to disk", e);
+        }
+    }
+
     private void closeSequences()
     {
         RuntimeException firstThrownException = null;
@@ -216,9 +245,10 @@ public class StandardEnvironmentFacade i
 
     private void closeEnvironmentSafely()
     {
-        if (_environment != null)
+        Environment environment = _environment.getAndSet(null);
+        if (environment != null)
         {
-            if (_environment.isValid())
+            if (environment.isValid())
             {
                 try
                 {
@@ -231,7 +261,7 @@ public class StandardEnvironmentFacade i
             }
             try
             {
-                _environment.close();
+                environment.close();
             }
             catch (DatabaseException ex)
             {
@@ -241,34 +271,45 @@ public class StandardEnvironmentFacade i
             {
                 LOGGER.error("Exception closing store environment", ex);
             }
-            finally
-            {
-                _environment = null;
-            }
         }
     }
 
+    private Environment getEnvironment()
+    {
+        final Environment environment = _environment.get();
+        if (environment == null)
+        {
+            throw new IllegalStateException("Environment is null.");
+        }
+        else if (!environment.isValid())
+        {
+            throw new IllegalStateException("Environment is invalid.");
+        }
+        return environment;
+    }
+
     @Override
-    public Environment getEnvironment()
+    public void upgradeIfNecessary(ConfiguredObject<?> parent)
     {
-        return _environment;
+        Upgrader upgrader = new Upgrader(getEnvironment(), parent);
+        upgrader.upgradeIfNecessary();
     }
 
     private void closeEnvironment()
     {
-        if (_environment != null)
+        Environment environment = _environment.getAndSet(null);
+        if (environment != null)
         {
             // Clean the log before closing. This makes sure it doesn't contain
             // redundant data. Closing without doing this means the cleaner may
             // not get a chance to finish.
             try
             {
-                BDBUtils.runCleaner(_environment);
+                BDBUtils.runCleaner(environment);
             }
             finally
             {
-                _environment.close();
-                _environment = null;
+                environment.close();
             }
         }
     }
@@ -276,7 +317,8 @@ public class StandardEnvironmentFacade i
     @Override
     public RuntimeException handleDatabaseException(String contextMessage, RuntimeException e)
     {
-        if (_environment != null && !_environment.isValid())
+        Environment environment = _environment.get();
+        if (environment != null && !environment.isValid())
         {
             closeEnvironmentSafely();
         }
@@ -293,7 +335,7 @@ public class StandardEnvironmentFacade i
         Database cachedHandle = _cachedDatabases.get(name);
         if (cachedHandle == null)
         {
-            Database handle = _environment.openDatabase(null, name, databaseConfig);
+            Database handle = getEnvironment().openDatabase(null, name, databaseConfig);
             Database existingHandle = _cachedDatabases.putIfAbsent(name, handle);
             if (existingHandle == null)
             {
@@ -313,7 +355,7 @@ public class StandardEnvironmentFacade i
     public Database clearDatabase(String name, DatabaseConfig databaseConfig)
     {
         closeDatabase(name);
-        _environment.removeDatabase(null, name);
+        getEnvironment().removeDatabase(null, name);
         return openDatabase(name, databaseConfig);
     }
 
@@ -359,10 +401,4 @@ public class StandardEnvironmentFacade i
             cachedHandle.close();
         }
     }
-
-    @Override
-    public String getStoreLocation()
-    {
-        return _storePath;
-    }
 }

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java Tue Aug 25 19:48:16 2015
@@ -72,7 +72,7 @@ public class DatabasePinger
             Transaction txn = null;
             try
             {
-                txn = facade.getEnvironment().beginTransaction(null, _pingTransactionConfig);
+                txn = facade.beginTransaction(_pingTransactionConfig);
                 db.put(txn, key, value);
                 txn.commit();
 

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Tue Aug 25 19:48:16 2015
@@ -69,6 +69,8 @@ import com.sleepycat.je.rep.utilint.Serv
 import com.sleepycat.je.rep.vlsn.VLSNRange;
 import com.sleepycat.je.utilint.PropUtil;
 import com.sleepycat.je.utilint.VLSN;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -191,12 +193,12 @@ public class ReplicatedEnvironmentFacade
     private final Durability _defaultDurability;
     private final ConcurrentMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
     private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
-    private final Set<String> _permittedNodes = new CopyOnWriteArraySet<String>();
+    private final AtomicReference<ReplicatedEnvironment> _environment = new AtomicReference<>();
 
+    private final Set<String> _permittedNodes = new CopyOnWriteArraySet<String>();
     private volatile Durability _realMessageStoreDurability = null;
     private volatile Durability _messageStoreDurability;
     private volatile CoalescingCommiter _coalescingCommiter = null;
-    private volatile ReplicatedEnvironment _environment;
     private volatile long _joinTime;
     private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
     private volatile long _envSetupTimeoutMillis;
@@ -240,7 +242,14 @@ public class ReplicatedEnvironmentFacade
         boolean success = false;
         try
         {
-            createEnvironment(true);
+            createEnvironment(true, new Runnable(){
+                @Override
+                public void run()
+                {
+                    populateExistingRemoteReplicationNodes();
+                    _groupChangeExecutor.submit(new RemoteNodeStateLearner());
+                }
+            });
             success = true;
         }
         finally
@@ -250,28 +259,12 @@ public class ReplicatedEnvironmentFacade
                 EnvHomeRegistry.getInstance().deregisterHome(_environmentDirectory);
             }
         }
-        populateExistingRemoteReplicationNodes();
-        _groupChangeExecutor.submit(new RemoteNodeStateLearner());
     }
 
     @Override
-    public Transaction beginTransaction()
+    public Transaction beginTransaction(TransactionConfig transactionConfig)
     {
-        if (_messageStoreDurability == null)
-        {
-            throw new IllegalStateException("Message store durability is not set");
-        }
-
-        try
-        {
-            TransactionConfig transactionConfig = new TransactionConfig();
-            transactionConfig.setDurability(getRealMessageStoreDurability());
-            return _environment.beginTransaction(null, transactionConfig);
-        }
-        catch(DatabaseException e)
-        {
-            throw handleDatabaseException("Failure to start transaction", e);
-        }
+        return getEnvironment().beginTransaction(null, transactionConfig);
     }
 
     @Override
@@ -496,15 +489,12 @@ public class ReplicatedEnvironmentFacade
             throw new ConnectionScopedRuntimeException("Environment facade is not in opened state");
         }
 
-        if (!_environment.isValid())
-        {
-            throw new ConnectionScopedRuntimeException("Environment is not valid");
-        }
+        ReplicatedEnvironment environment = getEnvironment();
 
         Database cachedHandle = _cachedDatabases.get(name);
         if (cachedHandle == null)
         {
-            Database handle = _environment.openDatabase(null, name, databaseConfig);
+            Database handle = environment.openDatabase(null, name, databaseConfig);
             Database existingHandle = _cachedDatabases.putIfAbsent(name, handle);
             if (existingHandle == null)
             {
@@ -533,7 +523,7 @@ public class ReplicatedEnvironmentFacade
     public Database clearDatabase(String name, DatabaseConfig databaseConfig)
     {
         closeDatabase(name);
-        _environment.removeDatabase(null, name);
+        getEnvironment().removeDatabase(null, name);
         return openDatabase(name, databaseConfig);
     }
 
@@ -587,12 +577,6 @@ public class ReplicatedEnvironmentFacade
     }
 
     @Override
-    public String getStoreLocation()
-    {
-        return _environmentDirectory.getAbsolutePath();
-    }
-
-    @Override
     public void stateChange(final StateChangeEvent stateChangeEvent)
     {
         if (LOGGER.isInfoEnabled())
@@ -623,6 +607,36 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
+    @Override
+    public long getTotalLogSize()
+    {
+        return getEnvironment().getStats(null).getTotalLogSize();
+    }
+
+    @Override
+    public void reduceSizeOnDisk()
+    {
+        BDBUtils.runCleaner(getEnvironment());
+    }
+
+    @Override
+    public void flushLog()
+    {
+        try
+        {
+            getEnvironment().flushLog(true);
+        }
+        catch (RuntimeException e)
+        {
+            throw handleDatabaseException("Exception whilst syncing data to disk", e);
+        }
+    }
+
+    public Set<ReplicationNode> getNodes()
+    {
+        return getEnvironment().getGroup().getNodes();
+    }
+
     private void stateChanged(StateChangeEvent stateChangeEvent)
     {
         if (LOGGER.isDebugEnabled())
@@ -697,7 +711,7 @@ public class ReplicatedEnvironmentFacade
         {
             return ReplicatedEnvironment.State.UNKNOWN.name();
         }
-        ReplicatedEnvironment.State state = _environment.getState();
+        ReplicatedEnvironment.State state = getEnvironment().getState();
         return state.toString();
     }
 
@@ -707,7 +721,7 @@ public class ReplicatedEnvironmentFacade
         {
             throw new IllegalStateException("Environment facade is not opened");
         }
-        return _environment.getRepMutableConfig().getDesignatedPrimary();
+        return getEnvironment().getRepMutableConfig().getDesignatedPrimary();
     }
 
     public Future<Void> setDesignatedPrimary(final boolean isPrimary)
@@ -732,9 +746,10 @@ public class ReplicatedEnvironmentFacade
     {
         try
         {
-            final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+            ReplicatedEnvironment environment = getEnvironment();
+            final ReplicationMutableConfig oldConfig = environment.getRepMutableConfig();
             final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary);
-            _environment.setRepMutableConfig(newConfig);
+            environment.setRepMutableConfig(newConfig);
 
             if (LOGGER.isInfoEnabled())
             {
@@ -743,7 +758,7 @@ public class ReplicatedEnvironmentFacade
         }
         catch (Exception e)
         {
-            LOGGER.error("Cannot set designated primary to " + isPrimary + " on node " + _prettyGroupNodeName, e);
+            throw new ConnectionScopedRuntimeException("Cannot set designated primary to " + isPrimary + " on node " + _prettyGroupNodeName, e);
         }
     }
 
@@ -753,7 +768,7 @@ public class ReplicatedEnvironmentFacade
         {
             throw new IllegalStateException("Environment facade is not opened");
         }
-        ReplicationMutableConfig repConfig = _environment.getRepMutableConfig();
+        ReplicationMutableConfig repConfig = getEnvironment().getRepMutableConfig();
         return repConfig.getNodePriority();
     }
 
@@ -779,9 +794,10 @@ public class ReplicatedEnvironmentFacade
     {
         try
         {
-            final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+            final ReplicatedEnvironment environment = getEnvironment();
+            final ReplicationMutableConfig oldConfig = environment.getRepMutableConfig();
             final ReplicationMutableConfig newConfig = oldConfig.setNodePriority(priority);
-            _environment.setRepMutableConfig(newConfig);
+            environment.setRepMutableConfig(newConfig);
 
             if (LOGGER.isDebugEnabled())
             {
@@ -790,7 +806,7 @@ public class ReplicatedEnvironmentFacade
         }
         catch (Exception e)
         {
-            LOGGER.error("Cannot set priority to " + priority + " on node " + _prettyGroupNodeName, e);
+            throw new ConnectionScopedRuntimeException("Cannot set priority to " + priority + " on node " + _prettyGroupNodeName, e);
         }
     }
 
@@ -800,7 +816,7 @@ public class ReplicatedEnvironmentFacade
         {
             throw new IllegalStateException("Environment facade is not opened");
         }
-        ReplicationMutableConfig repConfig = _environment.getRepMutableConfig();
+        ReplicationMutableConfig repConfig = getEnvironment().getRepMutableConfig();
         return repConfig.getElectableGroupSizeOverride();
     }
 
@@ -826,9 +842,10 @@ public class ReplicatedEnvironmentFacade
     {
         try
         {
-            final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+            final ReplicatedEnvironment environment = getEnvironment();
+            final ReplicationMutableConfig oldConfig = environment.getRepMutableConfig();
             final ReplicationMutableConfig newConfig = oldConfig.setElectableGroupSizeOverride(electableGroupOverride);
-            _environment.setRepMutableConfig(newConfig);
+            environment.setRepMutableConfig(newConfig);
 
             if (LOGGER.isDebugEnabled())
             {
@@ -837,7 +854,7 @@ public class ReplicatedEnvironmentFacade
         }
         catch (Exception e)
         {
-            LOGGER.error("Cannot set electable group size to " + electableGroupOverride + " on node " + _prettyGroupNodeName, e);
+            throw new ConnectionScopedRuntimeException("Cannot set electable group size to " + electableGroupOverride + " on node " + _prettyGroupNodeName, e);
         }
     }
 
@@ -849,6 +866,7 @@ public class ReplicatedEnvironmentFacade
 
     public Future<Void> transferMasterAsynchronously(final String nodeName)
     {
+        // TODO: Should this be executed in the EnvironmentJobExecutor?
         return _groupChangeExecutor.submit(new Callable<Void>()
         {
             @Override
@@ -889,7 +907,7 @@ public class ReplicatedEnvironmentFacade
     {
         if (_state.get() == State.OPEN)
         {
-            VLSNRange range = RepInternal.getRepImpl(_environment).getVLSNIndex().getRange();
+            VLSNRange range = RepInternal.getRepImpl(getEnvironment()).getVLSNIndex().getRange();
             VLSN lastTxnEnd = range.getLastTxnEnd();
             return lastTxnEnd.getSequence();
         }
@@ -902,17 +920,34 @@ public class ReplicatedEnvironmentFacade
     private ReplicationGroupAdmin createReplicationGroupAdmin()
     {
         final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
-        helpers.addAll(_environment.getRepConfig().getHelperSockets());
+        final ReplicationConfig repConfig = getEnvironment().getRepConfig();
 
-        final ReplicationConfig repConfig = _environment.getRepConfig();
+        helpers.addAll(repConfig.getHelperSockets());
         helpers.add(HostPortPair.getSocket(HostPortPair.getString(repConfig.getNodeHostname(), repConfig.getNodePort())));
 
         return new ReplicationGroupAdmin(_configuration.getGroupName(), helpers);
     }
 
-    public ReplicatedEnvironment getEnvironment()
+    private ReplicatedEnvironment getEnvironment()
+    {
+        final ReplicatedEnvironment environment = _environment.get();
+        if (environment == null)
+        {
+            throw new IllegalStateException("Environment is null.");
+        }
+        else if (!environment.isValid())
+        {
+            throw new IllegalStateException("Environment is invalid.");
+        }
+        return environment;
+
+    }
+
+    @Override
+    public void upgradeIfNecessary(ConfiguredObject<?> parent)
     {
-        return _environment;
+        Upgrader upgrader = new Upgrader(getEnvironment(), parent);
+        upgrader.upgradeIfNecessary();
     }
 
     public State getFacadeState()
@@ -924,7 +959,11 @@ public class ReplicatedEnvironmentFacade
     {
         if (_stateChangeListener.compareAndSet(null, stateChangeListener))
         {
-            _environment.setStateChangeListener(this);
+            final ReplicatedEnvironment environment = _environment.get();
+            if (environment != null)
+            {
+                environment.setStateChangeListener(this);
+            }
         }
         else
         {
@@ -942,37 +981,37 @@ public class ReplicatedEnvironmentFacade
         // Clean the log before closing. This makes sure it doesn't contain
         // redundant data. Closing without doing this means the cleaner may not
         // get a chance to finish.
-        try
-        {
-            if (_environment.isValid())
-            {
-                BDBUtils.runCleaner(_environment);
-            }
-        }
-        finally
+        ReplicatedEnvironment environment = _environment.getAndSet(null);
+        if (environment != null)
         {
-            // Try closing the environment but swallow EnvironmentFailureException
-            // if the environment becomes invalid while closing.
-            // This can be caused by potential race between facade close and DatabasePinger open.
             try
             {
-                _environment.close();
+                if (environment.isValid())
+                {
+                    BDBUtils.runCleaner(environment);
+                }
             }
-            catch (EnvironmentFailureException efe)
+            finally
             {
-                if (!_environment.isValid())
+                // Try closing the environment but swallow EnvironmentFailureException
+                // if the environment becomes invalid while closing.
+                // This can be caused by potential race between facade close and DatabasePinger open.
+                try
                 {
-                    LOGGER.debug("Environment became invalid on close, so ignore", efe);
+                    environment.close();
                 }
-                else
+                catch (EnvironmentFailureException efe)
                 {
-                    throw efe;
+                    if (!environment.isValid())
+                    {
+                        LOGGER.debug("Environment became invalid on close, so ignore", efe);
+                    }
+                    else
+                    {
+                        throw efe;
+                    }
                 }
             }
-            finally
-            {
-                _environment = null;
-            }
         }
     }
 
@@ -997,11 +1036,11 @@ public class ReplicatedEnvironmentFacade
 
         closeEnvironmentOnRestart();
 
-        createEnvironment(false);
+        createEnvironment(false, null);
 
         if (stateChangeListener != null)
         {
-            _environment.setStateChangeListener(this);
+            getEnvironment().setStateChangeListener(this);
         }
 
         LOGGER.info("Environment is restarted");
@@ -1009,7 +1048,7 @@ public class ReplicatedEnvironmentFacade
 
     private void closeEnvironmentOnRestart()
     {
-        ReplicatedEnvironment environment = _environment;
+        ReplicatedEnvironment environment = _environment.getAndSet(null);
         if (environment != null)
         {
             try
@@ -1088,7 +1127,7 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    private void createEnvironment(boolean createEnvironmentInSeparateThread)
+    private void createEnvironment(boolean createEnvironmentInSeparateThread, Runnable postCreationAction)
     {
         String groupName = _configuration.getGroupName();
         String helperHostPort = _configuration.getHelperHostPort();
@@ -1157,24 +1196,26 @@ public class ReplicatedEnvironmentFacade
 
         if (createEnvironmentInSeparateThread)
         {
-            createEnvironmentInSeparateThread(_environmentDirectory, envConfig, replicationConfig);
+            createEnvironmentInSeparateThread(_environmentDirectory, envConfig, replicationConfig, postCreationAction);
         }
         else
         {
-            createEnvironment(_environmentDirectory, envConfig, replicationConfig);
+            createEnvironment(_environmentDirectory, envConfig, replicationConfig, postCreationAction);
         }
     }
 
     private void createEnvironmentInSeparateThread(final File environmentPathFile, final EnvironmentConfig envConfig,
-            final ReplicationConfig replicationConfig)
+                                                   final ReplicationConfig replicationConfig, final Runnable postCreationAction)
     {
-        Future<Void> environmentFuture = _environmentJobExecutor.submit(new Callable<Void>(){
+        Future<Void> environmentFuture = _environmentJobExecutor.submit(new Callable<Void>()
+        {
             @Override
             public Void call() throws Exception
             {
-                createEnvironment(environmentPathFile, envConfig, replicationConfig);
+                createEnvironment(environmentPathFile, envConfig, replicationConfig, postCreationAction);
                 return null;
-            }});
+            }
+        });
 
         final long setUpTimeOutMillis = extractEnvSetupTimeoutMillis(replicationConfig);
         final long initialTimeOutMillis = Math.max(setUpTimeOutMillis / 4, 1000);
@@ -1217,13 +1258,13 @@ public class ReplicatedEnvironmentFacade
     }
 
     private void createEnvironment(File environmentPathFile, EnvironmentConfig envConfig,
-            final ReplicationConfig replicationConfig)
+                                   final ReplicationConfig replicationConfig, Runnable action)
     {
         String originalThreadName = Thread.currentThread().getName();
         try
         {
             _envSetupTimeoutMillis = extractEnvSetupTimeoutMillis(replicationConfig);
-            _environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
+            _environment.set(new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig));
         }
         catch (final InsufficientLogException ile)
         {
@@ -1233,13 +1274,26 @@ public class ReplicatedEnvironmentFacade
             config.setRetainLogFiles(false);
             restore.execute(ile, config);
             LOGGER.warn("Network restore complete.");
-            _environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
+            _environment.set(new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig));
         }
         finally
         {
             Thread.currentThread().setName(originalThreadName);
         }
 
+        if (action != null)
+        {
+            action.run();
+        }
+
+        if (_stateChangeListener.get() != null)
+        {
+            final ReplicatedEnvironment environment = _environment.get();
+            if (environment != null)
+            {
+                environment.setStateChangeListener(this);
+            }
+        }
         if (LOGGER.isInfoEnabled())
         {
             LOGGER.info("Environment is created for node " + _prettyGroupNodeName);
@@ -1257,7 +1311,7 @@ public class ReplicatedEnvironmentFacade
         {
             throw new IllegalStateException("Environment facade is not opened");
         }
-        return _environment.getGroup().getElectableNodes().size();
+        return getEnvironment().getGroup().getElectableNodes().size();
     }
 
     public boolean isMaster()
@@ -1420,7 +1474,7 @@ public class ReplicatedEnvironmentFacade
         if (!permittedNodes.isEmpty())
         {
             byte[] data = permittedNodeListToBytes(permittedNodes);
-            _environment.registerAppStateMonitor(new EnvironmentStateHolder(data));
+            getEnvironment().registerAppStateMonitor(new EnvironmentStateHolder(data));
         }
     }
 
@@ -1477,19 +1531,27 @@ public class ReplicatedEnvironmentFacade
 
     private void populateExistingRemoteReplicationNodes()
     {
-        ReplicationGroup group = _environment.getGroup();
-        Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes());
-        String localNodeName = getNodeName();
-
-        for (ReplicationNode replicationNode : nodes)
+        try
         {
-            String discoveredNodeName = replicationNode.getName();
-            if (!discoveredNodeName.equals(localNodeName))
+            ReplicationGroup group = getEnvironment().getGroup();
+            Set<ReplicationNode> nodes = new HashSet<>(group.getElectableNodes());
+            String localNodeName = getNodeName();
+
+            for (ReplicationNode replicationNode : nodes)
             {
-               _remoteReplicationNodes.put(replicationNode.getName(), replicationNode);
+                String discoveredNodeName = replicationNode.getName();
+                if (!discoveredNodeName.equals(localNodeName))
+                {
+                    _remoteReplicationNodes.put(replicationNode.getName(), replicationNode);
+                }
             }
         }
-     }
+        catch (RuntimeException e)
+        {
+            // should never happen
+            handleDatabaseException("Exception on discovery of existing nodes", e);
+        }
+    }
 
     private void notifyExistingRemoteReplicationNodes(ReplicationGroupListener listener)
     {
@@ -1544,13 +1606,14 @@ public class ReplicatedEnvironmentFacade
                     {
                         continueMonitoring = detectGroupChangesAndNotify();
                     }
-                    catch(DatabaseException e)
+                    catch(RuntimeException e)
                     {
                         handleDatabaseException("Exception on replication group check", e);
                     }
 
                     if (continueMonitoring)
                     {
+                        // TODO: this code block does not seem to handle exceptions correctly.
                         boolean currentDesignatedPrimary = isDesignatedPrimary();
                         int currentElectableGroupSizeOverride = getElectableGroupSizeOverride();
 
@@ -1588,7 +1651,7 @@ public class ReplicatedEnvironmentFacade
             }
             boolean shouldContinue = true;
             String groupName = _configuration.getGroupName();
-            ReplicatedEnvironment env = _environment;
+            ReplicatedEnvironment env = _environment.get();
             ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get();
             if (env != null)
             {
@@ -1747,7 +1810,7 @@ public class ReplicatedEnvironmentFacade
                                                                 final boolean currentDesignatedPrimary,
                                                                 final int currentElectableGroupSizeOverride)
         {
-            if (ReplicatedEnvironment.State.MASTER == _environment.getState())
+            if (ReplicatedEnvironment.State.MASTER == getEnvironment().getState())
             {
                 Map<String, ReplicatedEnvironment.State> currentGroupState = new HashMap<>();
                 for (Map.Entry<ReplicationNode, NodeState> entry : nodeStates.entrySet())

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Tue Aug 25 19:48:16 2015
@@ -56,6 +56,7 @@ import com.sleepycat.je.rep.StateChangeE
 import com.sleepycat.je.rep.StateChangeListener;
 import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
 import com.sleepycat.je.rep.utilint.HostPortPair;
+import org.apache.qpid.server.store.StoreException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -349,7 +350,7 @@ public class BDBHAVirtualHostNodeImpl ex
 
         try
         {
-            Set<ReplicationNode> remoteNodes = environmentFacade.getEnvironment().getGroup().getNodes();
+            Set<ReplicationNode> remoteNodes = environmentFacade.getNodes();
             for (ReplicationNode node : remoteNodes)
             {
                 String nodeAddress = node.getHostName() + ":" + node.getPort();
@@ -423,7 +424,14 @@ public class BDBHAVirtualHostNodeImpl ex
         ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
         if (environmentFacade != null && _environmentFacade.compareAndSet(environmentFacade, null))
         {
-            environmentFacade.close();
+            try
+            {
+                environmentFacade.close();
+            }
+            catch (RuntimeException e)
+            {
+                throw new StoreException("Exception occurred on environment facade close", e);
+            }
         }
     }
 

Modified: qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java (original)
+++ qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java Tue Aug 25 19:48:16 2015
@@ -50,6 +50,7 @@ import org.apache.qpid.server.model.Stat
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.berkeleydb.replication.DatabasePinger;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
 import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost;
 import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
@@ -102,6 +103,17 @@ public class BDBHAVirtualHostNodeTest ex
         String repStreamTimeout = "2 h";
         Map<String,String> context = (Map<String,String>)attributes.get(BDBHAVirtualHostNode.CONTEXT);
         context.put(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout);
+        context.put(EnvironmentConfig.ENV_IS_TRANSACTIONAL, "false");
+        try
+        {
+            _helper.createHaVHN(attributes);
+            fail("Exception was not thrown.");
+        }
+        catch (RuntimeException e)
+        {
+            assertTrue("Unexpected Exception being thrown.", e.getCause() instanceof IllegalArgumentException);
+        }
+        context.put(EnvironmentConfig.ENV_IS_TRANSACTIONAL, "true");
         BDBHAVirtualHostNode<?> node = _helper.createHaVHN(attributes);
 
         node.start();
@@ -113,16 +125,14 @@ public class BDBHAVirtualHostNodeTest ex
         assertNotNull(store);
 
         BDBConfigurationStore bdbConfigurationStore = (BDBConfigurationStore) store;
-        ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbConfigurationStore.getEnvironmentFacade().getEnvironment();
-        ReplicationConfig replicationConfig = environment.getRepConfig();
+        ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) bdbConfigurationStore.getEnvironmentFacade();
 
-        assertEquals(nodeName, environment.getNodeName());
-        assertEquals(groupName, environment.getGroup().getName());
-        assertEquals(helperAddress, replicationConfig.getNodeHostPort());
-        assertEquals(helperAddress, replicationConfig.getHelperHosts());
+        assertEquals(nodeName, environmentFacade.getNodeName());
+        assertEquals(groupName, environmentFacade.getGroupName());
+        assertEquals(helperAddress, environmentFacade.getHostPort());
+        assertEquals(helperAddress, environmentFacade.getHelperHostPort());
 
-        assertEquals("SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString());
-        assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
+        assertEquals("SYNC,NO_SYNC,SIMPLE_MAJORITY", environmentFacade.getMessageStoreDurability().toString());
 
         _helper.awaitForVirtualhost(node, 30000);
         VirtualHost<?, ?, ?> virtualHost = node.getVirtualHost();
@@ -153,20 +163,17 @@ public class BDBHAVirtualHostNodeTest ex
         Map<String, Object> attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
         BDBHAVirtualHostNode<?> node = _helper.createAndStartHaVHN(attributes);
 
-        BDBConfigurationStore bdbConfigurationStore = (BDBConfigurationStore) node.getConfigurationStore();
-        ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbConfigurationStore.getEnvironmentFacade().getEnvironment();
-
-        assertEquals("Unexpected node priority value before mutation", 1, environment.getRepMutableConfig().getNodePriority());
-        assertFalse("Unexpected designated primary value before mutation", environment.getRepMutableConfig().getDesignatedPrimary());
-        assertEquals("Unexpected electable group override value before mutation", 0, environment.getRepMutableConfig().getElectableGroupSizeOverride());
+        assertEquals("Unexpected node priority value before mutation", 1, node.getPriority());
+        assertFalse("Unexpected designated primary value before mutation", node.isDesignatedPrimary());
+        assertEquals("Unexpected electable group override value before mutation", 0, node.getQuorumOverride());
 
         node.setAttribute(BDBHAVirtualHostNode.PRIORITY, 1, 2);
         node.setAttribute(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, false, true);
         node.setAttribute(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 0, 1);
 
-        assertEquals("Unexpected node priority value after mutation", 2, environment.getRepMutableConfig().getNodePriority());
-        assertTrue("Unexpected designated primary value after mutation", environment.getRepMutableConfig().getDesignatedPrimary());
-        assertEquals("Unexpected electable group override value after mutation", 1, environment.getRepMutableConfig().getElectableGroupSizeOverride());
+        assertEquals("Unexpected node priority value after mutation", 2, node.getPriority());
+        assertTrue("Unexpected designated primary value after mutation", node.isDesignatedPrimary());
+        assertEquals("Unexpected electable group override value after mutation", 1, node.getQuorumOverride());
 
         assertNotNull("Join time should be set", node.getJoinTime());
         assertNotNull("Last known replication transaction id should be set", node.getLastKnownReplicationTransactionId());

Modified: qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactoryTest.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactoryTest.java (original)
+++ qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactoryTest.java Tue Aug 25 19:48:16 2015
@@ -26,10 +26,9 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.withSettings;
 
 import java.io.File;
-import java.util.HashMap;
+import java.util.Collections;
 
-
-import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.store.FileBasedSettings;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -40,7 +39,6 @@ import org.mockito.stubbing.Answer;
 
 public class StandardEnvironmentFacadeFactoryTest extends QpidTestCase
 {
-    private HashMap<String, String> _jeProperties;
     private File _path;
     private ConfiguredObject<?> _parent;
 
@@ -48,9 +46,7 @@ public class StandardEnvironmentFacadeFa
     public void setUp()throws Exception
     {
         super.setUp();
-        _jeProperties=  new HashMap<>();
-        _jeProperties.put("je.log.memOnly", "true");
-        _jeProperties.put("je.maxMemoryPercent", "5");
+
         _path = TestFileUtils.createTestDirectory(".je.test", true);
 
         // make mock object implementing FileBasedSettings
@@ -84,27 +80,19 @@ public class StandardEnvironmentFacadeFa
     public void testCreateEnvironmentFacade()
     {
         when(_parent.getName()).thenReturn(getTestName());
-        when(_parent.getContextKeys(any(boolean.class))).thenReturn(_jeProperties.keySet());
-        for (String key : _jeProperties.keySet())
-        {
-            when(_parent.getContextValue(String.class, key)).thenReturn(_jeProperties.get(key));
-        }
+        when(_parent.getContextKeys(any(boolean.class))).thenReturn(Collections.singleton(EnvironmentConfig.ENV_IS_TRANSACTIONAL));
+        when(_parent.getContextValue(String.class, EnvironmentConfig.ENV_IS_TRANSACTIONAL)).thenReturn("false");
 
         StandardEnvironmentFacadeFactory factory = new StandardEnvironmentFacadeFactory();
         EnvironmentFacade facade = factory.createEnvironmentFacade(_parent);
         try
         {
-            assertNotNull("Facade should not be null", facade);
-            Environment environment = facade.getEnvironment();
-            for (String key : _jeProperties.keySet())
-            {
-                when(_parent.getContextValue(String.class, key)).thenReturn(_jeProperties.get(key));
-                assertEquals("Unexpected environment setting", _jeProperties.get(key), environment.getConfig().getConfigParam(key));
-            }
+            facade.beginTransaction(null);
+            fail("Context variables were not picked up on environment creation");
         }
-        finally
+        catch(UnsupportedOperationException e)
         {
-            facade.getEnvironment().close();
+            //pass
         }
     }
 

Modified: qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java (original)
+++ qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java Tue Aug 25 19:48:16 2015
@@ -27,6 +27,8 @@ import java.io.File;
 import java.util.Collections;
 import java.util.Map;
 
+import com.sleepycat.je.Transaction;
+import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.util.FileUtils;
 
@@ -65,14 +67,6 @@ public class StandardEnvironmentFacadeTe
         }
     }
 
-    public void testEnvironmentFacade() throws Exception
-    {
-        EnvironmentFacade ef = createEnvironmentFacade();
-        assertNotNull("Environment should not be null", ef);
-        Environment e = ef.getEnvironment();
-        assertTrue("Environment is not valid", e.isValid());
-    }
-
     public void testSecondEnvironmentFacadeUsingSamePathRejected() throws Exception
     {
         EnvironmentFacade ef = createEnvironmentFacade();
@@ -97,21 +91,27 @@ public class StandardEnvironmentFacadeTe
     {
         EnvironmentFacade ef = createEnvironmentFacade();
         ef.close();
-        Environment e = ef.getEnvironment();
-
-        assertNull("Environment should be null after facade close", e);
     }
 
     public void testOverrideJeParameter() throws Exception
     {
-        String statCollectVarName = EnvironmentConfig.STATS_COLLECT;
-
+        // verify that transactions can be created by default
         EnvironmentFacade ef = createEnvironmentFacade();
-        assertEquals("false", ef.getEnvironment().getMutableConfig().getConfigParam(statCollectVarName));
+        Transaction t = ef.beginTransaction(null);
+        t.commit();
         ef.close();
 
-        ef = createEnvironmentFacade(Collections.singletonMap(statCollectVarName, "true"));
-        assertEquals("true", ef.getEnvironment().getMutableConfig().getConfigParam(statCollectVarName));
+        // customize the environment to be non-transactional
+        ef = createEnvironmentFacade(Collections.singletonMap(EnvironmentConfig.ENV_IS_TRANSACTIONAL, "false"));
+        try
+        {
+            ef.beginTransaction(null);
+            fail("Overridden settings were not picked up on environment creation");
+        }
+        catch(UnsupportedOperationException e)
+        {
+            // pass
+        }
         ef.close();
     }
 

Modified: qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1697761&r1=1697760&r2=1697761&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java Tue Aug 25 19:48:16 2015
@@ -48,9 +48,9 @@ import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
 import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.Durability;
-import com.sleepycat.je.Environment;
 import com.sleepycat.je.EnvironmentConfig;
 import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
 import com.sleepycat.je.rep.NodeState;
 import com.sleepycat.je.rep.ReplicaWriteException;
 import com.sleepycat.je.rep.ReplicatedEnvironment;
@@ -126,21 +126,12 @@ public class ReplicatedEnvironmentFacade
 
         _portHelper.waitUntilAllocatedPortsAreFree();
     }
-    public void testEnvironmentFacade() throws Exception
-    {
-        EnvironmentFacade ef = createMaster();
-        assertNotNull("Environment should not be null", ef);
-        Environment e = ef.getEnvironment();
-        assertTrue("Environment is not valid", e.isValid());
-    }
 
     public void testClose() throws Exception
     {
-        EnvironmentFacade ef = createMaster();
+        ReplicatedEnvironmentFacade ef = createMaster();
         ef.close();
-        Environment e = ef.getEnvironment();
-
-        assertNull("Environment should be null after facade close", e);
+        assertEquals("Unexpected state after close", ReplicatedEnvironmentFacade.State.CLOSED, ef.getFacadeState());
     }
 
     public void testOpenDatabaseReusesCachedHandle() throws Exception
@@ -675,7 +666,9 @@ public class ReplicatedEnvironmentFacade
         Transaction txn = null;
         try
         {
-            txn = facade.beginTransaction();
+            TransactionConfig transactionConfig = new TransactionConfig();
+            transactionConfig.setDurability(facade.getRealMessageStoreDurability());
+            txn = facade.beginTransaction(transactionConfig);
             assertNotNull("Transaction is not created", txn);
             txn.commit();
             txn = null;
@@ -771,9 +764,7 @@ public class ReplicatedEnvironmentFacade
 
         node1.setDesignatedPrimary(true);
 
-        Transaction txn = node1.beginTransaction();
-        Database db = node1.getEnvironment().openDatabase(txn, "mydb", createConfig);
-        txn.commit();
+        Database db = node1.openDatabase("mydb", createConfig);
 
         // Put a record (that will be replicated)
         putRecord(node1, db, 1, "value1");
@@ -788,6 +779,7 @@ public class ReplicatedEnvironmentFacade
         // Stop node1
         node1.close();
 
+        LOGGER.debug("RESTARTING " + replicaName);
         // Restart the node2, making it primary so it becomes master
         TestStateChangeListener node2StateChangeListener = new TestStateChangeListener(State.MASTER);
         node2 = addNode(replicaName, replicaNodeHostPort, true, node2StateChangeListener, new NoopReplicationGroupListener());
@@ -795,13 +787,12 @@ public class ReplicatedEnvironmentFacade
         assertTrue(replicaName + " did not go into desired state; current actual state is "
                    + node2StateChangeListener.getCurrentActualState(), awaitForStateChange);
 
-        txn = node2.beginTransaction();
-        db = node2.getEnvironment().openDatabase(txn, "mydb", DatabaseConfig.DEFAULT);
-        txn.commit();
+        db = node2.openDatabase("mydb", DatabaseConfig.DEFAULT);
 
         // Do a transaction on node2. The two environments will have diverged
         putRecord(node2, db, 3, "diverged");
 
+        LOGGER.debug("RESTARTING " + TEST_NODE_NAME);
         // Now restart node1 and ensure that it realises it needs to rollback before it can rejoin.
         TestStateChangeListener node1StateChangeListener = new TestStateChangeListener(State.REPLICA);
         final CountDownLatch _replicaRolledback = new CountDownLatch(1);
@@ -810,10 +801,11 @@ public class ReplicatedEnvironmentFacade
             @Override
             public void onNodeRolledback()
             {
+                LOGGER.debug("onNodeRolledback in " + TEST_NODE_NAME);
                 _replicaRolledback.countDown();
             }
         });
-        assertTrue("Node 1 did not go into desired state and remained in state " + node1.getNodeState(),
+        assertTrue("Node 1 did not go into desired state",
                    node1StateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
         assertTrue("Node 1 did not experience rollback within timeout",
                    _replicaRolledback.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
@@ -822,6 +814,7 @@ public class ReplicatedEnvironmentFacade
         putRecord(node2, db, 4, "value4");
         db.close();
 
+        LOGGER.debug("CLOSING");
         node1.close();
         node2.close();
     }
@@ -850,7 +843,7 @@ public class ReplicatedEnvironmentFacade
                 @Override
                 public Transaction call() throws Exception
                 {
-                    return  replica.getEnvironment().beginTransaction(null, null);
+                    return  replica.beginTransaction(null);
                 }
             });
             Transaction transaction = future.get(5, TimeUnit.SECONDS);
@@ -894,7 +887,9 @@ public class ReplicatedEnvironmentFacade
         DatabaseEntry key = new DatabaseEntry();
         DatabaseEntry data = new DatabaseEntry();
 
-        Transaction txn = master.beginTransaction();
+        TransactionConfig transactionConfig = new TransactionConfig();
+        transactionConfig.setDurability(master.getRealMessageStoreDurability());
+        Transaction txn = master.beginTransaction(transactionConfig);
         IntegerBinding.intToEntry(keyValue, key);
         StringBinding.stringToEntry(dataValue, data);
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org