You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2013/12/10 18:19:49 UTC
svn commit: r1549898 [3/4] - in /qpid/branches/java-broker-bdb-ha/qpid/java:
bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/
bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/
bdbstore/src/main/java/org/apac...
Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java?rev=1549898&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java Tue Dec 10 17:19:47 2013
@@ -0,0 +1,656 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.store.StoreFuture;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.EnvironmentFailureException;
+import com.sleepycat.je.ExceptionEvent;
+import com.sleepycat.je.ExceptionListener;
+import com.sleepycat.je.OperationFailureException;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.InsufficientReplicasException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationMutableConfig;
+import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+
+public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
+{
+ @SuppressWarnings("serial")
+ private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
+ {{
+ /**
+ * Parameter decreased as the 24 h default may lead to very large log files for most users.
+ */
+ put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h");
+ /**
+ * Parameter increased as the 5 s default may lead to spurious timeouts.
+ */
+ put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s");
+ /**
+ * Parameter increased as the 10 s default may lead to spurious timeouts.
+ */
+ put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s");
+ /**
+ * Parameter decreased as the 10 h default may cause user confusion.
+ */
+ put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
+ /**
+ * Parameter changed from the default of true so that we adopt the new behaviour early. False
+ * is scheduled to become the default after JE 5.0.48.
+ */
+ put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString());
+ /**
+ * Parameter decreased as the default 5 min interval may lead to bigger data losses on a node
+ * with NO_SYNC durability if such a node crashes.
+ */
+ put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
+
+ /**
+ * Timeout after which the node transitions into the UNKNOWN state if the majority is not available.
+ * By default it is switched off.
+ */
+ put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s");
+ }});
+
+ private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class);
+ public static final String TYPE = "BDB-HA";
+
+ // TODO: get rid of these names
+ public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
+ public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
+
+ private volatile ReplicatedEnvironment _environment;
+ private CommitThreadWrapper _commitThreadWrapper;
+
+ private String _groupName;
+ private String _nodeName;
+ private String _nodeHostPort;
+ private String _helperHostPort;
+ private Durability _durability;
+ private boolean _designatedPrimary;
+ private boolean _coalescingSync;
+ private volatile StateChangeListener _stateChangeListener;
+ private String _environmentPath;
+ private Map<String, String> _environmentParameters;
+ private Map<String, String> _replicationEnvironmentParameters;
+ private String _name;
+ private ExecutorService _executor = Executors.newFixedThreadPool(1);
+ private AtomicReference<State> _state = new AtomicReference<State>(State.INITIAL);
+
+ private final ConcurrentMap<String, Database> _databases = new ConcurrentHashMap<String, Database>();
+
+ public ReplicatedEnvironmentFacade(String name, String environmentPath, String groupName, String nodeName, String nodeHostPort,
+ String helperHostPort, Durability durability, boolean designatedPrimary, boolean coalescingSync,
+ Map<String, String> environmentParameters, Map<String, String> replicationEnvironmentParameters)
+ {
+
+ _name = name;
+ _environmentPath = environmentPath;
+ _groupName = groupName;
+ _nodeName = nodeName;
+ _nodeHostPort = nodeHostPort;
+ _helperHostPort = helperHostPort;
+ _durability = durability;
+ _designatedPrimary = designatedPrimary;
+ _coalescingSync = coalescingSync;
+ _environmentParameters = environmentParameters;
+ _replicationEnvironmentParameters = replicationEnvironmentParameters;
+
+ _state.set(State.OPENING);
+ _environment = createEnvironment(environmentPath, groupName, nodeName, nodeHostPort, helperHostPort, durability,
+ designatedPrimary, environmentParameters, replicationEnvironmentParameters);
+ startCommitThread(name, _environment);
+ }
+
+ /**
+  * Commits the transaction and, when coalescing sync is enabled, hands it to the commit thread wrapper.
+  *
+  * @return the future supplied by the commit thread wrapper, or {@link StoreFuture#IMMEDIATE_FUTURE}
+  *         when coalescing sync is disabled
+  * @throws AMQStoreException wrapping any {@link DatabaseException} raised while committing
+  */
+ @Override
+ public StoreFuture commit(final Transaction tx, final boolean syncCommit) throws AMQStoreException
+ {
+     try
+     {
+         // commit() rather than commitNoSync() is used for the HA store so that the
+         // HA durability configuration can influence the resulting behaviour.
+         tx.commit();
+
+         if (!_coalescingSync)
+         {
+             return StoreFuture.IMMEDIATE_FUTURE;
+         }
+         return _commitThreadWrapper.commit(tx, syncCommit);
+     }
+     catch (DatabaseException commitFailure)
+     {
+         throw handleDatabaseException("Got DatabaseException on commit, closing environment", commitFailure);
+     }
+ }
+
+ /**
+  * Closes the facade. The shutdown sequence only runs when the state can be moved to CLOSING from
+  * INITIAL, OPENING, OPEN or RESTARTING (tried in that order); otherwise the call does nothing.
+  * The state is moved from CLOSING to CLOSED afterwards, even if the shutdown sequence throws.
+  */
+ @Override
+ public void close()
+ {
+     final State[] closeableStates = { State.INITIAL, State.OPENING, State.OPEN, State.RESTARTING };
+     boolean closingStarted = false;
+     for (int i = 0; i < closeableStates.length && !closingStarted; i++)
+     {
+         closingStarted = _state.compareAndSet(closeableStates[i], State.CLOSING);
+     }
+     if (!closingStarted)
+     {
+         return;
+     }
+
+     try
+     {
+         _executor.shutdownNow();
+         stopCommitThread();
+         closeDatabases();
+         closeEnvironment();
+     }
+     finally
+     {
+         _state.compareAndSet(State.CLOSING, State.CLOSED);
+     }
+ }
+
+ /**
+  * Wraps a {@link DatabaseException} in an {@link AMQStoreException} and, when the failure is an
+  * {@link InsufficientReplicasException}, schedules an asynchronous restart of the environment.
+  * <p>
+  * The restart is only scheduled if the facade state can be moved from OPEN to RESTARTING; in any
+  * other state the exception is simply wrapped and the current state is logged.
+  *
+  * @param contextMessage message for the returned exception
+  * @param e the exception raised by the store
+  * @return a new exception wrapping <code>e</code>
+  */
+ @Override
+ public AMQStoreException handleDatabaseException(String contextMessage, DatabaseException e)
+ {
+     // NOTE(review): the original condition tested InsufficientReplicasException twice. The redundant
+     // operand has been removed; presumably a second exception type was intended - confirm which one.
+     final boolean restartRequired = e instanceof InsufficientReplicasException;
+     if (restartRequired)
+     {
+         if (_state.compareAndSet(State.OPEN, State.RESTARTING))
+         {
+             if (LOGGER.isDebugEnabled())
+             {
+                 LOGGER.debug("Environment restarting due to exception " + e.getMessage(), e);
+             }
+             // The restart runs on the facade's executor so that the calling thread is not blocked
+             _executor.execute(new Runnable()
+             {
+                 @Override
+                 public void run()
+                 {
+                     try
+                     {
+                         restartEnvironment();
+                     }
+                     catch (Exception restartFailure)
+                     {
+                         LOGGER.error("Exception on environment restart", restartFailure);
+                     }
+                 }
+             });
+         }
+         else
+         {
+             LOGGER.info("Cannot restart environment because of facade state: " + _state.get());
+         }
+     }
+     return new AMQStoreException(contextMessage, e);
+ }
+
+ /**
+  * Opens each named database in the current environment with the given configuration and
+  * registers the handle under its name.
+  */
+ @Override
+ public void openDatabases(String[] databaseNames, DatabaseConfig dbConfig) throws AMQStoreException
+ {
+     for (int i = 0; i < databaseNames.length; i++)
+     {
+         final String nameToOpen = databaseNames[i];
+         final Database opened = _environment.openDatabase(null, nameToOpen, dbConfig);
+         _databases.put(nameToOpen, opened);
+     }
+ }
+
+ /**
+  * Looks up a database previously registered by {@link #openDatabases(String[], DatabaseConfig)}.
+  *
+  * @throws IllegalStateException if the environment reports that it is not valid
+  * @throws IllegalArgumentException if no database is registered under <code>name</code>
+  */
+ @Override
+ public Database getOpenDatabase(String name)
+ {
+     final boolean environmentValid = _environment.isValid();
+     if (!environmentValid)
+     {
+         throw new IllegalStateException("Environment is not valid");
+     }
+
+     final Database registered = _databases.get(name);
+     if (registered != null)
+     {
+         return registered;
+     }
+     throw new IllegalArgumentException("Database with name '" + name + "' has not been opened");
+ }
+
+ /**
+  * Receives node state changes from the environment. When the node becomes REPLICA or MASTER the
+  * facade moves from OPENING or RESTARTING to OPEN. The event is then forwarded to the registered
+  * listener, unless the facade is CLOSING or CLOSED.
+  */
+ @Override
+ public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+ {
+     final ReplicatedEnvironment.State nodeState = stateChangeEvent.getState();
+     LOGGER.info("The node state is " + nodeState);
+
+     final boolean nodeActive = nodeState == ReplicatedEnvironment.State.REPLICA
+                                || nodeState == ReplicatedEnvironment.State.MASTER;
+     if (nodeActive
+         && (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN)))
+     {
+         LOGGER.info("The environment facade is in open state");
+     }
+
+     if (_state.get() == State.CLOSING || _state.get() == State.CLOSED)
+     {
+         return;
+     }
+
+     // Copy the volatile field so that the null check and the call use the same reference
+     final StateChangeListener delegate = _stateChangeListener;
+     if (delegate != null)
+     {
+         delegate.stateChange(stateChangeEvent);
+     }
+ }
+
+ /**
+  * Stores the listener that {@link #stateChange(StateChangeEvent)} forwards events to and registers
+  * this facade as the state change listener of the environment.
+  */
+ public void setStateChangeListener(StateChangeListener listener)
+ {
+     this._stateChangeListener = listener;
+     this._environment.setStateChangeListener(this);
+ }
+
+ /**
+  * Returns the name this facade was created with.
+  */
+ public String getName()
+ {
+     return this._name;
+ }
+
+ /**
+  * Returns the replication group name this facade was created with.
+  */
+ public String getGroupName()
+ {
+     return this._groupName;
+ }
+
+ /**
+  * Returns the node name this facade was created with.
+  */
+ public String getNodeName()
+ {
+     return this._nodeName;
+ }
+
+ /**
+  * Returns the node host/port string this facade was created with.
+  */
+ public String getNodeHostPort()
+ {
+     return this._nodeHostPort;
+ }
+
+ /**
+  * Returns the helper host/port string this facade was created with.
+  */
+ public String getHelperHostPort()
+ {
+     return this._helperHostPort;
+ }
+
+ /**
+  * Returns the string form of the durability this facade was created with.
+  */
+ public String getDurability()
+ {
+     final String durabilityText = this._durability.toString();
+     return durabilityText;
+ }
+
+ /**
+  * Returns whether this facade was created with coalescing sync enabled.
+  */
+ public boolean isCoalescingSync()
+ {
+     return this._coalescingSync;
+ }
+
+ /**
+  * Returns the string form of the state currently reported by the replicated environment.
+  */
+ public String getNodeState()
+ {
+     return _environment.getState().toString();
+ }
+
+ /**
+  * Returns the designated primary flag read from the environment's mutable replication configuration.
+  */
+ public boolean isDesignatedPrimary()
+ {
+     final ReplicationMutableConfig mutableConfig = _environment.getRepMutableConfig();
+     return mutableConfig.getDesignatedPrimary();
+ }
+
+ /**
+  * Describes every node of the replication group.
+  *
+  * @return one map per node holding its name under {@link #GRP_MEM_COL_NODE_NAME} and its
+  *         "host:port" under {@link #GRP_MEM_COL_NODE_HOST_PORT}
+  */
+ public List<Map<String, String>> getGroupMembers()
+ {
+     final List<Map<String, String>> described = new ArrayList<Map<String, String>>();
+
+     for (ReplicationNode member : _environment.getGroup().getNodes())
+     {
+         final Map<String, String> attributes = new HashMap<String, String>();
+         attributes.put(GRP_MEM_COL_NODE_NAME, member.getName());
+         attributes.put(GRP_MEM_COL_NODE_HOST_PORT, member.getHostName() + ":" + member.getPort());
+         described.add(attributes);
+     }
+
+     return described;
+ }
+
+ /**
+  * Removes the named node from the replication group through a {@link ReplicationGroupAdmin}.
+  *
+  * @throws AMQStoreException wrapping any failure reported by the group admin
+  */
+ public void removeNodeFromGroup(final String nodeName) throws AMQStoreException
+ {
+     final String failurePrefix = "Failed to remove '" + nodeName + "' from group. ";
+     try
+     {
+         final ReplicationGroupAdmin groupAdmin = createReplicationGroupAdmin();
+         groupAdmin.removeMember(nodeName);
+     }
+     catch (OperationFailureException ofe)
+     {
+         // TODO: I am not sure about the exception handling here
+         throw new AMQStoreException(failurePrefix + ofe.getMessage(), ofe);
+     }
+     catch (DatabaseException e)
+     {
+         // TODO: I am not sure about the exception handling here
+         throw new AMQStoreException(failurePrefix + e.getMessage(), e);
+     }
+ }
+
+ /**
+  * Changes the designated primary flag in the environment's mutable replication configuration.
+  * <p>
+  * The new value is also stored in the facade so that an environment recreated by
+  * restartEnvironment() uses it rather than the value supplied at construction.
+  *
+  * @param isPrimary the new value of the designated primary flag
+  * @throws AMQStoreException if the environment rejects the configuration change
+  */
+ public void setDesignatedPrimary(final boolean isPrimary) throws AMQStoreException
+ {
+     try
+     {
+         final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+         final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary);
+         _environment.setRepMutableConfig(newConfig);
+
+         // Keep the cached setting in step with the live configuration
+         _designatedPrimary = isPrimary;
+
+         if (LOGGER.isInfoEnabled())
+         {
+             // Report what was actually done; the flag may have been cleared rather than set
+             LOGGER.info("Node " + _nodeName + " successfully " + (isPrimary ? "set" : "unset")
+                         + " as designated primary for group");
+         }
+     }
+     catch (DatabaseException e)
+     {
+         // TODO: I am not sure about the exception handling here
+         throw handleDatabaseException("Cannot set designated primary", e);
+     }
+ }
+
+ /**
+  * Updates the address of the named node through a {@link ReplicationGroupAdmin}.
+  *
+  * @throws AMQStoreException wrapping any failure; a {@link DatabaseException} that is not an
+  *         {@link OperationFailureException} is additionally passed through
+  *         {@link #handleDatabaseException(String, DatabaseException)}
+  */
+ public void updateAddress(final String nodeName, final String newHostName, final int newPort) throws AMQStoreException
+ {
+     final String failurePrefix = "Failed to update address for '" + nodeName + "' with new host " + newHostName
+                                  + " and new port " + newPort + ". ";
+     try
+     {
+         final ReplicationGroupAdmin groupAdmin = createReplicationGroupAdmin();
+         groupAdmin.updateAddress(nodeName, newHostName, newPort);
+     }
+     catch (OperationFailureException ofe)
+     {
+         // TODO: I am not sure about the exception handling here
+         throw new AMQStoreException(failurePrefix + ofe.getMessage(), ofe);
+     }
+     catch (DatabaseException e)
+     {
+         // TODO: I am not sure about the exception handling here
+         throw handleDatabaseException(failurePrefix + e.getMessage(), e);
+     }
+ }
+
+ /**
+  * Returns the node priority read from the environment's mutable replication configuration.
+  */
+ public int getNodePriority()
+ {
+     return _environment.getRepMutableConfig().getNodePriority();
+ }
+
+ /**
+  * Returns the electable group size override read from the environment's mutable replication configuration.
+  */
+ public int getElectableGroupSizeOverride()
+ {
+     return _environment.getRepMutableConfig().getElectableGroupSizeOverride();
+ }
+
+ /**
+  * Returns the replicated environment currently held by the facade.
+  */
+ public ReplicatedEnvironment getEnvironment()
+ {
+     return this._environment;
+ }
+
+ /**
+  * Returns the current lifecycle state of the facade.
+  */
+ public State getFacadeState()
+ {
+     final State current = _state.get();
+     return current;
+ }
+
+ /**
+  * Builds a group admin for this facade's group, using the environment's helper sockets plus the
+  * (unresolved) address of this node as helpers.
+  */
+ private ReplicationGroupAdmin createReplicationGroupAdmin()
+ {
+     final Set<InetSocketAddress> helperAddresses =
+             new HashSet<InetSocketAddress>(_environment.getRepConfig().getHelperSockets());
+
+     final ReplicationConfig nodeConfig = _environment.getRepConfig();
+     final InetSocketAddress ownAddress =
+             InetSocketAddress.createUnresolved(nodeConfig.getNodeHostname(), nodeConfig.getNodePort());
+     helperAddresses.add(ownAddress);
+
+     return new ReplicationGroupAdmin(_groupName, helperAddresses);
+ }
+
+ /**
+  * Cleans the log of a valid environment, then closes the environment and drops the reference.
+  * The close and the reset of the reference happen even if the log cleaning throws.
+  */
+ private void closeEnvironment()
+ {
+     try
+     {
+         // Cleaning before closing removes redundant data from the log; without it the
+         // cleaner may not get a chance to finish.
+         if (this._environment.isValid())
+         {
+             this._environment.cleanLog();
+         }
+     }
+     finally
+     {
+         this._environment.close();
+         this._environment = null;
+     }
+ }
+
+ private void startCommitThread(String name, Environment environment)
+ {
+ if (_coalescingSync)
+ {
+ _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, environment);
+ _commitThreadWrapper.startCommitThread();
+ }
+ }
+
+ /**
+  * Stops the commit thread wrapper. Does nothing when coalescing sync is disabled.
+  * <p>
+  * If the calling thread is interrupted while stopping, the interruption is logged and the
+  * thread's interrupt status is restored so that callers further up the stack can observe it.
+  */
+ private void stopCommitThread()
+ {
+     if (_coalescingSync)
+     {
+         try
+         {
+             _commitThreadWrapper.stopCommitThread();
+         }
+         catch (InterruptedException e)
+         {
+             LOGGER.warn("Stopping of commit thread is interrupted", e);
+             // Thread.interrupted() would clear the flag; re-assert it instead
+             Thread.currentThread().interrupt();
+         }
+     }
+ }
+
+ private void restartEnvironment() throws AMQStoreException
+ {
+ LOGGER.info("Restarting environment");
+
+ stopCommitThread();
+
+ Set<String> databaseNames = new HashSet<String>(_databases.keySet());
+ closeEnvironmentSafely();
+
+ _environment = createEnvironment(_environmentPath, _groupName, _nodeName, _nodeHostPort, _helperHostPort, _durability,
+ _designatedPrimary, _environmentParameters, _replicationEnvironmentParameters);
+
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ // TODO Alex and I think this should be removed.
+ openDatabases(databaseNames.toArray(new String[databaseNames.size()]), dbConfig);
+
+ startCommitThread(_name, _environment);
+
+ _environment.setStateChangeListener(this);
+
+ LOGGER.info("Environment is restarted");
+
+ }
+
+ private void closeEnvironmentSafely()
+ {
+ Environment environment = _environment;
+ if (environment != null)
+ {
+ try
+ {
+ if (environment.isValid())
+ {
+ try
+ {
+ closeDatabases();
+ }
+ catch(Exception e)
+ {
+ LOGGER.warn("Ignoring an exception whilst closing databases", e);
+ }
+ }
+ environment.close();
+ }
+ catch (EnvironmentFailureException efe)
+ {
+ LOGGER.warn("Ignoring an exception whilst closing environment", efe);
+ }
+ }
+ }
+
+ /**
+  * Closes every registered database and empties the registry. All databases are attempted even
+  * if some fail; the first {@link RuntimeException} encountered is rethrown at the end.
+  */
+ private void closeDatabases()
+ {
+     RuntimeException firstFailure = null;
+     for (Database handle : _databases.values())
+     {
+         try
+         {
+             handle.close();
+         }
+         catch (RuntimeException closeFailure)
+         {
+             // Only the first failure is reported; later ones are dropped
+             firstFailure = (firstFailure == null) ? closeFailure : firstFailure;
+         }
+     }
+     _databases.clear();
+
+     if (firstFailure != null)
+     {
+         throw firstFailure;
+     }
+ }
+
+ /**
+  * Creates a replicated environment from the given settings.
+  * <p>
+  * The replication and environment parameters are layered over {@link #REPCONFIG_DEFAULTS} and
+  * {@link EnvironmentFacade#ENVCONFIG_DEFAULTS} respectively. If opening the environment fails with
+  * an {@link InsufficientLogException}, a network restore is executed and the open is retried once.
+  */
+ private ReplicatedEnvironment createEnvironment(String environmentPath, String groupName, String nodeName, String nodeHostPort,
+         String helperHostPort, Durability durability, boolean designatedPrimary, Map<String, String> environmentParameters,
+         Map<String, String> replicationEnvironmentParameters)
+ {
+     if (LOGGER.isInfoEnabled())
+     {
+         LOGGER.info("Creating environment");
+         LOGGER.info("Environment path " + environmentPath);
+         LOGGER.info("Group name " + groupName);
+         LOGGER.info("Node name " + nodeName);
+         LOGGER.info("Node host port " + nodeHostPort);
+         LOGGER.info("Helper host port " + helperHostPort);
+         LOGGER.info("Durability " + durability);
+         LOGGER.info("Coalescing sync " + _coalescingSync);
+         LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary);
+     }
+
+     // Effective settings: defaults first, caller supplied values on top
+     final Map<String, String> effectiveRepSettings = new HashMap<String, String>(REPCONFIG_DEFAULTS);
+     if (replicationEnvironmentParameters != null)
+     {
+         effectiveRepSettings.putAll(replicationEnvironmentParameters);
+     }
+     final Map<String, String> effectiveEnvSettings = new HashMap<String, String>(EnvironmentFacade.ENVCONFIG_DEFAULTS);
+     if (environmentParameters != null)
+     {
+         effectiveEnvSettings.putAll(environmentParameters);
+     }
+
+     final ReplicationConfig replicationConfig = new ReplicationConfig(groupName, nodeName, nodeHostPort);
+     replicationConfig.setHelperHosts(helperHostPort);
+     replicationConfig.setDesignatedPrimary(designatedPrimary);
+     for (Map.Entry<String, String> setting : effectiveRepSettings.entrySet())
+     {
+         final String key = setting.getKey();
+         final String value = setting.getValue();
+         if (LOGGER.isInfoEnabled())
+         {
+             LOGGER.info("Setting ReplicationConfig key " + key + " to '" + value + "'");
+         }
+         replicationConfig.setConfigParam(key, value);
+     }
+
+     final EnvironmentConfig envConfig = new EnvironmentConfig();
+     envConfig.setAllowCreate(true);
+     envConfig.setTransactional(true);
+     envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
+     envConfig.setDurability(durability);
+     for (Map.Entry<String, String> setting : effectiveEnvSettings.entrySet())
+     {
+         final String key = setting.getKey();
+         final String value = setting.getValue();
+         if (LOGGER.isInfoEnabled())
+         {
+             LOGGER.info("Setting EnvironmentConfig key " + key + " to '" + value + "'");
+         }
+         envConfig.setConfigParam(key, value);
+     }
+
+     final File environmentHome = new File(environmentPath);
+     try
+     {
+         return new ReplicatedEnvironment(environmentHome, replicationConfig, envConfig);
+     }
+     catch (final InsufficientLogException ile)
+     {
+         LOGGER.info("InsufficientLogException thrown and so full network restore required", ile);
+         final NetworkRestoreConfig restoreConfig = new NetworkRestoreConfig();
+         restoreConfig.setRetainLogFiles(false);
+         new NetworkRestore().execute(ile, restoreConfig);
+         return new ReplicatedEnvironment(environmentHome, replicationConfig, envConfig);
+     }
+ }
+
+ /**
+  * Exception listener that logs exceptions reported by the environment at error level, together
+  * with the name of the thread that raised them.
+  * <p>
+  * Declared static because it only uses the static logger and therefore needs no reference to the
+  * enclosing facade.
+  */
+ private static final class LoggingAsyncExceptionListener implements ExceptionListener
+ {
+     @Override
+     public void exceptionThrown(ExceptionEvent event)
+     {
+         LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException());
+     }
+ }
+
+ /**
+  * Lifecycle states of the facade.
+  */
+ public enum State
+ {
+     /** Value the state field is initialised with. */
+     INITIAL,
+     /** Set by the constructor before the environment is created. */
+     OPENING,
+     /** Entered from OPENING or RESTARTING once the node reports MASTER or REPLICA. */
+     OPEN,
+     /** Entered from OPEN when a restart of the environment is scheduled. */
+     RESTARTING,
+     /** Entered by close() while the shutdown sequence runs. */
+     CLOSING,
+     /** Entered from CLOSING once close() has finished. */
+     CLOSED
+ }
+
+}
Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java?rev=1549898&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java Tue Dec 10 17:19:47 2013
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.VirtualHost;
+
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Durability.ReplicaAckPolicy;
+import com.sleepycat.je.Durability.SyncPolicy;
+
+public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
+{
+
+ private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC,
+ ReplicaAckPolicy.SIMPLE_MAJORITY);
+
+ /**
+  * Creates a {@link ReplicatedEnvironmentFacade} from the attributes of the given virtual host.
+  * <p>
+  * Mandatory attributes: haGroupName, haNodeName, haNodeAddress, haHelperAddress.
+  * Optional attributes: haDurability (defaults to {@link #DEFAULT_DURABILITY}), haDesignatedPrimary
+  * (defaults to false), haCoalescingSync (defaults to true), haReplicationConfig and
+  * bdbEnvironmentConfig (both only used when they are maps).
+  *
+  * @throws IllegalConfigurationException if a mandatory attribute is missing, or if coalescing sync
+  *         is combined with a local sync policy of SYNC
+  */
+ @SuppressWarnings("unchecked")
+ @Override
+ public EnvironmentFacade createEnvironmentFacade(String name, String storeLocation, VirtualHost virtualHost)
+ {
+     // Mandatory configuration
+     String groupName = getValidatedStringAttribute(virtualHost, "haGroupName");
+     String nodeName = getValidatedStringAttribute(virtualHost, "haNodeName");
+     String nodeHostPort = getValidatedStringAttribute(virtualHost, "haNodeAddress");
+     String helperHostPort = getValidatedStringAttribute(virtualHost, "haHelperAddress");
+
+     // Optional configuration
+     Durability durability = null;
+     String durabilitySetting = getStringAttribute(virtualHost, "haDurability", null);
+     if (durabilitySetting == null)
+     {
+         durability = DEFAULT_DURABILITY;
+     }
+     else
+     {
+         durability = Durability.parse(durabilitySetting);
+     }
+     boolean designatedPrimary = getBooleanAttribute(virtualHost, "haDesignatedPrimary", false);
+     boolean coalescingSync = getBooleanAttribute(virtualHost, "haCoalescingSync", true);
+
+     Map<String, String> replicationConfig = null;
+     Object repConfigAttr = virtualHost.getAttribute("haReplicationConfig");
+     if (repConfigAttr instanceof Map)
+     {
+         replicationConfig = new HashMap<String, String>((Map<String, String>) repConfigAttr);
+     }
+
+     if (coalescingSync && durability.getLocalSync() == SyncPolicy.SYNC)
+     {
+         // The message names the attribute that is actually read above (haCoalescingSync)
+         throw new IllegalConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC
+                 + "! Please set haCoalescingSync to false in virtual host configuration.");
+     }
+
+     Map<String, String> envConfigMap = null;
+     Object bdbEnvConfigAttr = virtualHost.getAttribute("bdbEnvironmentConfig");
+     if (bdbEnvConfigAttr instanceof Map)
+     {
+         envConfigMap = new HashMap<String, String>((Map<String, String>) bdbEnvConfigAttr);
+     }
+
+     return new ReplicatedEnvironmentFacade(name, storeLocation, groupName, nodeName, nodeHostPort, helperHostPort, durability,
+             designatedPrimary, coalescingSync, envConfigMap, replicationConfig);
+ }
+
+ private String getValidatedStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName)
+ {
+ Object attrValue = virtualHost.getAttribute(attributeName);
+ if (attrValue != null)
+ {
+ return attrValue.toString();
+ }
+ else
+ {
+ throw new IllegalConfigurationException("BDB HA configuration key not found. Please specify configuration attribute: "
+ + attributeName);
+ }
+ }
+
+ private String getStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, String defaultVal)
+ {
+ Object attrValue = virtualHost.getAttribute(attributeName);
+ if (attrValue != null)
+ {
+ return attrValue.toString();
+ }
+ return defaultVal;
+ }
+
+ private boolean getBooleanAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, boolean defaultVal)
+ {
+ Object attrValue = virtualHost.getAttribute(attributeName);
+ if (attrValue != null)
+ {
+ if (attrValue instanceof Boolean)
+ {
+ return ((Boolean) attrValue).booleanValue();
+ }
+ else if (attrValue instanceof String)
+ {
+ return Boolean.parseBoolean((String) attrValue);
+ }
+
+ }
+ return defaultVal;
+ }
+
+}
Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1549898&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Tue Dec 10 17:19:47 2013
@@ -0,0 +1,257 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.store.StoreFuture;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.ExceptionEvent;
+import com.sleepycat.je.ExceptionListener;
+
+public class StandardEnvironmentFacade implements EnvironmentFacade
+{
+ private static final Logger LOGGER = Logger.getLogger(StandardEnvironmentFacade.class);
+ public static final String TYPE = "BDB";
+
+ private Environment _environment;
+ private CommitThreadWrapper _commitThreadWrapper;
+ private final Map<String, Database> _databases = new HashMap<String, Database>();
+
+ public StandardEnvironmentFacade(String name, String storePath, Map<String, String> attributes)
+ {
+
+ LOGGER.info("BDB message store using environment path " + storePath);
+
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+
+ for (Map.Entry<String, String> configItem : attributes.entrySet())
+ {
+ LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+ envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+ }
+
+ envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
+
+ try
+ {
+ _environment = new Environment(new File(storePath), envConfig);
+ }
+ catch (DatabaseException de)
+ {
+ if (de.getMessage().contains("Environment.setAllowCreate is false"))
+ {
+ // Allow the creation this time
+ envConfig.setAllowCreate(true);
+ _environment = new Environment(new File(storePath), envConfig);
+ }
+ else
+ {
+ throw de;
+ }
+ }
+ _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, _environment);
+ _commitThreadWrapper.startCommitThread();
+ }
+
+
+ /**
+  * Commits the transaction without syncing and hands it to the commit thread wrapper.
+  * <p>
+  * If the commit raises a {@link DatabaseException}, the failure is logged, the environment is
+  * closed and the exception is rethrown wrapped in an {@link AMQStoreException}.
+  */
+ @Override
+ public StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException
+ {
+     try
+     {
+         tx.commitNoSync();
+     }
+     catch (DatabaseException commitFailure)
+     {
+         LOGGER.error("Got DatabaseException on commit, closing environment", commitFailure);
+
+         closeEnvironmentSafely();
+
+         throw handleDatabaseException("Got DatabaseException on commit", commitFailure);
+     }
+
+     final StoreFuture pending = _commitThreadWrapper.commit(tx, syncCommit);
+     return pending;
+ }
+
+ /**
+  * Shuts the facade down in sequence: commit thread first, then the databases, then the environment.
+  */
+ @Override
+ public void close()
+ {
+     this.stopCommitThread();
+     this.closeDatabases();
+     this.closeEnvironment();
+ }
+
+ /**
+  * Closes every registered database and empties the registry. All databases are attempted even
+  * if some fail; the first {@link RuntimeException} encountered is rethrown at the end.
+  * <p>
+  * The registry is cleared so that closed handles can no longer be returned by
+  * {@link #getOpenDatabase(String)} and are not closed a second time by a later call, which
+  * matches the behaviour of the replicated facade.
+  */
+ private void closeDatabases()
+ {
+     RuntimeException firstThrownException = null;
+     for (Database database : _databases.values())
+     {
+         try
+         {
+             database.close();
+         }
+         catch(RuntimeException e)
+         {
+             if (firstThrownException == null)
+             {
+                 firstThrownException = e;
+             }
+         }
+     }
+     _databases.clear();
+     if (firstThrownException != null)
+     {
+         throw firstThrownException;
+     }
+ }
+
+ /**
+  * Stops the commit thread and closes the databases (only when the environment is valid) and the
+  * environment, logging rather than propagating the failures caught along the way. The environment
+  * reference is dropped afterwards.
+  */
+ private void closeEnvironmentSafely()
+ {
+     stopCommitThread();
+     if (_environment == null)
+     {
+         return;
+     }
+
+     if (_environment.isValid())
+     {
+         try
+         {
+             closeDatabases();
+         }
+         catch (Exception databaseCloseFailure)
+         {
+             LOGGER.error("Exception closing environment databases", databaseCloseFailure);
+         }
+     }
+
+     try
+     {
+         _environment.close();
+     }
+     catch (DatabaseException closeFailure)
+     {
+         LOGGER.error("Exception closing store environment", closeFailure);
+     }
+     catch (IllegalStateException closeFailure)
+     {
+         LOGGER.error("Exception closing store environment", closeFailure);
+     }
+     finally
+     {
+         _environment = null;
+     }
+ }
+
+ /**
+  * Returns the environment currently held by the facade; <code>null</code> once it has been closed.
+  */
+ @Override
+ public Environment getEnvironment()
+ {
+     return this._environment;
+ }
+
+ /**
+  * Cleans the log, then closes the environment and drops the reference. Does nothing when there
+  * is no environment. The close happens even if the log cleaning throws.
+  */
+ private void closeEnvironment()
+ {
+     if (_environment == null)
+     {
+         return;
+     }
+
+     try
+     {
+         // Cleaning before closing removes redundant data from the log; without it the
+         // cleaner may not get a chance to finish.
+         _environment.cleanLog();
+     }
+     finally
+     {
+         _environment.close();
+         _environment = null;
+     }
+ }
+
+ /**
+  * Stops the commit thread wrapper, if there is one, and drops the reference to it.
+  * <p>
+  * If the calling thread is interrupted while stopping, the interruption is logged and the
+  * thread's interrupt status is restored so that callers further up the stack can observe it.
+  */
+ private void stopCommitThread()
+ {
+     if (_commitThreadWrapper != null)
+     {
+         try
+         {
+             _commitThreadWrapper.stopCommitThread();
+         }
+         catch (InterruptedException e)
+         {
+             LOGGER.warn("Stopping of commit thread is interrupted", e);
+             // Thread.interrupted() would clear the flag; re-assert it instead
+             Thread.currentThread().interrupt();
+         }
+         finally
+         {
+             _commitThreadWrapper = null;
+         }
+     }
+ }
+
+ /**
+  * Wraps a {@link DatabaseException} in an {@link AMQStoreException}, first closing the
+  * environment if it exists and reports itself as no longer valid.
+  */
+ @Override
+ public AMQStoreException handleDatabaseException(String contextMessage, DatabaseException e)
+ {
+     final boolean environmentInvalidated = _environment != null && !_environment.isValid();
+     if (environmentInvalidated)
+     {
+         closeEnvironmentSafely();
+     }
+     return new AMQStoreException(contextMessage, e);
+ }
+
+ /**
+  * Exception listener that logs exceptions reported by the environment at error level, together
+  * with the name of the thread that raised them.
+  * <p>
+  * Declared static because it only uses the static logger and therefore needs no reference to the
+  * enclosing facade.
+  */
+ private static final class LoggingAsyncExceptionListener implements ExceptionListener
+ {
+     @Override
+     public void exceptionThrown(ExceptionEvent event)
+     {
+         LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException());
+     }
+ }
+
+ /**
+  * Opens each named database in the environment with the given configuration and registers the
+  * handle under its name.
+  */
+ @Override
+ public void openDatabases(String[] databaseNames, DatabaseConfig dbConfig)
+ {
+     for (int i = 0; i < databaseNames.length; i++)
+     {
+         final String nameToOpen = databaseNames[i];
+         final Database opened = _environment.openDatabase(null, nameToOpen, dbConfig);
+         _databases.put(nameToOpen, opened);
+     }
+ }
+
+ /**
+  * Looks up a database previously registered by {@link #openDatabases(String[], DatabaseConfig)}.
+  *
+  * @throws IllegalArgumentException if no database is registered under <code>name</code>
+  */
+ @Override
+ public Database getOpenDatabase(String name)
+ {
+     final Database registered = _databases.get(name);
+     if (registered != null)
+     {
+         return registered;
+     }
+     throw new IllegalArgumentException("Database with name '" + name + "' has not been opened");
+ }
+
+}
Copied: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java (from r1549894, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java&r1=1549894&r2=1549898&rev=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java Tue Dec 10 17:19:47 2013
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,26 +20,27 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.test.utils.QpidTestCase;
+import java.util.HashMap;
+import java.util.Map;
-import static org.mockito.Mockito.mock;
+import org.apache.qpid.server.model.VirtualHost;
-public class HAMessageStoreSmokeTest extends QpidTestCase
+public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory
{
- private final BDBHAMessageStore _store = new BDBHAMessageStore();
- public void testMissingHAConfigThrowsException() throws Exception
+ @SuppressWarnings("unchecked")
+ @Override
+ public EnvironmentFacade createEnvironmentFacade(String name, String storePath, VirtualHost virtualHost)
{
- try
- {
- _store.configure(mock(VirtualHost.class));
- fail("Expected an exception to be thrown");
- }
- catch (ConfigurationException ce)
+ Map<String, String> envConfigMap = new HashMap<String, String>();
+ envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
+
+ Object bdbEnvConfigAttr = virtualHost.getAttribute("bdbEnvironmentConfig");
+ if (bdbEnvConfigAttr instanceof Map)
{
- assertTrue(ce.getMessage().contains("BDB HA configuration key not found"));
+ envConfigMap.putAll((Map<String, String>) bdbEnvConfigAttr);
}
+ return new StandardEnvironmentFacade(name, storePath, envConfigMap);
}
+
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java Tue Dec 10 17:19:47 2013
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.ber
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
+
import org.apache.qpid.AMQStoreException;
public interface StoreUpgrade
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java Tue Dec 10 17:19:47 2013
@@ -45,7 +45,6 @@ import org.apache.qpid.server.model.Exch
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java Tue Dec 10 17:19:47 2013
@@ -25,7 +25,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
@@ -34,7 +34,6 @@ import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
-import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
public class Upgrader
@@ -64,7 +63,7 @@ public class Upgrader
if(versionDb.count() == 0L)
{
- int sourceVersion = isEmpty ? AbstractBDBMessageStore.VERSION: identifyOldStoreVersion();
+ int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion();
DatabaseEntry key = new DatabaseEntry();
IntegerBinding.intToEntry(sourceVersion, key);
DatabaseEntry value = new DatabaseEntry();
@@ -74,11 +73,11 @@ public class Upgrader
}
int version = getSourceVersion(versionDb);
- if(version > AbstractBDBMessageStore.VERSION)
+ if(version > BDBMessageStore.VERSION)
{
throw new AMQStoreException("Database version " + version
+ " is higher than the most recent known version: "
- + AbstractBDBMessageStore.VERSION);
+ + BDBMessageStore.VERSION);
}
performUpgradeFromVersion(version, versionDb);
}
@@ -127,7 +126,7 @@ public class Upgrader
void performUpgradeFromVersion(int sourceVersion, Database versionDb)
throws AMQStoreException
{
- while(sourceVersion != AbstractBDBMessageStore.VERSION)
+ while(sourceVersion != BDBMessageStore.VERSION)
{
upgrade(sourceVersion, ++sourceVersion);
DatabaseEntry key = new DatabaseEntry();
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java Tue Dec 10 17:19:47 2013
@@ -32,9 +32,7 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
-import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationConfig;
import static org.mockito.Matchers.eq;
@@ -98,18 +96,18 @@ public class BDBHAMessageStoreTest exten
VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, _configXml.subset("virtualhosts.virtualhost." + vhostName), BrokerTestHelper.createBrokerMock());
_virtualHost = BrokerTestHelper.createVirtualHost(configuration,null,_modelVhost);
- BDBHAMessageStore store = (BDBHAMessageStore) _virtualHost.getMessageStore();
+ BDBMessageStore store = (BDBMessageStore) _virtualHost.getMessageStore();
// test whether JVM system settings were applied
- Environment env = store.getEnvironment();
- assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS));
- assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
+ EnvironmentFacade env = store.getEnvironmentFacade();
+ assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getEnvironment().getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS));
+ assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getEnvironment().getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
- ReplicatedEnvironment repEnv = store.getReplicatedEnvironment();
+ ReplicatedEnvironmentFacade repEnv = (ReplicatedEnvironmentFacade)store.getEnvironmentFacade();
assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES,
- repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES));
+ repEnv.getEnvironment().getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES));
assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT,
- repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT));
+ repEnv.getEnvironment().getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT));
}
private void addVirtualHostConfiguration() throws Exception
Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeTestCase.java?rev=1549898&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeTestCase.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeTestCase.java Tue Dec 10 17:19:47 2013
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Environment;
+
+/**
+ * Base class for tests of {@link EnvironmentFacade} implementations.
+ * <p>
+ * Each test gets a fresh store directory; the facade under test is created
+ * lazily through {@link #createEnvironmentFacade()} and is closed, and the
+ * directory deleted, when the test is torn down.
+ */
+public abstract class EnvironmentFacadeTestCase extends QpidTestCase
+{
+    /** Directory created for the test in {@link #setUp()}. */
+    protected File _storePath;
+
+    /** Facade under test; {@code null} until {@link #getEnvironmentFacade()} is first called. */
+    protected EnvironmentFacade _environmentFacade;
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _storePath = TestFileUtils.createTestDirectory("bdb", true);
+    }
+
+    /**
+     * Closes the facade (if one was created), runs the superclass tear-down and
+     * deletes the store directory. Each step runs even if an earlier one throws,
+     * so a failing step cannot leave the environment open or the directory behind.
+     */
+    @Override
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            // close the environment first: it holds files inside the store directory
+            if (_environmentFacade != null)
+            {
+                _environmentFacade.close();
+            }
+        }
+        finally
+        {
+            try
+            {
+                super.tearDown();
+            }
+            finally
+            {
+                if (_storePath != null)
+                {
+                    FileUtils.delete(_storePath, true);
+                }
+            }
+        }
+    }
+
+    /** A newly created facade exposes a non-null, valid environment. */
+    public void testEnvironmentFacade()
+    {
+        EnvironmentFacade ef = getEnvironmentFacade();
+        assertNotNull("Environment should not be null", ef);
+        Environment e = ef.getEnvironment();
+        assertTrue("Environment is not valid", e.isValid());
+    }
+
+    /** After {@code close()} the facade no longer exposes an environment. */
+    public void testClose()
+    {
+        EnvironmentFacade ef = getEnvironmentFacade();
+        ef.close();
+        Environment e = ef.getEnvironment();
+
+        assertNull("Environment should be null after facade close", e);
+    }
+
+    /** Databases opened through the facade can be fetched back by name. */
+    public void testOpenDatabases() throws AMQStoreException
+    {
+        EnvironmentFacade ef = getEnvironmentFacade();
+        DatabaseConfig dbConfig = new DatabaseConfig();
+        dbConfig.setTransactional(true);
+        dbConfig.setAllowCreate(true);
+        ef.openDatabases(new String[]{"test1", "test2"}, dbConfig);
+        Database test1 = ef.getOpenDatabase("test1");
+        Database test2 = ef.getOpenDatabase("test2");
+
+        assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+        assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName());
+    }
+
+    /** Asking for a database that was never opened fails with {@link IllegalArgumentException}. */
+    public void testGetOpenDatabaseForNonExistingDatabase() throws AMQStoreException
+    {
+        EnvironmentFacade ef = getEnvironmentFacade();
+        DatabaseConfig dbConfig = new DatabaseConfig();
+        dbConfig.setTransactional(true);
+        dbConfig.setAllowCreate(true);
+        ef.openDatabases(new String[]{"test1"}, dbConfig);
+        Database test1 = ef.getOpenDatabase("test1");
+        assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+        try
+        {
+            ef.getOpenDatabase("test2");
+            fail("An exception should be thrown for the non existing database");
+        }
+        catch(IllegalArgumentException e)
+        {
+            assertEquals("Unexpected exception message", "Database with name 'test2' has not been opened", e.getMessage());
+        }
+    }
+
+    /**
+     * Creates the facade implementation under test.
+     *
+     * @return a new facade instance
+     */
+    abstract EnvironmentFacade createEnvironmentFacade();
+
+    /**
+     * Returns the facade under test, creating it on first use.
+     *
+     * @return the facade shared by the current test
+     */
+    EnvironmentFacade getEnvironmentFacade()
+    {
+        if (_environmentFacade == null)
+        {
+            _environmentFacade = createEnvironmentFacade();
+        }
+        return _environmentFacade;
+    }
+
+}
Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java?rev=1549898&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java Tue Dec 10 17:19:47 2013
@@ -0,0 +1,449 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.rep.InsufficientReplicasException;
+import com.sleepycat.je.rep.ReplicatedEnvironment.State;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
+/**
+ * Tests {@link ReplicatedEnvironmentFacade} against one or more replication
+ * nodes whose working folders are created under the per-test store directory.
+ */
+public class ReplicatedEnvironmentFacadeTest extends EnvironmentFacadeTestCase
+{
+    private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
+    private static final TimeUnit WAIT_STATE_CHANGE_TIME_UNIT = TimeUnit.SECONDS;
+    private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
+    private static final String TEST_GROUP_NAME = "testGroupName";
+    private static final String TEST_NODE_NAME = "testNodeName";
+    private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT;
+    private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT;
+    private static final Durability TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY");
+    private static final boolean TEST_DESIGNATED_PRIMARY = true;
+    private static final boolean TEST_COALESCING_SYNC = true;
+
+    /** Facades started with an expected state of REPLICA, keyed by node name; closed in {@link #tearDown()}. */
+    private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
+
+    /** Closes every registered replica facade, then delegates to the superclass tear-down. */
+    @Override
+    public void tearDown() throws Exception
+    {
+        try
+        {
+            for (EnvironmentFacade ef : _nodes.values())
+            {
+                ef.close();
+            }
+        }
+        finally
+        {
+            super.tearDown();
+        }
+    }
+
+    public void testGetName()
+    {
+        assertEquals("Unexpected name", getName(), getEnvironmentFacade().getName());
+    }
+
+    public void testGetGroupName()
+    {
+        assertEquals("Unexpected group name", TEST_GROUP_NAME, getEnvironmentFacade().getGroupName());
+    }
+
+    public void testGetNodeName()
+    {
+        assertEquals("Unexpected node name", TEST_NODE_NAME, getEnvironmentFacade().getNodeName());
+    }
+
+    public void testGetNodeHostPort()
+    {
+        assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, getEnvironmentFacade().getNodeHostPort());
+    }
+
+    public void testGetHelperHostPort()
+    {
+        assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, getEnvironmentFacade().getHelperHostPort());
+    }
+
+    public void testGetDurability()
+    {
+        assertEquals("Unexpected durability", TEST_DURABILITY.toString(), getEnvironmentFacade().getDurability());
+    }
+
+    public void testIsCoalescingSync()
+    {
+        assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, getEnvironmentFacade().isCoalescingSync());
+    }
+
+    public void testGetNodeState()
+    {
+        assertEquals("Unexpected state", State.MASTER.name(), getEnvironmentFacade().getNodeState());
+    }
+
+    public void testIsDesignatedPrimary()
+    {
+        assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, getEnvironmentFacade().isDesignatedPrimary());
+    }
+
+    /** A single-node group reports exactly one member carrying the test node's name and host:port. */
+    public void testGetGroupMembers()
+    {
+        List<Map<String, String>> groupMembers = getEnvironmentFacade().getGroupMembers();
+        Map<String, String> expectedMember = new HashMap<String, String>();
+        expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
+        expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
+        Set<Map<String, String>> expectedGroupMembers = Collections.singleton(expectedMember);
+        assertEquals("Unexpected group members", expectedGroupMembers, new HashSet<Map<String, String>>(groupMembers));
+    }
+
+    /** Removing a closed replica from the group reduces the member count from two to one. */
+    public void testRemoveNodeFromGroup() throws Exception
+    {
+        ReplicatedEnvironmentFacade environmentFacade = getEnvironmentFacade();
+        String nodeName = TEST_NODE_NAME + "_2";
+        ReplicatedEnvironmentFacade ref2 = joinReplica(nodeName, "localhost:" + getNextAvailable(TEST_NODE_PORT + 1));
+        List<Map<String, String>> groupMembers = environmentFacade.getGroupMembers();
+        assertEquals("Unexpected group members count", 2, groupMembers.size());
+        ref2.close();
+
+        environmentFacade.removeNodeFromGroup(nodeName);
+        groupMembers = environmentFacade.getGroupMembers();
+        assertEquals("Unexpected group members count", 1, groupMembers.size());
+    }
+
+    public void testSetDesignatedPrimary() throws AMQStoreException
+    {
+        ReplicatedEnvironmentFacade environmentFacade = getEnvironmentFacade();
+        environmentFacade.setDesignatedPrimary(false);
+        assertFalse("Unexpected designated primary", environmentFacade.isDesignatedPrimary());
+    }
+
+    public void testGetNodePriority()
+    {
+        assertEquals("Unexpected node priority", 1, getEnvironmentFacade().getNodePriority());
+    }
+
+    public void testGetElectableGroupSizeOverride()
+    {
+        assertEquals("Unexpected Electable Group Size Override", 0, getEnvironmentFacade().getElectableGroupSizeOverride());
+    }
+
+    /**
+     * With both replicas closed, opening a new database on the first node is
+     * expected to fail with {@link InsufficientReplicasException}; after the
+     * exception is handed to the facade and the replicas rejoin, the facade must
+     * expose a different environment and a different handle for the existing
+     * database.
+     */
+    public void testEnvironmentRestartOnInsufficientReplicas() throws Exception
+    {
+        ReplicatedEnvironmentFacade[] nodes = startClusterSequentially(3);
+        ReplicatedEnvironmentFacade environmentFacade = nodes[0];
+
+        String databaseName = "test";
+        DatabaseConfig dbConfig = createDatabase(environmentFacade, databaseName);
+
+        // close replicas
+        nodes[1].close();
+        nodes[2].close();
+
+        final CountDownLatch nodeAwaitLatch = new CountDownLatch(1);
+        Environment e = environmentFacade.getEnvironment();
+        Database db = environmentFacade.getOpenDatabase(databaseName);
+        try
+        {
+            environmentFacade.openDatabases(new String[] { "test2" }, dbConfig);
+            fail("Opening of new database without quorum should fail");
+        }
+        catch(InsufficientReplicasException ex)
+        {
+            environmentFacade.handleDatabaseException(null, ex);
+        }
+
+        // restore quorum
+        nodes[1] = joinReplica(TEST_NODE_NAME + "_1", nodes[1].getNodeHostPort());
+        nodes[2] = joinReplica(TEST_NODE_NAME + "_2", nodes[2].getNodeHostPort());
+
+        environmentFacade.setStateChangeListener(new StateChangeListener()
+        {
+            @Override
+            public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+            {
+                if (stateChangeEvent.getState() == State.MASTER || stateChangeEvent.getState() == State.REPLICA)
+                {
+                    nodeAwaitLatch.countDown();
+                }
+            }
+        });
+
+        assertTrue("The node could not rejoin the cluster",
+                nodeAwaitLatch.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT));
+
+        Environment e2 = environmentFacade.getEnvironment();
+        assertNotSame("Environment has not been restarted", e2, e);
+
+        Database db1 = environmentFacade.getOpenDatabase(databaseName);
+        assertNotSame("Database should be the re-created", db1, db);
+    }
+
+    /**
+     * Runs many concurrent transactions with both replicas closed, passing every
+     * resulting {@link DatabaseException} to the facade, and then checks that the
+     * node was elected master exactly twice: once at start-up and once after the
+     * environment restart.
+     */
+    public void testEnvironmentIsRestartOnlyOnceOnInsufficientReplicas() throws Exception
+    {
+        ReplicatedEnvironmentFacade[] nodes = startClusterSequentially(3);
+        final ReplicatedEnvironmentFacade environmentFacade = nodes[0];
+
+        int numberOfThreads = 100;
+
+        // restart counter
+        final AtomicInteger numberOfTimesElected = new AtomicInteger();
+        environmentFacade.setStateChangeListener(new StateChangeListener()
+        {
+            @Override
+            public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+            {
+                if (stateChangeEvent.getState() == State.MASTER)
+                {
+                    numberOfTimesElected.incrementAndGet();
+                }
+            }
+        });
+
+        String databaseName = "test";
+        createDatabase(environmentFacade, databaseName);
+        final CountDownLatch latch = new CountDownLatch(numberOfThreads);
+
+        final Database db = environmentFacade.getOpenDatabase(databaseName);
+
+        // close replicas
+        nodes[1].close();
+        nodes[2].close();
+
+        // perform transactions in separate threads in order to provoke InsufficientReplicasException
+        ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
+        try
+        {
+            List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
+            for (int i = 0; i < numberOfThreads; i++)
+            {
+                final int index = i;
+                tasks.add(new Callable<Void>(){
+
+                    @Override
+                    public Void call() throws Exception
+                    {
+                        try
+                        {
+                            Transaction tx = environmentFacade.getEnvironment().beginTransaction(null, null);
+                            DatabaseEntry key = new DatabaseEntry();
+                            DatabaseEntry data = new DatabaseEntry();
+                            IntegerBinding.intToEntry(index, key);
+                            IntegerBinding.intToEntry(index, data);
+                            db.put(tx, key, data);
+                            tx.commit();
+                        }
+                        catch(DatabaseException e)
+                        {
+                            // report to the same facade instance the transaction was started on
+                            environmentFacade.handleDatabaseException("Exception", e);
+                        }
+                        finally
+                        {
+                            latch.countDown();
+                        }
+                        return null;
+                    }});
+            }
+            service.invokeAll(tasks);
+            assertTrue("Not all tasks have been executed",
+                    latch.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT));
+        }
+        finally
+        {
+            service.shutdown();
+        }
+
+        // restore quorum
+        nodes[1] = joinReplica(TEST_NODE_NAME + "_1", nodes[1].getNodeHostPort());
+        nodes[2] = joinReplica(TEST_NODE_NAME + "_2", nodes[2].getNodeHostPort());
+
+        // poll for up to ten seconds for the facade to report the OPEN state again
+        long start = System.currentTimeMillis();
+        while(environmentFacade.getFacadeState() != ReplicatedEnvironmentFacade.State.OPEN && System.currentTimeMillis() - start < 10000L)
+        {
+            Thread.sleep(1000L);
+        }
+        assertEquals("EnvironmentFacade should be in open state", ReplicatedEnvironmentFacade.State.OPEN, environmentFacade.getFacadeState());
+
+        // it should be elected twice: once on first start-up and second time after environment restart
+        assertEquals("Elected master unexpected number of times", 2, numberOfTimesElected.get());
+    }
+
+    /** The facade state is OPENING right after construction, OPEN once the node is master, and CLOSED after close. */
+    public void testFacadeStateTransitions() throws InterruptedException
+    {
+        String nodeName = "node1";
+        final String nodePath = createNodeWorkingFolder(nodeName);
+        ReplicatedEnvironmentFacade ref = null;
+        try
+        {
+            ref = createReplicatedEnvironmentFacade(nodeName, nodePath, TEST_NODE_HOST_PORT, false);
+            assertEquals("Unexpected state " + ref.getFacadeState(), ReplicatedEnvironmentFacade.State.OPENING, ref.getFacadeState());
+
+            final CountDownLatch nodeAwaitLatch = new CountDownLatch(1);
+            ref.setStateChangeListener(new StateChangeListener()
+            {
+                @Override
+                public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+                {
+                    if (stateChangeEvent.getState() == State.MASTER)
+                    {
+                        nodeAwaitLatch.countDown();
+                    }
+                }
+            });
+            assertTrue("Node did not join the cluster", nodeAwaitLatch.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT));
+            assertEquals("Unexpected state " + ref.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, ref.getFacadeState());
+            ref.close();
+            assertEquals("Unexpected state " + ref.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, ref.getFacadeState());
+        }
+        finally
+        {
+            if (ref != null)
+            {
+                ref.close();
+            }
+        }
+    }
+
+    /**
+     * Starts the test node as designated primary and waits for it to become master.
+     *
+     * @throws RuntimeException wrapping the {@link InterruptedException} if the
+     *         wait is interrupted; the thread's interrupt status is restored first
+     */
+    @Override
+    EnvironmentFacade createEnvironmentFacade()
+    {
+        try
+        {
+            return startNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, State.MASTER);
+        }
+        catch (InterruptedException e)
+        {
+            // restore the interrupt status so that callers further up can observe it
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    ReplicatedEnvironmentFacade getEnvironmentFacade()
+    {
+        return (ReplicatedEnvironmentFacade) super.getEnvironmentFacade();
+    }
+
+    /** Starts a non-primary node and waits for it to reach the REPLICA state. */
+    private ReplicatedEnvironmentFacade joinReplica(final String nodeName, final String hostPort) throws InterruptedException
+    {
+        return startNode(nodeName, hostPort, false, State.REPLICA);
+    }
+
+    /**
+     * Creates a node in its own working folder and blocks until it reports
+     * {@code targetState}, failing the test if that does not happen within the
+     * state change timeout.
+     */
+    private ReplicatedEnvironmentFacade startNode(String nodeName, String nodeHostPort, boolean designatedPrimary, State targetState)
+            throws InterruptedException
+    {
+        final String nodePath = createNodeWorkingFolder(nodeName);
+        final CountDownLatch nodeAwaitLatch = new CountDownLatch(1);
+        ReplicatedEnvironmentFacade ref = join(nodeName, nodePath, nodeHostPort, designatedPrimary, nodeAwaitLatch, targetState);
+        assertTrue("Node did not join the cluster", nodeAwaitLatch.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT));
+        return ref;
+    }
+
+    /** Creates (if necessary) a folder named after the node under the store path and returns its absolute path. */
+    private String createNodeWorkingFolder(String nodeName)
+    {
+        File nodeLocation = new File(_storePath, nodeName);
+        nodeLocation.mkdirs();
+        final String nodePath = nodeLocation.getAbsolutePath();
+        return nodePath;
+    }
+
+    /**
+     * Creates a facade and installs a listener that counts down
+     * {@code nodeAwaitLatch} when the node reports {@code expectedState}.
+     * Facades expected to become replicas are registered in {@code _nodes}.
+     */
+    private ReplicatedEnvironmentFacade join(String nodeName, String nodePath, String nodeHostPort, boolean designatedPrimary,
+            final CountDownLatch nodeAwaitLatch, final State expectedState)
+    {
+        ReplicatedEnvironmentFacade ref = createReplicatedEnvironmentFacade(nodeName, nodePath, nodeHostPort, designatedPrimary);
+
+        if (expectedState == State.REPLICA)
+        {
+            _nodes.put(nodeName, ref);
+        }
+        ref.setStateChangeListener(new StateChangeListener()
+        {
+            @Override
+            public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+            {
+                if (stateChangeEvent.getState() == expectedState)
+                {
+                    nodeAwaitLatch.countDown();
+                }
+            }
+        });
+        return ref;
+    }
+
+    /**
+     * Constructs a facade for the test group with two-second replica
+     * acknowledgement and insufficient-replicas timeouts and no additional
+     * environment settings.
+     */
+    private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodeName, String nodePath, String nodeHostPort,
+            boolean designatedPrimary)
+    {
+        Map<String, String> repConfig = new HashMap<String, String>();
+        repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
+        repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
+
+        ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(getName(), nodePath, TEST_GROUP_NAME, nodeName,
+                nodeHostPort, TEST_NODE_HELPER_HOST_PORT, TEST_DURABILITY, designatedPrimary, TEST_COALESCING_SYNC,
+                Collections.<String, String> emptyMap(), repConfig);
+        return ref;
+    }
+
+    /**
+     * Returns an array of {@code nodeNumber} facades: the master obtained from
+     * {@link #getEnvironmentFacade()} at index 0, followed by replicas started
+     * one after another on the next available ports.
+     */
+    private ReplicatedEnvironmentFacade[] startClusterSequentially(int nodeNumber) throws InterruptedException
+    {
+        // master
+        ReplicatedEnvironmentFacade environmentFacade = getEnvironmentFacade();
+        ReplicatedEnvironmentFacade[] nodes = new ReplicatedEnvironmentFacade[nodeNumber];
+        nodes[0] = environmentFacade;
+
+        int nodePort = TEST_NODE_PORT;
+        for (int i = 1; i < nodeNumber; i++)
+        {
+            nodePort = getNextAvailable(nodePort + 1);
+            nodes[i] = joinReplica(TEST_NODE_NAME + "_" + i, "localhost:" + nodePort);
+        }
+        return nodes;
+    }
+
+    /** Opens a transactional, create-if-absent database on the facade and returns the configuration used. */
+    private DatabaseConfig createDatabase(ReplicatedEnvironmentFacade environmentFacade, String databaseName) throws AMQStoreException
+    {
+        DatabaseConfig dbConfig = new DatabaseConfig();
+        dbConfig.setTransactional(true);
+        dbConfig.setAllowCreate(true);
+        environmentFacade.openDatabases(new String[] { databaseName }, dbConfig);
+        return dbConfig;
+    }
+}
Copied: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java (from r1549894, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java&r1=1549894&r2=1549898&rev=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java Tue Dec 10 17:19:47 2013
@@ -18,14 +18,17 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.upgrade;
+package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import org.apache.qpid.AMQStoreException;
+import java.util.Collections;
-public interface StoreUpgrade
+public class StandardEnvironmentFacadeTest extends EnvironmentFacadeTestCase
{
- void performUpgrade(Environment environment, UpgradeInteractionHandler handler, String virtualHostName)
- throws DatabaseException, AMQStoreException;
+
+ @Override
+ EnvironmentFacade createEnvironmentFacade()
+ {
+ return new StandardEnvironmentFacade(getName(), _storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
+ }
+
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java Tue Dec 10 17:19:47 2013
@@ -26,6 +26,7 @@ import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+
import junit.framework.TestCase;
import com.sleepycat.je.Database;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java Tue Dec 10 17:19:47 2013
@@ -47,7 +47,6 @@ import org.apache.qpid.server.model.Exch
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.berkeleydb.entry.Xid;
import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java Tue Dec 10 17:19:47 2013
@@ -20,22 +20,15 @@
*/
package org.apache.qpid.server.store.berkeleydb.upgrade;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+
import com.sleepycat.bind.tuple.IntegerBinding;
-import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
-import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase
{
@@ -102,7 +95,7 @@ public class UpgraderFailOnNewerVersionT
catch(AMQStoreException ex)
{
assertEquals("Incorrect exception thrown", "Database version 999 is higher than the most recent known version: "
- + AbstractBDBMessageStore.VERSION, ex.getMessage());
+ + BDBMessageStore.VERSION, ex.getMessage());
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java?rev=1549898&r1=1549897&r2=1549898&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java Tue Dec 10 17:19:47 2013
@@ -24,7 +24,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
-import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
import com.sleepycat.bind.tuple.IntegerBinding;
@@ -33,6 +33,7 @@ import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Environment;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
@@ -53,7 +54,7 @@ public class UpgraderTest extends Abstra
_upgrader = new Upgrader(_environment, getVirtualHostName());
}
- private int getStoreVersion()
+ private int getStoreVersion(Environment environment)
{
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
@@ -63,7 +64,7 @@ public class UpgraderTest extends Abstra
Cursor cursor = null;
try
{
- versionDb = _environment.openDatabase(null, Upgrader.VERSION_DB_NAME, dbConfig);
+ versionDb = environment.openDatabase(null, Upgrader.VERSION_DB_NAME, dbConfig);
cursor = versionDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
@@ -92,9 +93,9 @@ public class UpgraderTest extends Abstra
public void testUpgrade() throws Exception
{
- assertEquals("Unexpected store version", -1, getStoreVersion());
+ assertEquals("Unexpected store version", -1, getStoreVersion(_environment));
_upgrader.upgradeIfNecessary();
- assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion());
+ assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion(_environment));
assertContent();
}
@@ -104,17 +105,24 @@ public class UpgraderTest extends Abstra
deleteDirectoryIfExists(nonExistentStoreLocation);
nonExistentStoreLocation.mkdir();
- _environment = createEnvironment(nonExistentStoreLocation);
- _upgrader = new Upgrader(_environment, getVirtualHostName());
- _upgrader.upgradeIfNecessary();
+ Environment emptyEnvironment = createEnvironment(nonExistentStoreLocation);
+ try
+ {
+ _upgrader = new Upgrader(emptyEnvironment, getVirtualHostName());
+ _upgrader.upgradeIfNecessary();
- List<String> databaseNames = _environment.getDatabaseNames();
- List<String> expectedDatabases = new ArrayList<String>();
- expectedDatabases.add(Upgrader.VERSION_DB_NAME);
- assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames);
- assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion());
+ List<String> databaseNames = emptyEnvironment.getDatabaseNames();
+ List<String> expectedDatabases = new ArrayList<String>();
+ expectedDatabases.add(Upgrader.VERSION_DB_NAME);
+ assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames);
+ assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion(emptyEnvironment));
- nonExistentStoreLocation.delete();
+ }
+ finally
+ {
+ emptyEnvironment.close();
+ nonExistentStoreLocation.delete();
+ }
}
private void assertContent()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org