You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2013/12/10 18:19:49 UTC

svn commit: r1549898 [3/4] - in /qpid/branches/java-broker-bdb-ha/qpid/java: bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/ bdbstore/src/main/java/org/apac...

Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java?rev=1549898&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java Tue Dec 10 17:19:47 2013
@@ -0,0 +1,656 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.store.StoreFuture;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.EnvironmentFailureException;
+import com.sleepycat.je.ExceptionEvent;
+import com.sleepycat.je.ExceptionListener;
+import com.sleepycat.je.OperationFailureException;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.InsufficientReplicasException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationMutableConfig;
+import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+
+public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
+{
+    /**
+     * Replication configuration defaults applied to every environment created by this facade.
+     * Values supplied by the caller are laid over a copy of this map in createEnvironment.
+     */
+    @SuppressWarnings("serial")
+    private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
+    {{
+        /**
+         * Parameter decreased as the 24 h default may lead to very large log files for most users.
+         */
+        put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h");
+        /**
+         * Parameter increased as the 5 s default may lead to spurious timeouts.
+         */
+        put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s");
+        /**
+         * Parameter increased as the 10 s default may lead to spurious timeouts.
+         */
+        put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s");
+        /**
+         * Parameter decreased as the 10 h default may cause user confusion.
+         */
+        put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
+        /**
+         * Parameter changed from the default of true so that we adopt the new behaviour early. False
+         * is scheduled to become the default after JE 5.0.48.
+         */
+        put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString());
+        /**
+         * Parameter decreased as the default 5 min interval may lead to bigger data losses on a Node
+         * with NO_SYNC durability if such a Node crashes.
+         */
+        put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
+
+        /**
+         * Timeout after which the node transitions into the UNKNOWN state if the majority is not available.
+         * By default it is switched off.
+         */
+        put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s");
+    }});
+
+    private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class);
+    public static final String TYPE = "BDB-HA";
+
+    // TODO: get rid of these names
+    public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
+    public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
+
+    private volatile ReplicatedEnvironment _environment;
+    private CommitThreadWrapper _commitThreadWrapper;
+
+    private String _groupName;
+    private String _nodeName;
+    private String _nodeHostPort;
+    private String _helperHostPort;
+    private Durability _durability;
+    private boolean _designatedPrimary;
+    private boolean _coalescingSync;
+    private volatile StateChangeListener _stateChangeListener;
+    private String _environmentPath;
+    private Map<String, String> _environmentParameters;
+    private Map<String, String> _replicationEnvironmentParameters;
+    private String _name;
+    private ExecutorService _executor = Executors.newFixedThreadPool(1);
+    private AtomicReference<State> _state = new AtomicReference<State>(State.INITIAL);
+
+    private final ConcurrentMap<String, Database> _databases = new ConcurrentHashMap<String, Database>();
+
+    /**
+     * Records the supplied configuration, moves the facade into the OPENING state, creates the
+     * replicated environment and, if coalescing sync is enabled, starts the commit thread.
+     *
+     * The configuration is kept in fields so that the environment can be re-created on restart.
+     */
+    public ReplicatedEnvironmentFacade(String name, String environmentPath, String groupName, String nodeName, String nodeHostPort,
+            String helperHostPort, Durability durability, boolean designatedPrimary, boolean coalescingSync,
+            Map<String, String> environmentParameters, Map<String, String> replicationEnvironmentParameters)
+    {
+        // identity of this node within the group
+        _name = name;
+        _groupName = groupName;
+        _nodeName = nodeName;
+        _nodeHostPort = nodeHostPort;
+        _helperHostPort = helperHostPort;
+
+        // storage and durability settings
+        _environmentPath = environmentPath;
+        _durability = durability;
+        _designatedPrimary = designatedPrimary;
+        _coalescingSync = coalescingSync;
+        _environmentParameters = environmentParameters;
+        _replicationEnvironmentParameters = replicationEnvironmentParameters;
+
+        _state.set(State.OPENING);
+        _environment = createEnvironment(_environmentPath, _groupName, _nodeName, _nodeHostPort, _helperHostPort, _durability,
+                _designatedPrimary, _environmentParameters, _replicationEnvironmentParameters);
+        startCommitThread(_name, _environment);
+    }
+
+    /**
+     * Commits the given transaction and returns a future for its completion.
+     *
+     * When coalescing sync is disabled the returned future is already complete; otherwise the
+     * transaction is handed to the commit thread wrapper.
+     *
+     * @throws AMQStoreException if the underlying store reports a DatabaseException
+     */
+    @Override
+    public StoreFuture commit(final Transaction tx, final boolean syncCommit) throws AMQStoreException
+    {
+        try
+        {
+            // commit() is used rather than commitNoSync() so that the HA durability
+            // configuration is allowed to influence the resulting behaviour.
+            tx.commit();
+
+            if (!_coalescingSync)
+            {
+                return StoreFuture.IMMEDIATE_FUTURE;
+            }
+            return _commitThreadWrapper.commit(tx, syncCommit);
+        }
+        catch (DatabaseException commitFailure)
+        {
+            throw handleDatabaseException("Got DatabaseException on commit, closing environment", commitFailure);
+        }
+    }
+
+    /**
+     * Closes the facade if it is in any state other than CLOSING or CLOSED: shuts down the restart
+     * executor, stops the commit thread, then closes the databases and the environment. The state
+     * ends up as CLOSED even if one of those steps throws.
+     */
+    @Override
+    public void close()
+    {
+        boolean movedToClosing = false;
+        for (State closeableState : new State[] { State.INITIAL, State.OPENING, State.OPEN, State.RESTARTING })
+        {
+            if (_state.compareAndSet(closeableState, State.CLOSING))
+            {
+                movedToClosing = true;
+                break;
+            }
+        }
+        if (!movedToClosing)
+        {
+            return;
+        }
+
+        try
+        {
+            _executor.shutdownNow();
+            stopCommitThread();
+            closeDatabases();
+            closeEnvironment();
+        }
+        finally
+        {
+            _state.compareAndSet(State.CLOSING, State.CLOSED);
+        }
+    }
+
+    /**
+     * Wraps the given DatabaseException in an AMQStoreException for the caller to throw and, for
+     * exceptions reporting insufficient replicas or insufficient acknowledgements, schedules an
+     * asynchronous restart of the environment.
+     *
+     * The restart is only scheduled when the facade can move from OPEN to RESTARTING, so at most
+     * one restart is queued at a time.
+     *
+     * @param contextMessage message describing the failed operation
+     * @param e the exception raised by the store
+     * @return the wrapping exception; this method never throws it itself
+     */
+    @Override
+    public AMQStoreException handleDatabaseException(String contextMessage, DatabaseException e)
+    {
+        // The condition used to test InsufficientReplicasException twice; the second operand is
+        // taken to be InsufficientAcksException, the other "not enough replicas" failure.
+        // Fully qualified so that no additional import is required.
+        boolean restart = (e instanceof InsufficientReplicasException
+                || e instanceof com.sleepycat.je.rep.InsufficientAcksException);
+        if (restart)
+        {
+            if (_state.compareAndSet(State.OPEN, State.RESTARTING))
+            {
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Environment restarting due to exception " + e.getMessage(), e);
+                }
+                _executor.execute(new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            restartEnvironment();
+                        }
+                        catch (Exception restartFailure)
+                        {
+                            // named distinctly so it does not shadow the triggering exception
+                            LOGGER.error("Exception on environment restart", restartFailure);
+                        }
+                    }
+                });
+
+            }
+            else
+            {
+                LOGGER.info("Cannot restart environment because of facade state: " + _state.get());
+            }
+        }
+        return new AMQStoreException(contextMessage, e);
+    }
+
+    /**
+     * Opens each named database in the current environment (outside any transaction) and
+     * registers it under its name, replacing any previously registered handle of the same name.
+     */
+    @Override
+    public void openDatabases(String[] databaseNames, DatabaseConfig dbConfig) throws AMQStoreException
+    {
+        for (int i = 0; i < databaseNames.length; i++)
+        {
+            final String dbName = databaseNames[i];
+            _databases.put(dbName, _environment.openDatabase(null, dbName, dbConfig));
+        }
+    }
+
+    /**
+     * Returns the previously opened database registered under the given name.
+     *
+     * @throws IllegalStateException if the environment is not valid
+     * @throws IllegalArgumentException if no database with that name has been opened
+     */
+    @Override
+    public Database getOpenDatabase(String name)
+    {
+        if (_environment.isValid())
+        {
+            final Database openDatabase = _databases.get(name);
+            if (openDatabase != null)
+            {
+                return openDatabase;
+            }
+            throw new IllegalArgumentException("Database with name '" + name + "' has not been opened");
+        }
+        throw new IllegalStateException("Environment is not valid");
+    }
+
+    /**
+     * Receives replication state changes from the environment.
+     *
+     * When the node becomes REPLICA or MASTER the facade moves from OPENING or RESTARTING to OPEN.
+     * The event is then forwarded to the registered listener, if any, unless the facade is
+     * CLOSING or CLOSED.
+     */
+    @Override
+    public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+    {
+        ReplicatedEnvironment.State state = stateChangeEvent.getState();
+        LOGGER.info("The node state is " + state);
+        if (state == ReplicatedEnvironment.State.REPLICA || state == ReplicatedEnvironment.State.MASTER)
+        {
+            if (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN))
+            {
+                LOGGER.info("The environment facade is in open state");
+            }
+        }
+        // NOTE(review): the facade state is read twice here, so it may change between the two checks.
+        if (_state.get() != State.CLOSING && _state.get() != State.CLOSED)
+        {
+            // copy the volatile field so the null check and the call use the same listener
+            StateChangeListener listener = _stateChangeListener;
+            if (listener != null)
+            {
+                listener.stateChange(stateChangeEvent);
+            }
+        }
+    }
+
+    /**
+     * Stores the listener that state changes are forwarded to and registers this facade as the
+     * environment's state change listener.
+     */
+    public void setStateChangeListener(StateChangeListener listener)
+    {
+        // the delegate is stored before this facade is registered with the environment
+        _stateChangeListener = listener;
+        _environment.setStateChangeListener(this);
+    }
+
+    /** Returns the name this facade was constructed with. */
+    public String getName()
+    {
+        return _name;
+    }
+
+    /** Returns the replication group name this facade was constructed with. */
+    public String getGroupName()
+    {
+        return _groupName;
+    }
+
+    /** Returns the node name this facade was constructed with. */
+    public String getNodeName()
+    {
+        return _nodeName;
+    }
+
+    /** Returns the node host:port string this facade was constructed with. */
+    public String getNodeHostPort()
+    {
+        return _nodeHostPort;
+    }
+
+    /** Returns the helper host:port string this facade was constructed with. */
+    public String getHelperHostPort()
+    {
+        return _helperHostPort;
+    }
+
+    /** Returns the string form of the configured durability. */
+    public String getDurability()
+    {
+        return _durability.toString();
+    }
+
+    /** Returns whether coalescing sync (the commit thread) is enabled. */
+    public boolean isCoalescingSync()
+    {
+        return _coalescingSync;
+    }
+
+    /** Returns the string form of the environment's current replication state. */
+    public String getNodeState()
+    {
+        ReplicatedEnvironment.State state = _environment.getState();
+        return state.toString();
+    }
+
+    /** Returns the designated primary flag as currently held by the environment's mutable config. */
+    public boolean isDesignatedPrimary()
+    {
+        return _environment.getRepMutableConfig().getDesignatedPrimary();
+    }
+
+    /**
+     * Describes every node of the replication group as a map holding its name and its
+     * "host:port" address, keyed by GRP_MEM_COL_NODE_NAME and GRP_MEM_COL_NODE_HOST_PORT.
+     */
+    public List<Map<String, String>> getGroupMembers()
+    {
+        final List<Map<String, String>> result = new ArrayList<Map<String, String>>();
+
+        for (ReplicationNode member : _environment.getGroup().getNodes())
+        {
+            final String hostPort = member.getHostName() + ":" + member.getPort();
+
+            final Map<String, String> description = new HashMap<String, String>();
+            description.put(GRP_MEM_COL_NODE_NAME, member.getName());
+            description.put(GRP_MEM_COL_NODE_HOST_PORT, hostPort);
+            result.add(description);
+        }
+
+        return result;
+    }
+
+    /**
+     * Removes the named node from the replication group via a replication group admin.
+     *
+     * @throws AMQStoreException wrapping any DatabaseException (including
+     *         OperationFailureException) raised by the removal
+     */
+    public void removeNodeFromGroup(final String nodeName) throws AMQStoreException
+    {
+        final ReplicationGroupAdmin groupAdmin;
+        try
+        {
+            groupAdmin = createReplicationGroupAdmin();
+            groupAdmin.removeMember(nodeName);
+        }
+        catch (DatabaseException removalFailure)
+        {
+            // OperationFailureException is a DatabaseException and was handled identically,
+            // so a single handler covers both.
+            // TODO: I am not sure about the exception handing here
+            throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + removalFailure.getMessage(),
+                    removalFailure);
+        }
+    }
+
+    /**
+     * Sets or clears the designated primary flag on the running environment.
+     *
+     * The new value is also remembered in this facade so that an environment re-created by
+     * restartEnvironment() keeps the setting instead of reverting to the constructor value.
+     *
+     * @param isPrimary the new value of the designated primary flag
+     * @throws AMQStoreException if the environment rejects the configuration change
+     */
+    public void setDesignatedPrimary(final boolean isPrimary) throws AMQStoreException
+    {
+        try
+        {
+            final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+            final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary);
+            _environment.setRepMutableConfig(newConfig);
+
+            // only recorded once the environment has accepted the change
+            _designatedPrimary = isPrimary;
+
+            if (LOGGER.isInfoEnabled())
+            {
+                // report the actual value; the flag may have been cleared rather than set
+                LOGGER.info("Node " + _nodeName + " designated primary flag successfully set to " + isPrimary);
+            }
+
+        }
+        catch (DatabaseException e)
+        {
+            // TODO: I am not sure about the exception handing here
+            throw handleDatabaseException("Cannot set designated primary", e);
+        }
+    }
+
+    /**
+     * Updates the host name and port recorded for the named node via a replication group admin.
+     *
+     * @throws AMQStoreException if the update fails
+     */
+    public void updateAddress(final String nodeName, final String newHostName, final int newPort) throws AMQStoreException
+    {
+        try
+        {
+            createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
+
+        }
+        catch (OperationFailureException ofe)
+        {
+            // wrapped directly, without going through handleDatabaseException
+            // TODO: I am not sure about the exception handing here
+            throw new AMQStoreException("Failed to update address for '" + nodeName + "' with new host " + newHostName
+                    + " and new port " + newPort + ". " + ofe.getMessage(), ofe);
+        }
+        catch (DatabaseException e)
+        {
+            // routed through handleDatabaseException, which may schedule an environment restart
+            // TODO: I am not sure about the exception handing here
+            throw handleDatabaseException("Failed to update address for '" + nodeName + "' with new host " + newHostName
+                    + " and new port " + newPort + ". " + e.getMessage(), e);
+        }
+    }
+
+    /** Returns the node priority held by the environment's mutable replication config. */
+    public int getNodePriority()
+    {
+        return _environment.getRepMutableConfig().getNodePriority();
+    }
+
+    /** Returns the electable group size override held by the environment's mutable replication config. */
+    public int getElectableGroupSizeOverride()
+    {
+        return _environment.getRepMutableConfig().getElectableGroupSizeOverride();
+    }
+
+    /**
+     * Returns the current environment. The reference is replaced when the environment is
+     * restarted and is null once closeEnvironment() has run.
+     */
+    public ReplicatedEnvironment getEnvironment()
+    {
+        return _environment;
+    }
+
+    /** Returns the current lifecycle state of this facade. */
+    public State getFacadeState()
+    {
+        return _state.get();
+    }
+
+    /**
+     * Builds a replication group admin for this group whose helpers are the environment's
+     * configured helper sockets plus this node's own (unresolved) address.
+     */
+    private ReplicationGroupAdmin createReplicationGroupAdmin()
+    {
+        final Set<InetSocketAddress> helperAddresses =
+                new HashSet<InetSocketAddress>(_environment.getRepConfig().getHelperSockets());
+
+        final ReplicationConfig config = _environment.getRepConfig();
+        final InetSocketAddress ownAddress =
+                InetSocketAddress.createUnresolved(config.getNodeHostname(), config.getNodePort());
+        helperAddresses.add(ownAddress);
+
+        return new ReplicationGroupAdmin(_groupName, helperAddresses);
+    }
+
+    /**
+     * Cleans the log if the environment is still valid, then closes the environment and clears
+     * the field. After this call any method that dereferences the environment field will fail.
+     */
+    private void closeEnvironment()
+    {
+        // Clean the log before closing. This makes sure it doesn't contain
+        // redundant data. Closing without doing this means the cleaner may not
+        // get a chance to finish.
+        try
+        {
+            if (_environment.isValid())
+            {
+                _environment.cleanLog();
+            }
+        }
+        finally
+        {
+            // runs even if cleaning the log threw
+            _environment.close();
+            _environment = null;
+        }
+    }
+
+    /**
+     * Creates and starts a commit thread wrapper named "Commit-Thread-" + name for the given
+     * environment. Does nothing when coalescing sync is disabled.
+     */
+    private void startCommitThread(String name, Environment environment)
+    {
+        if (!_coalescingSync)
+        {
+            return;
+        }
+        final String threadName = "Commit-Thread-" + name;
+        _commitThreadWrapper = new CommitThreadWrapper(threadName, environment);
+        _commitThreadWrapper.startCommitThread();
+    }
+
+    /**
+     * Stops the commit thread. Does nothing when coalescing sync is disabled.
+     *
+     * If the calling thread is interrupted while stopping, the interruption is logged and the
+     * thread's interrupt status is restored so that callers can still observe it.
+     */
+    private void stopCommitThread()
+    {
+        if (_coalescingSync)
+        {
+            try
+            {
+                _commitThreadWrapper.stopCommitThread();
+            }
+            catch (InterruptedException e)
+            {
+                LOGGER.warn("Stopping of commit thread is interrupted", e);
+                // Thread.interrupted() would clear the flag; re-assert it instead
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Replaces the current environment with a freshly created one using the stored configuration.
+     *
+     * Stops the commit thread, closes the old environment (ignoring failures), creates the new
+     * environment, re-opens the databases that were registered before the restart, restarts the
+     * commit thread and re-registers this facade as state change listener.
+     */
+    private void restartEnvironment() throws AMQStoreException
+    {
+        LOGGER.info("Restarting environment");
+
+        stopCommitThread();
+
+        // snapshot the names first: closing the databases clears the registry
+        Set<String> databaseNames = new HashSet<String>(_databases.keySet());
+        closeEnvironmentSafely();
+
+        _environment = createEnvironment(_environmentPath, _groupName, _nodeName, _nodeHostPort, _helperHostPort, _durability,
+                _designatedPrimary, _environmentParameters, _replicationEnvironmentParameters);
+
+        // the databases are re-opened with a transactional config built here, not with the
+        // config originally passed to openDatabases
+        DatabaseConfig dbConfig = new DatabaseConfig();
+        dbConfig.setTransactional(true);
+        // TODO Alex and I think this should be removed.
+        openDatabases(databaseNames.toArray(new String[databaseNames.size()]), dbConfig);
+
+        startCommitThread(_name, _environment);
+
+        // the new environment does not carry over the listener registered on the old one
+        _environment.setStateChangeListener(this);
+
+        LOGGER.info("Environment is restarted");
+
+    }
+
+    /**
+     * Best-effort close of the current environment, used on restart.
+     *
+     * Databases are only closed when the environment is still valid; exceptions from closing the
+     * databases and EnvironmentFailureExceptions from closing the environment are logged and
+     * ignored. The environment field itself is left unchanged.
+     */
+    private void closeEnvironmentSafely()
+    {
+        Environment environment = _environment;
+        if (environment != null)
+        {
+            try
+            {
+                if (environment.isValid())
+                {
+                    try
+                    {
+                        closeDatabases();
+                    }
+                    catch(Exception e)
+                    {
+                        LOGGER.warn("Ignoring an exception whilst closing databases", e);
+                    }
+                }
+                environment.close();
+            }
+            catch (EnvironmentFailureException efe)
+            {
+                LOGGER.warn("Ignoring an exception whilst closing environment", efe);
+            }
+        }
+    }
+
+    /**
+     * Closes every registered database and empties the registry.
+     *
+     * All databases are attempted even if some fail; the first RuntimeException encountered is
+     * rethrown once the registry has been cleared, later ones are dropped.
+     */
+    private void closeDatabases()
+    {
+        RuntimeException firstFailure = null;
+        for (Database openDatabase : _databases.values())
+        {
+            try
+            {
+                openDatabase.close();
+            }
+            catch (RuntimeException closeFailure)
+            {
+                firstFailure = (firstFailure == null) ? closeFailure : firstFailure;
+            }
+        }
+        _databases.clear();
+
+        if (firstFailure == null)
+        {
+            return;
+        }
+        throw firstFailure;
+    }
+
+    /**
+     * Builds the replication and environment configuration and opens a replicated environment
+     * at the given path.
+     *
+     * Caller-supplied parameters are laid over REPCONFIG_DEFAULTS and
+     * EnvironmentFacade.ENVCONFIG_DEFAULTS respectively, so they take precedence over the
+     * defaults. If opening fails with an InsufficientLogException, a network restore is executed
+     * and the open is attempted once more.
+     *
+     * @return the opened environment
+     */
+    private ReplicatedEnvironment createEnvironment(String environmentPath, String groupName, String nodeName, String nodeHostPort,
+            String helperHostPort, Durability durability, boolean designatedPrimary, Map<String, String> environmentParameters,
+            Map<String, String> replicationEnvironmentParameters)
+    {
+        if (LOGGER.isInfoEnabled())
+        {
+            LOGGER.info("Creating environment");
+            LOGGER.info("Environment path " + environmentPath);
+            LOGGER.info("Group name " + groupName);
+            LOGGER.info("Node name " + nodeName);
+            LOGGER.info("Node host port " + nodeHostPort);
+            LOGGER.info("Helper host port " + helperHostPort);
+            LOGGER.info("Durability " + durability);
+            LOGGER.info("Coalescing sync " + _coalescingSync);
+            LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary);
+        }
+
+        // defaults first, then caller-supplied values on top
+        Map<String, String> replicationEnvironmentSettings = new HashMap<String, String>(REPCONFIG_DEFAULTS);
+        if (replicationEnvironmentParameters != null && !replicationEnvironmentParameters.isEmpty())
+        {
+            replicationEnvironmentSettings.putAll(replicationEnvironmentParameters);
+        }
+        Map<String, String> environmentSettings = new HashMap<String, String>(EnvironmentFacade.ENVCONFIG_DEFAULTS);
+        if (environmentParameters != null && !environmentParameters.isEmpty())
+        {
+            environmentSettings.putAll(environmentParameters);
+        }
+
+        final ReplicationConfig replicationConfig = new ReplicationConfig(groupName, nodeName, nodeHostPort);
+        replicationConfig.setHelperHosts(helperHostPort);
+        replicationConfig.setDesignatedPrimary(designatedPrimary);
+
+        for (Map.Entry<String, String> configItem : replicationEnvironmentSettings.entrySet())
+        {
+            if (LOGGER.isInfoEnabled())
+            {
+                LOGGER.info("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+            }
+            replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+        }
+
+        EnvironmentConfig envConfig = new EnvironmentConfig();
+        envConfig.setAllowCreate(true);
+        envConfig.setTransactional(true);
+        envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
+        envConfig.setDurability(durability);
+
+        for (Map.Entry<String, String> configItem : environmentSettings.entrySet())
+        {
+            if (LOGGER.isInfoEnabled())
+            {
+                LOGGER.info("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+            }
+            envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+        }
+
+        ReplicatedEnvironment environment = null;
+        try
+        {
+            environment = new ReplicatedEnvironment(new File(environmentPath), replicationConfig, envConfig);
+        }
+        catch (final InsufficientLogException ile)
+        {
+            // restore over the network, then retry the open once; a second failure propagates
+            LOGGER.info("InsufficientLogException thrown and so full network restore required", ile);
+            NetworkRestore restore = new NetworkRestore();
+            NetworkRestoreConfig config = new NetworkRestoreConfig();
+            config.setRetainLogFiles(false);
+            restore.execute(ile, config);
+            environment = new ReplicatedEnvironment(new File(environmentPath), replicationConfig, envConfig);
+        }
+        return environment;
+    }
+
+    /**
+     * Exception listener that logs exceptions reported asynchronously by the environment.
+     *
+     * Declared static because it uses no state of the enclosing facade; an inner (non-static)
+     * class would keep a hidden reference to the facade from the environment configuration.
+     */
+    private static class LoggingAsyncExceptionListener implements ExceptionListener
+    {
+        @Override
+        public void exceptionThrown(ExceptionEvent event)
+        {
+            LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException());
+        }
+    }
+
+    /**
+     * Lifecycle states of the facade.
+     *
+     * The constructor moves the facade to OPENING; a REPLICA or MASTER state change moves it from
+     * OPENING or RESTARTING to OPEN; a restart-worthy exception moves it from OPEN to RESTARTING;
+     * close() moves it to CLOSING and finally CLOSED.
+     */
+    public static enum State
+    {
+        INITIAL,
+        OPENING,
+        OPEN,
+        RESTARTING,
+        CLOSING,
+        CLOSED
+    }
+
+}

Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java?rev=1549898&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java Tue Dec 10 17:19:47 2013
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.VirtualHost;
+
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Durability.ReplicaAckPolicy;
+import com.sleepycat.je.Durability.SyncPolicy;
+
+public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
+{
+
+    private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC,
+            ReplicaAckPolicy.SIMPLE_MAJORITY);
+
+    /**
+     * Creates a replicated environment facade from the HA attributes of the given virtual host.
+     *
+     * Mandatory attributes: haGroupName, haNodeName, haNodeAddress, haHelperAddress.
+     * Optional attributes: haDurability (defaults to DEFAULT_DURABILITY), haDesignatedPrimary
+     * (defaults to false), haCoalescingSync (defaults to true), haReplicationConfig and
+     * bdbEnvironmentConfig (maps; ignored unless the attribute value is a Map).
+     *
+     * @throws IllegalConfigurationException if a mandatory attribute is missing, or if coalescing
+     *         sync is combined with a local sync policy of SYNC
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public EnvironmentFacade createEnvironmentFacade(String name, String storeLocation, VirtualHost virtualHost)
+    {
+        // Mandatory configuration
+        String groupName = getValidatedStringAttribute(virtualHost, "haGroupName");
+        String nodeName = getValidatedStringAttribute(virtualHost, "haNodeName");
+        String nodeHostPort = getValidatedStringAttribute(virtualHost, "haNodeAddress");
+        String helperHostPort = getValidatedStringAttribute(virtualHost, "haHelperAddress");
+
+        // Optional configuration
+        String durabilitySetting = getStringAttribute(virtualHost, "haDurability", null);
+        Durability durability = (durabilitySetting == null) ? DEFAULT_DURABILITY : Durability.parse(durabilitySetting);
+
+        // primitives: the helper returns boolean, so boxing only invited an unboxing NPE later
+        boolean designatedPrimary = getBooleanAttribute(virtualHost, "haDesignatedPrimary", false);
+        boolean coalescingSync = getBooleanAttribute(virtualHost, "haCoalescingSync", true);
+
+        if (coalescingSync && durability.getLocalSync() == SyncPolicy.SYNC)
+        {
+            // name the attribute that is actually read above, so the advice can be followed
+            throw new IllegalConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC
+                    + "! Please set haCoalescingSync to false in store configuration.");
+        }
+
+        // The maps are copied so later changes to the attribute values do not leak into the facade.
+        // The casts are unchecked: key and value types are assumed to be String.
+        Map<String, String> replicationConfig = null;
+        Object repConfigAttr = virtualHost.getAttribute("haReplicationConfig");
+        if (repConfigAttr instanceof Map)
+        {
+            replicationConfig = new HashMap<String, String>((Map<String, String>) repConfigAttr);
+        }
+
+        Map<String, String> envConfigMap = null;
+        Object bdbEnvConfigAttr = virtualHost.getAttribute("bdbEnvironmentConfig");
+        if (bdbEnvConfigAttr instanceof Map)
+        {
+            envConfigMap = new HashMap<String, String>((Map<String, String>) bdbEnvConfigAttr);
+        }
+
+        return new ReplicatedEnvironmentFacade(name, storeLocation, groupName, nodeName, nodeHostPort, helperHostPort, durability,
+                designatedPrimary, coalescingSync, envConfigMap, replicationConfig);
+    }
+
+    /**
+     * Returns the string form of a mandatory virtual host attribute.
+     *
+     * @throws IllegalConfigurationException if the attribute is not set
+     */
+    private String getValidatedStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName)
+    {
+        final Object value = virtualHost.getAttribute(attributeName);
+        if (value == null)
+        {
+            throw new IllegalConfigurationException("BDB HA configuration key not found. Please specify configuration attribute: "
+                    + attributeName);
+        }
+        return value.toString();
+    }
+
+    /** Returns the string form of an optional virtual host attribute, or defaultVal if it is not set. */
+    private String getStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, String defaultVal)
+    {
+        final Object value = virtualHost.getAttribute(attributeName);
+        return (value == null) ? defaultVal : value.toString();
+    }
+
+    /**
+     * Returns an optional virtual host attribute as a boolean.
+     *
+     * Boolean values are returned as they are and String values are parsed with
+     * Boolean.parseBoolean; a missing attribute or a value of any other type yields defaultVal.
+     */
+    private boolean getBooleanAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, boolean defaultVal)
+    {
+        final Object value = virtualHost.getAttribute(attributeName);
+        if (value instanceof Boolean)
+        {
+            return ((Boolean) value).booleanValue();
+        }
+        if (value instanceof String)
+        {
+            return Boolean.parseBoolean((String) value);
+        }
+        // null (instanceof is false for null) or an unsupported type
+        return defaultVal;
+    }
+
+}

Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1549898&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Tue Dec 10 17:19:47 2013
@@ -0,0 +1,257 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.store.StoreFuture;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.ExceptionEvent;
+import com.sleepycat.je.ExceptionListener;
+
+/**
+ * {@link EnvironmentFacade} backed by a single BDB JE {@link Environment}.
+ * <p>
+ * The facade creates the environment, remembers the databases opened through
+ * {@link #openDatabases(String[], DatabaseConfig)} and hands transaction commits to a
+ * {@link CommitThreadWrapper}.
+ */
+public class StandardEnvironmentFacade implements EnvironmentFacade
+{
+    private static final Logger LOGGER = Logger.getLogger(StandardEnvironmentFacade.class);
+    public static final String TYPE = "BDB";
+
+    private Environment _environment;
+    private CommitThreadWrapper _commitThreadWrapper;
+    private final Map<String, Database> _databases = new HashMap<String, Database>();
+
+    /**
+     * Opens (creating it if necessary) a transactional environment and starts the commit thread.
+     *
+     * @param name       used to build the commit thread name ("Commit-Thread-" + name)
+     * @param storePath  directory of the environment
+     * @param attributes environment configuration parameters applied through
+     *                   {@link EnvironmentConfig#setConfigParam(String, String)}; may be {@code null}
+     */
+    public StandardEnvironmentFacade(String name, String storePath, Map<String, String> attributes)
+    {
+
+        LOGGER.info("BDB message store using environment path " + storePath);
+
+        EnvironmentConfig envConfig = new EnvironmentConfig();
+        envConfig.setAllowCreate(true);
+        envConfig.setTransactional(true);
+
+        if (attributes != null)
+        {
+            for (Map.Entry<String, String> configItem : attributes.entrySet())
+            {
+                LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+                envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+            }
+        }
+
+        envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
+
+        try
+        {
+            _environment = new Environment(new File(storePath), envConfig);
+        }
+        catch (DatabaseException de)
+        {
+            // The message may be null; guard so that the original exception is not masked by an NPE
+            String message = de.getMessage();
+            if (message != null && message.contains("Environment.setAllowCreate is false"))
+            {
+                // Allow the creation this time
+                envConfig.setAllowCreate(true);
+                _environment = new Environment(new File(storePath), envConfig);
+            }
+            else
+            {
+                throw de;
+            }
+        }
+        _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, _environment);
+        _commitThreadWrapper.startCommitThread();
+    }
+
+
+    /**
+     * Commits the transaction without syncing and then delegates to the commit thread wrapper.
+     *
+     * @param tx         transaction to commit
+     * @param syncCommit passed through to {@link CommitThreadWrapper#commit}
+     * @return the future returned by the commit thread wrapper
+     * @throws AMQStoreException if the no-sync commit fails; the environment is closed before the
+     *                           exception is thrown
+     */
+    @Override
+    public StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException
+    {
+        try
+        {
+            tx.commitNoSync();
+        }
+        catch (DatabaseException de)
+        {
+            LOGGER.error("Got DatabaseException on commit, closing environment", de);
+
+            closeEnvironmentSafely();
+
+            throw handleDatabaseException("Got DatabaseException on commit", de);
+        }
+
+        return _commitThreadWrapper.commit(tx, syncCommit);
+    }
+
+    /**
+     * Stops the commit thread, closes the opened databases and closes the environment.
+     * <p>
+     * Each step runs even if the previous one threw, so that a failure to close a database
+     * does not leave the environment open.
+     */
+    @Override
+    public void close()
+    {
+        try
+        {
+            stopCommitThread();
+        }
+        finally
+        {
+            try
+            {
+                closeDatabases();
+            }
+            finally
+            {
+                closeEnvironment();
+            }
+        }
+    }
+
+    /**
+     * Closes every database opened through this facade and forgets the handles.
+     * All databases are attempted; the first {@link RuntimeException} encountered is rethrown.
+     */
+    private void closeDatabases()
+    {
+        RuntimeException firstThrownException = null;
+        for (Database database : _databases.values())
+        {
+            try
+            {
+                database.close();
+            }
+            catch(RuntimeException e)
+            {
+                if (firstThrownException == null)
+                {
+                    firstThrownException = e;
+                }
+            }
+        }
+        // Drop the handles so that closed databases are no longer handed out by getOpenDatabase
+        _databases.clear();
+        if (firstThrownException != null)
+        {
+            throw firstThrownException;
+        }
+    }
+
+    /**
+     * Best-effort shutdown used on error paths: failures are logged rather than propagated.
+     * Databases are only closed when the environment is still valid.
+     */
+    private void closeEnvironmentSafely()
+    {
+        stopCommitThread();
+        if (_environment != null)
+        {
+            if (_environment.isValid())
+            {
+                try
+                {
+                    closeDatabases();
+                }
+                catch(Exception e)
+                {
+                    LOGGER.error("Exception closing environment databases", e);
+                }
+            }
+            try
+            {
+                _environment.close();
+            }
+            catch (DatabaseException ex)
+            {
+                LOGGER.error("Exception closing store environment", ex);
+            }
+            catch (IllegalStateException ex)
+            {
+                LOGGER.error("Exception closing store environment", ex);
+            }
+            finally
+            {
+                _environment = null;
+            }
+        }
+    }
+
+    /**
+     * @return the underlying environment, or {@code null} once the facade has been closed
+     */
+    @Override
+    public Environment getEnvironment()
+    {
+        return _environment;
+    }
+
+    /**
+     * Cleans the log and closes the environment; the reference is cleared even if cleaning fails.
+     */
+    private void closeEnvironment()
+    {
+        if (_environment != null)
+        {
+            // Clean the log before closing. This makes sure it doesn't contain
+            // redundant data. Closing without doing this means the cleaner may
+            // not get a chance to finish.
+            try
+            {
+                _environment.cleanLog();
+            }
+            finally
+            {
+                _environment.close();
+                _environment = null;
+            }
+        }
+    }
+
+    /**
+     * Stops the commit thread if one is running and clears the reference to its wrapper.
+     */
+    private void stopCommitThread()
+    {
+        if (_commitThreadWrapper != null)
+        {
+            try
+            {
+                _commitThreadWrapper.stopCommitThread();
+            }
+            catch (InterruptedException e)
+            {
+                LOGGER.warn("Stopping of commit thread is interrupted", e);
+                // NOTE(review): Thread.interrupted() clears the interrupt status rather than
+                // restoring it. Presumably intentional so that the close steps that follow
+                // are not run on an interrupted thread - confirm.
+                Thread.interrupted();
+            }
+            finally
+            {
+                _commitThreadWrapper = null;
+            }
+        }
+    }
+
+    /**
+     * Wraps a {@link DatabaseException} into an {@link AMQStoreException}, closing the
+     * environment first when it is no longer valid.
+     *
+     * @param contextMessage message of the returned exception
+     * @param e              cause of the returned exception
+     * @return the exception for the caller to throw
+     */
+    @Override
+    public AMQStoreException handleDatabaseException(String contextMessage, DatabaseException e)
+    {
+        if (_environment != null && !_environment.isValid())
+        {
+            closeEnvironmentSafely();
+        }
+        return new AMQStoreException(contextMessage, e);
+    }
+
+    /**
+     * Logs exceptions reported by the environment's exception listener hook.
+     * Static because it uses no state of the enclosing facade.
+     */
+    private static class LoggingAsyncExceptionListener implements ExceptionListener
+    {
+        @Override
+        public void exceptionThrown(ExceptionEvent event)
+        {
+            LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException());
+        }
+    }
+
+    /**
+     * Opens the named databases with the given configuration and registers them for
+     * {@link #getOpenDatabase(String)}.
+     *
+     * @param databaseNames names of the databases to open
+     * @param dbConfig      configuration used for every database
+     */
+    @Override
+    public void openDatabases(String[] databaseNames, DatabaseConfig dbConfig)
+    {
+        for (String databaseName : databaseNames)
+        {
+            Database database = _environment.openDatabase(null, databaseName, dbConfig);
+            _databases.put(databaseName, database);
+        }
+    }
+
+    /**
+     * @param name name of a database previously opened through {@link #openDatabases}
+     * @return the registered database handle
+     * @throws IllegalArgumentException if no database is registered under {@code name}
+     */
+    @Override
+    public Database getOpenDatabase(String name)
+    {
+        Database database = _databases.get(name);
+        if (database == null)
+        {
+            throw new IllegalArgumentException("Database with name '" + name + "' has not been opened");
+        }
+        return database;
+    }
+
+}

Copied: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java (from r1549894, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java&r1=1549894&r2=1549898&rev=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java Tue Dec 10 17:19:47 2013
@@ -1,4 +1,5 @@
 /*
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,26 +20,27 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.test.utils.QpidTestCase;
+import java.util.HashMap;
+import java.util.Map;
 
-import static org.mockito.Mockito.mock;
+import org.apache.qpid.server.model.VirtualHost;
 
-public class HAMessageStoreSmokeTest extends QpidTestCase
+public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory
 {
-    private final BDBHAMessageStore _store = new BDBHAMessageStore();
 
-    public void testMissingHAConfigThrowsException() throws Exception
+    @SuppressWarnings("unchecked")
+    @Override
+    public EnvironmentFacade createEnvironmentFacade(String name, String storePath, VirtualHost virtualHost)
     {
-        try
-        {
-            _store.configure(mock(VirtualHost.class));
-            fail("Expected an exception to be thrown");
-        }
-        catch (ConfigurationException ce)
+        Map<String, String> envConfigMap = new HashMap<String, String>();
+        envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
+
+        Object bdbEnvConfigAttr = virtualHost.getAttribute("bdbEnvironmentConfig");
+        if (bdbEnvConfigAttr instanceof Map)
         {
-            assertTrue(ce.getMessage().contains("BDB HA configuration key not found"));
+            envConfigMap.putAll((Map<String, String>) bdbEnvConfigAttr);
         }
+        return new StandardEnvironmentFacade(name, storePath, envConfigMap);
     }
+
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java Tue Dec 10 17:19:47 2013
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.ber
 
 import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.Environment;
+
 import org.apache.qpid.AMQStoreException;
 
 public interface StoreUpgrade

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java Tue Dec 10 17:19:47 2013
@@ -45,7 +45,6 @@ import org.apache.qpid.server.model.Exch
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
 import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
 import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java Tue Dec 10 17:19:47 2013
@@ -25,7 +25,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 
 import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
 
 import com.sleepycat.bind.tuple.IntegerBinding;
 import com.sleepycat.bind.tuple.LongBinding;
@@ -34,7 +34,6 @@ import com.sleepycat.je.DatabaseConfig;
 import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.Environment;
-import com.sleepycat.je.LockMode;
 import com.sleepycat.je.OperationStatus;
 
 public class Upgrader
@@ -64,7 +63,7 @@ public class Upgrader
 
             if(versionDb.count() == 0L)
             {
-                int sourceVersion = isEmpty ? AbstractBDBMessageStore.VERSION: identifyOldStoreVersion();
+                int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion();
                 DatabaseEntry key = new DatabaseEntry();
                 IntegerBinding.intToEntry(sourceVersion, key);
                 DatabaseEntry value = new DatabaseEntry();
@@ -74,11 +73,11 @@ public class Upgrader
             }
 
             int version = getSourceVersion(versionDb);
-            if(version > AbstractBDBMessageStore.VERSION)
+            if(version > BDBMessageStore.VERSION)
             {
                 throw new AMQStoreException("Database version " + version
                                             + " is higher than the most recent known version: "
-                                            + AbstractBDBMessageStore.VERSION);
+                                            + BDBMessageStore.VERSION);
             }
             performUpgradeFromVersion(version, versionDb);
         }
@@ -127,7 +126,7 @@ public class Upgrader
     void performUpgradeFromVersion(int sourceVersion, Database versionDb)
             throws AMQStoreException
     {
-        while(sourceVersion != AbstractBDBMessageStore.VERSION)
+        while(sourceVersion != BDBMessageStore.VERSION)
         {
             upgrade(sourceVersion, ++sourceVersion);
             DatabaseEntry key = new DatabaseEntry();

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java Tue Dec 10 17:19:47 2013
@@ -32,9 +32,7 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.util.FileUtils;
 
-import com.sleepycat.je.Environment;
 import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.rep.ReplicatedEnvironment;
 import com.sleepycat.je.rep.ReplicationConfig;
 
 import static org.mockito.Matchers.eq;
@@ -98,18 +96,18 @@ public class BDBHAMessageStoreTest exten
         VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, _configXml.subset("virtualhosts.virtualhost." + vhostName), BrokerTestHelper.createBrokerMock());
 
         _virtualHost = BrokerTestHelper.createVirtualHost(configuration,null,_modelVhost);
-        BDBHAMessageStore store = (BDBHAMessageStore) _virtualHost.getMessageStore();
+        BDBMessageStore store = (BDBMessageStore) _virtualHost.getMessageStore();
 
         // test whether JVM system settings were applied
-        Environment env = store.getEnvironment();
-        assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS));
-        assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
+        EnvironmentFacade env = store.getEnvironmentFacade();
+        assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getEnvironment().getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS));
+        assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getEnvironment().getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
 
-        ReplicatedEnvironment repEnv = store.getReplicatedEnvironment();
+        ReplicatedEnvironmentFacade repEnv = (ReplicatedEnvironmentFacade)store.getEnvironmentFacade();
         assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES,
-                repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES));
+                repEnv.getEnvironment().getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES));
         assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT,
-                repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT));
+                repEnv.getEnvironment().getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT));
     }
 
     private void addVirtualHostConfiguration() throws Exception

Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeTestCase.java?rev=1549898&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeTestCase.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeTestCase.java Tue Dec 10 17:19:47 2013
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Environment;
+
+/**
+ * Base test case covering behaviour shared by {@link EnvironmentFacade} implementations.
+ * Subclasses provide the facade under test through {@link #createEnvironmentFacade()}.
+ */
+public abstract class EnvironmentFacadeTestCase extends QpidTestCase
+{
+    protected File _storePath;
+    protected EnvironmentFacade _environmentFacade;
+
+    /** Obtains a test directory from {@link TestFileUtils} and remembers it as the store path. */
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _storePath = TestFileUtils.createTestDirectory("bdb", true);
+    }
+
+    /**
+     * Runs the superclass tear down, closes the facade if one was created and finally
+     * deletes the store path.
+     */
+    @Override
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            super.tearDown();
+            EnvironmentFacade facade = _environmentFacade;
+            if (facade != null)
+            {
+                facade.close();
+            }
+        }
+        finally
+        {
+            File storePath = _storePath;
+            if (storePath != null)
+            {
+                FileUtils.delete(storePath, true);
+            }
+        }
+    }
+
+    /** A freshly created facade must expose a valid environment. */
+    public void testEnvironmentFacade()
+    {
+        EnvironmentFacade facade = getEnvironmentFacade();
+        assertNotNull("Environment should not be null", facade);
+        Environment environment = facade.getEnvironment();
+        assertTrue("Environment is not valid", environment.isValid());
+    }
+
+    /** After {@code close()} the facade must no longer expose an environment. */
+    public void testClose()
+    {
+        EnvironmentFacade facade = getEnvironmentFacade();
+        facade.close();
+
+        assertNull("Environment should be null after facade close", facade.getEnvironment());
+    }
+
+    /** Databases opened through the facade must be retrievable by name. */
+    public void testOpenDatabases() throws AMQStoreException
+    {
+        EnvironmentFacade facade = getEnvironmentFacade();
+        facade.openDatabases(new String[]{"test1", "test2"}, newTransactionalCreatingConfig());
+        Database first = facade.getOpenDatabase("test1");
+        Database second = facade.getOpenDatabase("test2");
+
+        assertEquals("Unexpected name for open database test1", "test1" , first.getDatabaseName());
+        assertEquals("Unexpected name for open database test2", "test2" , second.getDatabaseName());
+    }
+
+    /** Asking for a database that was never opened must raise {@link IllegalArgumentException}. */
+    public void testGetOpenDatabaseForNonExistingDatabase() throws AMQStoreException
+    {
+        EnvironmentFacade facade = getEnvironmentFacade();
+        facade.openDatabases(new String[]{"test1"}, newTransactionalCreatingConfig());
+        Database opened = facade.getOpenDatabase("test1");
+        assertEquals("Unexpected name for open database test1", "test1" , opened.getDatabaseName());
+        try
+        {
+            facade.getOpenDatabase("test2");
+            fail("An exception should be thrown for the non existing database");
+        }
+        catch(IllegalArgumentException expected)
+        {
+            assertEquals("Unexpected exception message", "Database with name 'test2' has not been opened", expected.getMessage());
+        }
+    }
+
+    /** Creates the facade implementation under test. */
+    abstract EnvironmentFacade createEnvironmentFacade();
+
+    /** Returns the facade under test, creating it on first use. */
+    EnvironmentFacade getEnvironmentFacade()
+    {
+        if (_environmentFacade != null)
+        {
+            return _environmentFacade;
+        }
+        _environmentFacade = createEnvironmentFacade();
+        return _environmentFacade;
+    }
+
+    /** Builds the transactional, allow-create database configuration used by the tests. */
+    private static DatabaseConfig newTransactionalCreatingConfig()
+    {
+        DatabaseConfig config = new DatabaseConfig();
+        config.setTransactional(true);
+        config.setAllowCreate(true);
+        return config;
+    }
+
+}

Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java?rev=1549898&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java Tue Dec 10 17:19:47 2013
@@ -0,0 +1,449 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.rep.InsufficientReplicasException;
+import com.sleepycat.je.rep.ReplicatedEnvironment.State;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
+public class ReplicatedEnvironmentFacadeTest extends EnvironmentFacadeTestCase
+{
+    // Port used for the test node; picked once, when the class is initialised
+    private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
+    // Unit and amount of time the tests wait for a node state change
+    private static final TimeUnit WAIT_STATE_CHANGE_TIME_UNIT = TimeUnit.SECONDS;
+    private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
+    private static final String TEST_GROUP_NAME = "testGroupName";
+    private static final String TEST_NODE_NAME = "testNodeName";
+    private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT;
+    // The helper address is the test node's own address
+    private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT;
+    private static final Durability TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY");
+    private static final boolean TEST_DESIGNATED_PRIMARY = true;
+    private static final boolean TEST_COALESCING_SYNC = true;
+    // Facades closed by tearDown(), keyed by String (presumably the node name - confirm against the code that populates it)
+    private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
+
+    /**
+     * Closes every facade registered in {@code _nodes} and then runs the superclass tear down.
+     * <p>
+     * A failure to close one node does not prevent the remaining nodes from being closed;
+     * the first failure is rethrown once all nodes have been attempted.
+     */
+    public void tearDown() throws Exception
+    {
+        try
+        {
+            RuntimeException firstFailure = null;
+            for (EnvironmentFacade ef : _nodes.values())
+            {
+                try
+                {
+                    ef.close();
+                }
+                catch (RuntimeException e)
+                {
+                    // keep going so that the other nodes are still closed
+                    if (firstFailure == null)
+                    {
+                        firstFailure = e;
+                    }
+                }
+            }
+            if (firstFailure != null)
+            {
+                throw firstFailure;
+            }
+        }
+        finally
+        {
+            super.tearDown();
+        }
+    }
+
+    /** The facade's name must equal this test case's {@code getName()} value. */
+    public void testGetName()
+    {
+        String expectedName = getName();
+        ReplicatedEnvironmentFacade facade = getEnvironmentFacade();
+        assertEquals("Unexpected name", expectedName, facade.getName());
+    }
+
+    /** The facade must report the group name the test is configured with. */
+    public void testGetGroupName()
+    {
+        ReplicatedEnvironmentFacade facade = getEnvironmentFacade();
+        assertEquals("Unexpected group name", TEST_GROUP_NAME, facade.getGroupName());
+    }
+
+    /** The facade must report the node name the test is configured with. */
+    public void testGetNodeName()
+    {
+        // The failure message previously said "group name", which was misleading for a node name check
+        assertEquals("Unexpected node name", TEST_NODE_NAME, getEnvironmentFacade().getNodeName());
+    }
+
+    /** The facade must report the node host:port the test is configured with. */
+    public void testGetNodeHostPort()
+    {
+        ReplicatedEnvironmentFacade facade = getEnvironmentFacade();
+        assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, facade.getNodeHostPort());
+    }
+
+    /** The facade must report the helper host:port the test is configured with. */
+    public void testGetHelperHostPort()
+    {
+        ReplicatedEnvironmentFacade facade = getEnvironmentFacade();
+        assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, facade.getHelperHostPort());
+    }
+
+    /** The facade's durability must match the string form of the test durability. */
+    public void testGetDurability()
+    {
+        String expectedDurability = TEST_DURABILITY.toString();
+        ReplicatedEnvironmentFacade facade = getEnvironmentFacade();
+        assertEquals("Unexpected durability", expectedDurability, facade.getDurability());
+    }
+
+    /** The facade must report the coalescing-sync flag the test is configured with. */
+    public void testIsCoalescingSync()
+    {
+        ReplicatedEnvironmentFacade facade = getEnvironmentFacade();
+        assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, facade.isCoalescingSync());
+    }
+
+    /** The facade under test is expected to report the MASTER state. */
+    public void testGetNodeState()
+    {
+        String expectedState = State.MASTER.name();
+        ReplicatedEnvironmentFacade facade = getEnvironmentFacade();
+        assertEquals("Unexpected state", expectedState, facade.getNodeState());
+    }
+
+    /** The facade must report the designated-primary flag the test is configured with. */
+    public void testIsDesignatedPrimary()
+    {
+        ReplicatedEnvironmentFacade facade = getEnvironmentFacade();
+        assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, facade.isDesignatedPrimary());
+    }
+
+    /** A single-node group must list exactly the test node, with its name and host:port. */
+    public void testGetGroupMembers()
+    {
+        List<Map<String, String>> actualMembers = getEnvironmentFacade().getGroupMembers();
+
+        Map<String, String> thisNode = new HashMap<String, String>();
+        thisNode.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
+        thisNode.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
+
+        // compare as sets so the order of the returned list does not matter
+        Set<Map<String, String>> expectedMembers = Collections.singleton(thisNode);
+        Set<Map<String, String>> actualMemberSet = new HashSet<Map<String, String>>(actualMembers);
+        assertEquals("Unexpected group members", expectedMembers, actualMemberSet);
+    }
+
+    /**
+     * Joins a second node, checks the group has two members, then closes and removes that
+     * node and checks the group is back to one member.
+     */
+    public void testRemoveNodeFromGroup() throws Exception
+    {
+        ReplicatedEnvironmentFacade master = getEnvironmentFacade();
+        String replicaName = TEST_NODE_NAME + "_2";
+        String replicaHostPort = "localhost:" + getNextAvailable(TEST_NODE_PORT + 1);
+        ReplicatedEnvironmentFacade replica = joinReplica(replicaName, replicaHostPort);
+
+        List<Map<String, String>> membersWithReplica = master.getGroupMembers();
+        assertEquals("Unexpected group members count", 2, membersWithReplica.size());
+        replica.close();
+
+        master.removeNodeFromGroup(replicaName);
+        List<Map<String, String>> membersAfterRemoval = master.getGroupMembers();
+        assertEquals("Unexpected group members count", 1, membersAfterRemoval.size());
+    }
+
+    /** Clearing the designated-primary flag must be reflected by the getter. */
+    public void testSetDesignatedPrimary() throws AMQStoreException
+    {
+        ReplicatedEnvironmentFacade facade = getEnvironmentFacade();
+        facade.setDesignatedPrimary(false);
+
+        boolean designatedPrimary = facade.isDesignatedPrimary();
+        assertFalse("Unexpected designated primary", designatedPrimary);
+    }
+
+    /** The facade under test is expected to report a node priority of 1. */
+    public void testGetNodePriority()
+    {
+        ReplicatedEnvironmentFacade facade = getEnvironmentFacade();
+        assertEquals("Unexpected node priority", 1, facade.getNodePriority());
+    }
+
+    /** The facade under test is expected to report an electable group size override of 0. */
+    public void testGetElectableGroupSizeOverride()
+    {
+        ReplicatedEnvironmentFacade facade = getEnvironmentFacade();
+        assertEquals("Unexpected Electable Group Size Override", 0, facade.getElectableGroupSizeOverride());
+    }
+
+    /**
+     * Checks that, after an {@link InsufficientReplicasException} has been passed to
+     * {@code handleDatabaseException}, the facade ends up exposing a different
+     * {@link Environment} and a different {@link Database} handle than before the failure.
+     */
+    public void testEnvironmentRestartOnInsufficientReplicas() throws Exception
+    {
+        // presumably starts a three node group; nodes[0] is the node under test
+        ReplicatedEnvironmentFacade[] nodes = startClusterSequentially(3);
+        ReplicatedEnvironmentFacade environmentFacade = nodes[0];
+
+        String databaseName = "test";
+        DatabaseConfig dbConfig = createDatabase(environmentFacade, databaseName);
+
+        // close replicas
+        nodes[1].close();
+        nodes[2].close();
+
+        final CountDownLatch nodeAwaitLatch = new CountDownLatch(1);
+        // Remember the current handles so they can be compared with the ones seen at the end
+        Environment e = environmentFacade.getEnvironment();
+        Database db = environmentFacade.getOpenDatabase(databaseName);
+        try
+        {
+            environmentFacade.openDatabases(new String[] { "test2" }, dbConfig);
+            fail("Opening of new database without quorum should fail");
+        }
+        catch(InsufficientReplicasException ex)
+        {
+            // Hand the failure to the facade; the returned exception is deliberately ignored
+            environmentFacade.handleDatabaseException(null, ex);
+        }
+
+        // restore quorum
+        nodes[1] = joinReplica(TEST_NODE_NAME + "_1", nodes[1].getNodeHostPort());
+        nodes[2] = joinReplica(TEST_NODE_NAME + "_2", nodes[2].getNodeHostPort());
+
+        // NOTE(review): the listener is registered only after the replicas have rejoined; this
+        // assumes the listener still observes a MASTER/REPLICA state at that point - confirm
+        environmentFacade.setStateChangeListener(new StateChangeListener()
+        {
+            @Override
+            public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+            {
+                if (stateChangeEvent.getState() == State.MASTER || stateChangeEvent.getState() == State.REPLICA)
+                {
+                    nodeAwaitLatch.countDown();
+                }
+            }
+        });
+
+        assertTrue("The node could not rejoin the cluster",
+                nodeAwaitLatch.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT));
+
+        Environment e2 = environmentFacade.getEnvironment();
+        assertNotSame("Environment has not been restarted", e2, e);
+
+        Database db1 = environmentFacade.getOpenDatabase(databaseName);
+        assertNotSame("Database should be the re-created", db1, db);
+    }
+
+    /**
+     * Starts a three-node cluster, closes both replicas and then runs 100
+     * concurrent write transactions against the remaining node, passing every
+     * DatabaseException to the facade. After the replicas have re-joined, the
+     * test waits for the facade to report the OPEN state and asserts that the
+     * MASTER state was notified exactly twice in total.
+     */
+    public void testEnvironmentIsRestartOnlyOnceOnInsufficientReplicas() throws Exception
+    {
+        ReplicatedEnvironmentFacade[] nodes = startClusterSequentially(3);
+        final ReplicatedEnvironmentFacade environmentFacade = nodes[0];
+
+        int numberOfThreads = 100;
+
+        // counts MASTER notifications received by the listener installed below
+        final AtomicInteger numberOfTimesElected = new AtomicInteger();
+        environmentFacade.setStateChangeListener(new StateChangeListener()
+        {
+            @Override
+            public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+            {
+                if (stateChangeEvent.getState() == State.MASTER)
+                {
+                    numberOfTimesElected.incrementAndGet();
+                }
+            }
+        });
+
+        String databaseName = "test";
+        createDatabase(environmentFacade, databaseName);
+        // one count per task; each task counts down in its finally block
+        final CountDownLatch latch = new CountDownLatch(numberOfThreads);
+
+        // handle obtained while the cluster is still complete; shared by all tasks
+        final Database db = environmentFacade.getOpenDatabase(databaseName);
+
+        // close both replicas so that only nodes[0] remains running
+        nodes[1].close();
+        nodes[2].close();
+
+        // perform transactions in separate threads in order to provoke InsufficientReplicasException
+        ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
+        try
+        {
+            List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
+            for (int i = 0; i < numberOfThreads; i++)
+            {
+                final int index = i;
+                tasks.add(new Callable<Void>(){
+
+                    @Override
+                    public Void call() throws Exception
+                    {
+                        try
+                        {
+                            // write a single record (index -> index) in its own transaction
+                            Transaction tx = environmentFacade.getEnvironment().beginTransaction(null, null);
+                            DatabaseEntry key = new DatabaseEntry();
+                            DatabaseEntry data = new DatabaseEntry();
+                            IntegerBinding.intToEntry(index, key);
+                            IntegerBinding.intToEntry(index, data);
+                            db.put(tx, key, data);
+                            tx.commit();
+                        }
+                        catch(DatabaseException e)
+                        {
+                            // NOTE(review): this uses the _environmentFacade field rather than the
+                            // local environmentFacade; presumably both refer to the same instance - confirm
+                            _environmentFacade.handleDatabaseException("Exception", e);
+                        }
+                        finally
+                        {
+                            latch.countDown();
+                        }
+                        return null;
+                    }});
+            }
+            // NOTE(review): the returned futures are not inspected, so any exception other
+            // than DatabaseException thrown by a task goes unnoticed by this test
+            service.invokeAll(tasks);
+            assertTrue("Not all tasks have been executed",
+                    latch.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT));
+        }
+        finally
+        {
+            service.shutdown();
+        }
+
+        // restore quorum by re-joining both replicas on their previous host:port
+        nodes[1] = joinReplica(TEST_NODE_NAME + "_1", nodes[1].getNodeHostPort());
+        nodes[2] = joinReplica(TEST_NODE_NAME + "_2", nodes[2].getNodeHostPort());
+
+        // poll once per second, for up to about ten seconds, until the facade reports OPEN
+        long start = System.currentTimeMillis();
+        while(environmentFacade.getFacadeState() != ReplicatedEnvironmentFacade.State.OPEN && System.currentTimeMillis() - start < 10000l)
+        {
+            Thread.sleep(1000l);
+        }
+        assertEquals("EnvironmentFacade should be in open state", ReplicatedEnvironmentFacade.State.OPEN, environmentFacade.getFacadeState());
+
+        // it should be elected twice: once on first start-up and second time after environment restart
+        assertEquals("Elected master unexpected number of times", 2, numberOfTimesElected.get());
+    }
+
+    /**
+     * Creates a stand-alone facade for node "node1" and asserts the facade
+     * state at three points: OPENING straight after construction, OPEN once the
+     * MASTER state change has been notified, and CLOSED after close().
+     */
+    public void testFacadeStateTransitions() throws InterruptedException
+    {
+        String nodeName = "node1";
+        final String nodePath = createNodeWorkingFolder(nodeName);
+        ReplicatedEnvironmentFacade ref = null;
+        try
+        {
+            ref = createReplicatedEnvironmentFacade(nodeName, nodePath, TEST_NODE_HOST_PORT, false);
+            // NOTE(review): this expects the facade to still be OPENING immediately after the
+            // constructor returns, which looks timing dependent - confirm
+            assertEquals("Unexpected state " + ref.getFacadeState(), ReplicatedEnvironmentFacade.State.OPENING, ref.getFacadeState());
+
+            // released by the listener when the MASTER state is notified
+            final CountDownLatch nodeAwaitLatch = new CountDownLatch(1);
+            ref.setStateChangeListener(new StateChangeListener()
+            {
+                @Override
+                public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+                {
+                    if (stateChangeEvent.getState() == State.MASTER)
+                    {
+                        nodeAwaitLatch.countDown();
+                    }
+                }
+            });
+            assertTrue("Node did not join the cluster", nodeAwaitLatch.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT));
+            assertEquals("Unexpected state " + ref.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, ref.getFacadeState());
+            ref.close();
+            assertEquals("Unexpected state " + ref.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, ref.getFacadeState());
+        }
+        finally
+        {
+            // on the success path this is a second close() call on the same facade
+            if (ref != null)
+            {
+                ref.close();
+            }
+        }
+    }
+
+    /**
+     * Creates the facade under test by starting the node TEST_NODE_NAME on
+     * TEST_NODE_HOST_PORT as designated primary and waiting for it to reach
+     * the MASTER state.
+     *
+     * @return the started facade
+     * @throws RuntimeException wrapping the InterruptedException if the
+     *         calling thread is interrupted while waiting for the node
+     */
+    @Override
+    EnvironmentFacade createEnvironmentFacade()
+    {
+        try
+        {
+            return startNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, State.MASTER);
+        }
+        catch (InterruptedException e)
+        {
+            // Restore the interrupt status so callers further up can still observe it;
+            // Thread.interrupted() would only test and clear the flag.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    ReplicatedEnvironmentFacade getEnvironmentFacade()
+    {
+        return (ReplicatedEnvironmentFacade) super.getEnvironmentFacade();
+    }
+
+    private ReplicatedEnvironmentFacade joinReplica(final String nodeName, final String hostPort) throws InterruptedException
+    {
+        return startNode(nodeName, hostPort, false, State.REPLICA);
+    }
+
+    /**
+     * Creates the working folder for the node, creates its facade through
+     * join() and blocks until the target state has been notified; fails the
+     * test when that does not happen within the state-change timeout.
+     */
+    private ReplicatedEnvironmentFacade startNode(String nodeName, String nodeHostPort, boolean designatedPrimary, State targetState)
+            throws InterruptedException
+    {
+        final CountDownLatch targetStateReached = new CountDownLatch(1);
+        final String workingFolder = createNodeWorkingFolder(nodeName);
+        final ReplicatedEnvironmentFacade facade =
+                join(nodeName, workingFolder, nodeHostPort, designatedPrimary, targetStateReached, targetState);
+        final boolean joined = targetStateReached.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT);
+        assertTrue("Node did not join the cluster", joined);
+        return facade;
+    }
+
+    /**
+     * Creates (if necessary) a folder named after the node underneath the
+     * store path and returns its absolute path.
+     */
+    private String createNodeWorkingFolder(String nodeName)
+    {
+        final File workingFolder = new File(_storePath, nodeName);
+        // the result is ignored on purpose: the folder may already exist
+        workingFolder.mkdirs();
+        return workingFolder.getAbsolutePath();
+    }
+
+    /**
+     * Creates a facade for the given node, records it in _nodes when it is
+     * expected to become a replica, and installs a listener that counts the
+     * latch down once the expected state is notified.
+     */
+    private ReplicatedEnvironmentFacade join(String nodeName, String nodePath, String nodeHostPort, boolean designatedPrimary,
+            final CountDownLatch nodeAwaitLatch, final State expectedState)
+    {
+        final ReplicatedEnvironmentFacade facade =
+                createReplicatedEnvironmentFacade(nodeName, nodePath, nodeHostPort, designatedPrimary);
+
+        final boolean expectedToBeReplica = (State.REPLICA == expectedState);
+        if (expectedToBeReplica)
+        {
+            _nodes.put(nodeName, facade);
+        }
+
+        final StateChangeListener latchReleasingListener = new StateChangeListener()
+        {
+            @Override
+            public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+            {
+                final State notifiedState = stateChangeEvent.getState();
+                if (notifiedState == expectedState)
+                {
+                    nodeAwaitLatch.countDown();
+                }
+            }
+        };
+        facade.setStateChangeListener(latchReleasingListener);
+        return facade;
+    }
+
+    private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodeName, String nodePath, String nodeHostPort,
+            boolean designatedPrimary)
+    {
+        Map<String, String> repConfig = new HashMap<String, String>();
+        repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
+        repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
+
+        ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(getName(), nodePath, TEST_GROUP_NAME, nodeName,
+                nodeHostPort, TEST_NODE_HELPER_HOST_PORT, TEST_DURABILITY, designatedPrimary, TEST_COALESCING_SYNC,
+                Collections.<String, String> emptyMap(), repConfig);
+        return ref;
+    }
+
+    private ReplicatedEnvironmentFacade[] startClusterSequentially(int nodeNumber) throws InterruptedException
+    {
+        // master
+        ReplicatedEnvironmentFacade environmentFacade = getEnvironmentFacade();
+        ReplicatedEnvironmentFacade[] nodes = new ReplicatedEnvironmentFacade[nodeNumber];
+        nodes[0] = environmentFacade;
+
+        int nodePort = TEST_NODE_PORT;
+        for (int i = 1; i < nodeNumber; i++)
+        {
+            nodePort = getNextAvailable(nodePort + 1);
+            nodes[i] = joinReplica(TEST_NODE_NAME + "_" + i, "localhost:" + nodePort);
+        }
+        return nodes;
+    }
+
+    private DatabaseConfig createDatabase(ReplicatedEnvironmentFacade environmentFacade, String databaseName) throws AMQStoreException
+    {
+        DatabaseConfig dbConfig = new DatabaseConfig();
+        dbConfig.setTransactional(true);
+        dbConfig.setAllowCreate(true);
+        environmentFacade.openDatabases(new String[] { databaseName }, dbConfig);
+        return dbConfig;
+    }
+}

Copied: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java (from r1549894, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java&r1=1549894&r2=1549898&rev=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java Tue Dec 10 17:19:47 2013
@@ -18,14 +18,17 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.store.berkeleydb.upgrade;
+package org.apache.qpid.server.store.berkeleydb;
 
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import org.apache.qpid.AMQStoreException;
+import java.util.Collections;
 
-public interface StoreUpgrade
+public class StandardEnvironmentFacadeTest extends EnvironmentFacadeTestCase
 {
-    void performUpgrade(Environment environment, UpgradeInteractionHandler handler, String virtualHostName)
-            throws DatabaseException, AMQStoreException;
+
+    @Override
+    EnvironmentFacade createEnvironmentFacade()
+    {
+        return new StandardEnvironmentFacade(getName(), _storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
+    }
+
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java Tue Dec 10 17:19:47 2013
@@ -26,6 +26,7 @@ import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+
 import junit.framework.TestCase;
 
 import com.sleepycat.je.Database;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java Tue Dec 10 17:19:47 2013
@@ -47,7 +47,6 @@ import org.apache.qpid.server.model.Exch
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
 import org.apache.qpid.server.store.berkeleydb.entry.Xid;
 import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java Tue Dec 10 17:19:47 2013
@@ -20,22 +20,15 @@
  */
 package org.apache.qpid.server.store.berkeleydb.upgrade;
 
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+
 import com.sleepycat.bind.tuple.IntegerBinding;
-import com.sleepycat.bind.tuple.LongBinding;
 import com.sleepycat.je.Cursor;
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
 import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
 import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
-import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
 
 public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase
 {
@@ -102,7 +95,7 @@ public class UpgraderFailOnNewerVersionT
         catch(AMQStoreException ex)
         {
             assertEquals("Incorrect exception thrown", "Database version 999 is higher than the most recent known version: "
-                                                        + AbstractBDBMessageStore.VERSION, ex.getMessage());
+                                                        + BDBMessageStore.VERSION, ex.getMessage());
         }
     }
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java Tue Dec 10 17:19:47 2013
@@ -24,7 +24,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
 import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
 
 import com.sleepycat.bind.tuple.IntegerBinding;
@@ -33,6 +33,7 @@ import com.sleepycat.je.Cursor;
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
 import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Environment;
 import com.sleepycat.je.OperationStatus;
 import com.sleepycat.je.Transaction;
 
@@ -53,7 +54,7 @@ public class UpgraderTest extends Abstra
         _upgrader = new Upgrader(_environment, getVirtualHostName());
     }
 
-    private int getStoreVersion()
+    private int getStoreVersion(Environment environment)
     {
         DatabaseConfig dbConfig = new DatabaseConfig();
         dbConfig.setTransactional(true);
@@ -63,7 +64,7 @@ public class UpgraderTest extends Abstra
         Cursor cursor = null;
         try
         {
-            versionDb = _environment.openDatabase(null, Upgrader.VERSION_DB_NAME, dbConfig);
+            versionDb = environment.openDatabase(null, Upgrader.VERSION_DB_NAME, dbConfig);
             cursor = versionDb.openCursor(null, null);
             DatabaseEntry key = new DatabaseEntry();
             DatabaseEntry value = new DatabaseEntry();
@@ -92,9 +93,9 @@ public class UpgraderTest extends Abstra
 
     public void testUpgrade() throws Exception
     {
-        assertEquals("Unexpected store version", -1, getStoreVersion());
+        assertEquals("Unexpected store version", -1, getStoreVersion(_environment));
         _upgrader.upgradeIfNecessary();
-        assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion());
+        assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion(_environment));
         assertContent();
     }
 
@@ -104,17 +105,24 @@ public class UpgraderTest extends Abstra
         deleteDirectoryIfExists(nonExistentStoreLocation);
 
         nonExistentStoreLocation.mkdir();
-        _environment = createEnvironment(nonExistentStoreLocation);
-        _upgrader = new Upgrader(_environment, getVirtualHostName());
-        _upgrader.upgradeIfNecessary();
+        Environment emptyEnvironment = createEnvironment(nonExistentStoreLocation);
+        try
+        {
+            _upgrader = new Upgrader(emptyEnvironment, getVirtualHostName());
+            _upgrader.upgradeIfNecessary();
 
-        List<String> databaseNames = _environment.getDatabaseNames();
-        List<String> expectedDatabases = new ArrayList<String>();
-        expectedDatabases.add(Upgrader.VERSION_DB_NAME);
-        assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames);
-        assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion());
+            List<String> databaseNames = emptyEnvironment.getDatabaseNames();
+            List<String> expectedDatabases = new ArrayList<String>();
+            expectedDatabases.add(Upgrader.VERSION_DB_NAME);
+            assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames);
+            assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion(emptyEnvironment));
 
-        nonExistentStoreLocation.delete();
+        }
+        finally
+        {
+            emptyEnvironment.close();
+            nonExistentStoreLocation.delete();
+        }
     }
 
     private void assertContent()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org