You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2013/12/20 11:53:02 UTC
svn commit: r1552591 [1/2] - in /qpid/branches/java-broker-bdb-ha/qpid/java:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/
bdbstore/src/main/resources/META-INF/services/ ...
Author: kwall
Date: Fri Dec 20 10:53:01 2013
New Revision: 1552591
URL: http://svn.apache.org/r1552591
Log:
QPID-5411: More LocalReplicationNode work.
Added:
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/AbstractReplicationNode.java
- copied, changed from r1550805, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java
- copied, changed from r1550805, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ReplicationNodeFactory
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ReplicationNodeFactory.java
- copied, changed from r1550805, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java
Removed:
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
Modified:
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/pom.xml
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/VirtualHostRecoverer.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/PluggableFactoryLoader.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java Fri Dec 20 10:53:01 2013
@@ -23,12 +23,18 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.MessageStoreConstants;
+import org.apache.qpid.server.store.berkeleydb.replication.LocalReplicationNode;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -155,4 +161,53 @@ public class BDBHAVirtualHostFactory imp
}
return attributes;
}
+
+ @Override
+ public ReplicationNode createReplicationNode(Configuration configuration, org.apache.qpid.server.model.VirtualHost virtualHost)
+ {
+ Configuration storeConfiguration = configuration.subset("store");
+
+ String nodeName = storeConfiguration.getString("highAvailability.nodeName");
+ String groupName = storeConfiguration.getString("highAvailability.groupName");
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(ReplicationNode.NAME, nodeName);
+ attributes.put(ReplicationNode.GROUP_NAME, groupName);
+ attributes.put(ReplicationNode.HOST_PORT, storeConfiguration.getString("highAvailability.nodeHostPort"));
+ attributes.put(ReplicationNode.HELPER_HOST_PORT, storeConfiguration.getString("highAvailability.helperHostPort"));
+
+ String durability = storeConfiguration.getString("highAvailability.durability");
+ if (durability != null)
+ {
+ attributes.put(ReplicationNode.DURABILITY, durability);
+ }
+
+ String designatedPrimary = storeConfiguration.getString("highAvailability.designatedPrimary");
+ if (designatedPrimary != null)
+ {
+ attributes.put(ReplicationNode.DESIGNATED_PRIMARY, designatedPrimary);
+ }
+
+ String coalescingSync = storeConfiguration.getString("highAvailability.coalescingSync");
+ if (coalescingSync != null)
+ {
+ attributes.put(ReplicationNode.COALESCING_SYNC, coalescingSync);
+ }
+
+ Map<String, String> envAttributes = getEnvironmentMap(storeConfiguration, "envConfig");
+ if (envAttributes != null && envAttributes.size() > 0)
+ {
+ attributes.put(ReplicationNode.PARAMETERS, envAttributes);
+ }
+
+ Map<String, String> repAttributes = getEnvironmentMap(storeConfiguration, "repConfig");
+ if (repAttributes != null && repAttributes.size() > 0)
+ {
+ attributes.put(ReplicationNode.REPLICATION_PARAMETERS, repAttributes);
+ }
+
+ Broker broker = virtualHost.getParent(Broker.class);
+ UUID uuid = UUIDGenerator.generateReplicationNodeId(groupName, nodeName);
+ return new LocalReplicationNode(uuid, attributes, virtualHost, broker.getTaskExecutor());
+ }
+
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java Fri Dec 20 10:53:01 2013
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import static org.apache.qpid.server.model.ReplicationNode.*;
+
import java.io.File;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -136,34 +138,32 @@ public class ReplicatedEnvironmentFacade
private ReplicationGroupListener _replicationGroupListener;
private final RemoteReplicationNodeFactory _remoteReplicationNodeFactory;
-
+ @SuppressWarnings("unchecked")
public ReplicatedEnvironmentFacade(String name, String environmentPath,
- String groupName, String nodeName, String nodeHostPort,
- String helperHostPort, Durability durability,
- Boolean designatedPrimary, Boolean coalescingSync,
- Map<String, String> envConfigMap,
- Map<String, String> replicationConfig, RemoteReplicationNodeFactory remoteReplicationNodeFactory)
+ org.apache.qpid.server.model.ReplicationNode replicationNode,
+ RemoteReplicationNodeFactory remoteReplicationNodeFactory)
{
- _name = name;
+ _name = name;
_environmentPath = environmentPath;
- _groupName = groupName;
- _nodeName = nodeName;
- _nodeHostPort = nodeHostPort;
- _helperHostPort = helperHostPort;
- _durability = durability;
- _designatedPrimary = designatedPrimary;
- _coalescingSync = coalescingSync;
- _environmentParameters = envConfigMap;
- _replicationEnvironmentParameters = replicationConfig;
+ _groupName = (String)replicationNode.getAttribute(GROUP_NAME);
+ _nodeName = replicationNode.getName();
+ _nodeHostPort = (String)replicationNode.getAttribute(HOST_PORT);;
+ _helperHostPort = (String)replicationNode.getAttribute(HELPER_HOST_PORT);
+ _durability = Durability.parse((String)replicationNode.getAttribute(DURABILITY));
+ _designatedPrimary = (Boolean)replicationNode.getAttribute(DESIGNATED_PRIMARY);
+ _coalescingSync = (Boolean)replicationNode.getAttribute(COALESCING_SYNC);
+ _environmentParameters = (Map<String, String>)replicationNode.getAttribute(PARAMETERS);
+ _replicationEnvironmentParameters = (Map<String, String>)replicationNode.getAttribute(REPLICATION_PARAMETERS);
_remoteReplicationNodeFactory = remoteReplicationNodeFactory;
_state.set(State.OPENING);
- _environment = createEnvironment(environmentPath, groupName, nodeName, nodeHostPort, helperHostPort, durability,
- designatedPrimary, _environmentParameters, _replicationEnvironmentParameters);
+ _environment = createEnvironment();
startCommitThread(_name, _environment);
}
+
+
@Override
public StoreFuture commit(final Transaction tx, final boolean syncCommit) throws AMQStoreException
{
@@ -473,7 +473,7 @@ public class ReplicatedEnvironmentFacade
// TODO remote replication nodes should be cached
RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(group.getName(),
replicationNode.getName(),
- replicationNode.getHostName(), replicationNode.getPort());
+ replicationNode.getHostName() + ":" + replicationNode.getPort());
listener.onReplicationNodeRecovered(remoteNode);
}
}
@@ -543,8 +543,7 @@ public class ReplicatedEnvironmentFacade
Set<String> databaseNames = new HashSet<String>(_databases.keySet());
closeEnvironmentSafely();
- _environment = createEnvironment(_environmentPath, _groupName, _nodeName, _nodeHostPort, _helperHostPort, _durability,
- _designatedPrimary, _environmentParameters, _replicationEnvironmentParameters);
+ _environment = createEnvironment();
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
@@ -610,37 +609,35 @@ public class ReplicatedEnvironmentFacade
}
}
- private ReplicatedEnvironment createEnvironment(String environmentPath, String groupName, String nodeName, String nodeHostPort,
- String helperHostPort, Durability durability, boolean designatedPrimary, Map<String, String> environmentParameters,
- Map<String, String> replicationEnvironmentParameters)
+ private ReplicatedEnvironment createEnvironment()
{
if (LOGGER.isInfoEnabled())
{
LOGGER.info("Creating environment");
- LOGGER.info("Environment path " + environmentPath);
- LOGGER.info("Group name " + groupName);
- LOGGER.info("Node name " + nodeName);
- LOGGER.info("Node host port " + nodeHostPort);
- LOGGER.info("Helper host port " + helperHostPort);
- LOGGER.info("Durability " + durability);
+ LOGGER.info("Environment path " + _environmentPath);
+ LOGGER.info("Group name " + _groupName);
+ LOGGER.info("Node name " + _nodeName);
+ LOGGER.info("Node host port " + _nodeHostPort);
+ LOGGER.info("Helper host port " + _helperHostPort);
+ LOGGER.info("Durability " + _durability);
LOGGER.info("Coalescing sync " + _coalescingSync);
- LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary);
+ LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary);
}
Map<String, String> replicationEnvironmentSettings = new HashMap<String, String>(REPCONFIG_DEFAULTS);
- if (replicationEnvironmentParameters != null && !replicationEnvironmentParameters.isEmpty())
+ if (_replicationEnvironmentParameters != null && !_replicationEnvironmentParameters.isEmpty())
{
- replicationEnvironmentSettings.putAll(replicationEnvironmentParameters);
+ replicationEnvironmentSettings.putAll(_replicationEnvironmentParameters);
}
Map<String, String> environmentSettings = new HashMap<String, String>(EnvironmentFacade.ENVCONFIG_DEFAULTS);
- if (environmentParameters != null && !environmentParameters.isEmpty())
+ if (_environmentParameters != null && !_environmentParameters.isEmpty())
{
- environmentSettings.putAll(environmentParameters);
+ environmentSettings.putAll(_environmentParameters);
}
- final ReplicationConfig replicationConfig = new ReplicationConfig(groupName, nodeName, nodeHostPort);
- replicationConfig.setHelperHosts(helperHostPort);
- replicationConfig.setDesignatedPrimary(designatedPrimary);
+ final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort);
+ replicationConfig.setHelperHosts(_helperHostPort);
+ replicationConfig.setDesignatedPrimary(_designatedPrimary);
for (Map.Entry<String, String> configItem : replicationEnvironmentSettings.entrySet())
{
@@ -655,7 +652,7 @@ public class ReplicatedEnvironmentFacade
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
- envConfig.setDurability(durability);
+ envConfig.setDurability(_durability);
for (Map.Entry<String, String> configItem : environmentSettings.entrySet())
{
@@ -667,9 +664,10 @@ public class ReplicatedEnvironmentFacade
}
ReplicatedEnvironment environment = null;
+ File environmentPathFile = new File(_environmentPath);
try
{
- environment = new ReplicatedEnvironment(new File(environmentPath), replicationConfig, envConfig);
+ environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
}
catch (final InsufficientLogException ile)
{
@@ -678,7 +676,7 @@ public class ReplicatedEnvironmentFacade
NetworkRestoreConfig config = new NetworkRestoreConfig();
config.setRetainLogFiles(false);
restore.execute(ile, config);
- environment = new ReplicatedEnvironment(new File(environmentPath), replicationConfig, envConfig);
+ environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
}
return environment;
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java Fri Dec 20 10:53:01 2013
@@ -20,112 +20,38 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Collection;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory;
import com.sleepycat.je.Durability;
-import com.sleepycat.je.Durability.ReplicaAckPolicy;
import com.sleepycat.je.Durability.SyncPolicy;
public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
{
-
- private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC,
- ReplicaAckPolicy.SIMPLE_MAJORITY);
-
- @SuppressWarnings("unchecked")
@Override
public EnvironmentFacade createEnvironmentFacade(String name, String storeLocation, VirtualHost virtualHost)
{
- // Mandatory configuration
- String groupName = getValidatedStringAttribute(virtualHost, "haGroupName");
- String nodeName = getValidatedStringAttribute(virtualHost, "haNodeName");
- String nodeHostPort = getValidatedStringAttribute(virtualHost, "haNodeAddress");
- String helperHostPort = getValidatedStringAttribute(virtualHost, "haHelperAddress");
-
- // Optional configuration
- Durability durability = null;
- String durabilitySetting = getStringAttribute(virtualHost, "haDurability", null);
- if (durabilitySetting == null)
- {
- durability = DEFAULT_DURABILITY;
- }
- else
- {
- durability = Durability.parse(durabilitySetting);
- }
- Boolean designatedPrimary = getBooleanAttribute(virtualHost, "haDesignatedPrimary", Boolean.FALSE);
- Boolean coalescingSync = getBooleanAttribute(virtualHost, "haCoalescingSync", Boolean.TRUE);
-
- Map<String, String> replicationConfig = null;
- Object repConfigAttr = virtualHost.getAttribute("haReplicationConfig");
- if (repConfigAttr instanceof Map)
+ Collection<ReplicationNode> replicationNodes = virtualHost.getChildren(ReplicationNode.class);
+ if (replicationNodes == null || replicationNodes.size() != 1)
{
- replicationConfig = new HashMap<String, String>((Map<String, String>) repConfigAttr);
+ throw new IllegalStateException("Expected exactly one replication node but got " + (replicationNodes==null ? 0 :replicationNodes.size()) + " nodes");
}
+ ReplicationNode localNode = replicationNodes.iterator().next();
+ String durability = (String)localNode.getAttribute(ReplicationNode.DURABILITY);
+ Boolean coalescingSync = (Boolean)localNode.getAttribute(ReplicationNode.COALESCING_SYNC);
- if (coalescingSync && durability.getLocalSync() == SyncPolicy.SYNC)
+ if (coalescingSync && Durability.parse(durability).getLocalSync() == SyncPolicy.SYNC)
{
throw new IllegalConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC
+ "! Please set highAvailability.coalescingSync to false in store configuration.");
}
- Map<String, String> envConfigMap = null;
- Object bdbEnvConfigAttr = virtualHost.getAttribute("bdbEnvironmentConfig");
- if (bdbEnvConfigAttr instanceof Map)
- {
- envConfigMap = new HashMap<String, String>((Map<String, String>) bdbEnvConfigAttr);
- }
-
- return new ReplicatedEnvironmentFacade(name, storeLocation, groupName, nodeName, nodeHostPort, helperHostPort, durability,
- designatedPrimary, coalescingSync, envConfigMap, replicationConfig, new RemoteReplicationNodeFactoryImpl(virtualHost));
- }
-
- private String getValidatedStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName)
- {
- Object attrValue = virtualHost.getAttribute(attributeName);
- if (attrValue != null)
- {
- return attrValue.toString();
- }
- else
- {
- throw new IllegalConfigurationException("BDB HA configuration key not found. Please specify configuration attribute: "
- + attributeName);
- }
- }
-
- private String getStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, String defaultVal)
- {
- Object attrValue = virtualHost.getAttribute(attributeName);
- if (attrValue != null)
- {
- return attrValue.toString();
- }
- return defaultVal;
- }
-
- private boolean getBooleanAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, boolean defaultVal)
- {
- Object attrValue = virtualHost.getAttribute(attributeName);
- if (attrValue != null)
- {
- if (attrValue instanceof Boolean)
- {
- return ((Boolean) attrValue).booleanValue();
- }
- else if (attrValue instanceof String)
- {
- return Boolean.parseBoolean((String) attrValue);
- }
-
- }
- return defaultVal;
+ return new ReplicatedEnvironmentFacade(name, storeLocation, localNode, new RemoteReplicationNodeFactoryImpl(virtualHost));
}
private static class RemoteReplicationNodeFactoryImpl implements RemoteReplicationNodeFactory
@@ -138,9 +64,9 @@ public class ReplicatedEnvironmentFacade
}
@Override
- public RemoteReplicationNode create(String groupName, String nodeName, String host, int port)
+ public RemoteReplicationNode create(String groupName, String nodeName, String hostPort)
{
- return new RemoteReplicationNode(groupName, nodeName, host, port, _virtualHost);
+ return new RemoteReplicationNode(groupName, nodeName, hostPort, _virtualHost);
}
}
}
Copied: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/AbstractReplicationNode.java (from r1550805, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/AbstractReplicationNode.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/AbstractReplicationNode.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java&r1=1550805&r2=1552591&rev=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/AbstractReplicationNode.java Fri Dec 20 10:53:01 2013
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.store.berkeleydb.replication;
import java.security.AccessControlException;
@@ -15,27 +35,22 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
-/**
- * Represents a remote replication node in a BDB group.
- */
-public class RemoteReplicationNode implements ReplicationNode
+public abstract class AbstractReplicationNode implements ReplicationNode
{
private final UUID _id;
private final String _groupName;
private final String _nodeName;
- private final String _host;
- private final int _port;
+ private final String _hostPort;
private final VirtualHost _virtualHost;
- public RemoteReplicationNode(String groupName, String nodeName, String host, int port, VirtualHost virtualHost)
+ public AbstractReplicationNode(String groupName, String nodeName, String hostPort, VirtualHost virtualHost)
{
super();
_id = UUIDGenerator.generateReplicationNodeId(groupName, nodeName);
_groupName = groupName;
_nodeName = nodeName;
- _host = host;
- _port = port;
+ _hostPort = hostPort;
_virtualHost = virtualHost;
}
@@ -168,7 +183,7 @@ public class RemoteReplicationNode imple
}
else if (ReplicationNode.HOST_PORT.equals(name))
{
- return _host + ":" + _port;
+ return _hostPort;
}
else if (ReplicationNode.GROUP_NAME.equals(name))
{
@@ -217,4 +232,16 @@ public class RemoteReplicationNode imple
{
throw new UnsupportedOperationException();
}
+
+ protected VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.getClass().getSimpleName() + " [id=" + _id + ", name=" + _nodeName + "]";
+ }
+
}
Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java?rev=1552591&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java Fri Dec 20 10:53:01 2013
@@ -0,0 +1,268 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.lang.reflect.Type;
+import java.security.AccessControlException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.adapter.AbstractAdapter;
+import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.util.ParameterizedTypeImpl;
+
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Durability.ReplicaAckPolicy;
+import com.sleepycat.je.Durability.SyncPolicy;
+
+public class LocalReplicationNode extends AbstractAdapter implements ReplicationNode
+{
+
+ private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC,
+ ReplicaAckPolicy.SIMPLE_MAJORITY);
+
+ @SuppressWarnings("serial")
+ static final Map<String, Object> DEFAULTS = new HashMap<String, Object>()
+ {{
+ put(DURABILITY, DEFAULT_DURABILITY.toString());
+ put(COALESCING_SYNC, true);
+ put(DESIGNATED_PRIMARY, false);
+ //TODO: add defaults for parameters and replicatedParameters
+ }};
+
+ @SuppressWarnings("serial")
+ private static final Map<String, Type> ATTRIBUTE_TYPES = new HashMap<String, Type>()
+ {{
+ put(ID, UUID.class);
+ put(NAME, String.class);
+ put(GROUP_NAME, String.class);
+ put(HOST_PORT, String.class);
+ put(HELPER_HOST_PORT, String.class);
+ put(DURABILITY, String.class);
+ put(COALESCING_SYNC, Boolean.class);
+ put(DESIGNATED_PRIMARY, Boolean.class);
+ put(PRIORITY, Integer.class);
+ put(QUORUM_OVERRIDE, Integer.class);
+ put(ROLE, String.class);
+ put(JOIN_TIME, Long.class);
+ put(PARAMETERS, new ParameterizedTypeImpl(Map.class, String.class, String.class));
+ put(REPLICATION_PARAMETERS, new ParameterizedTypeImpl(Map.class, String.class, String.class));
+ put(STORE_PATH, String.class);
+ }};
+
+ private final VirtualHost _virtualHost;
+
+ //TODO: add state management
+ public LocalReplicationNode(UUID id, Map<String, Object> attributes, VirtualHost virtualHost, TaskExecutor taskExecutor)
+ {
+ super(id, DEFAULTS, validateAttributes(MapValueConverter.convert(attributes, ATTRIBUTE_TYPES)), taskExecutor);
+ _virtualHost = virtualHost;
+ addParent(VirtualHost.class, virtualHost);
+ validateAttributes(attributes);
+ }
+
+ private static Map<String, Object> validateAttributes(Map<String, Object> attributes)
+ {
+ if (attributes.get(NAME) == null)
+ {
+ throw new IllegalConfigurationException("Name is not specified");
+ }
+ if (attributes.get(GROUP_NAME) == null)
+ {
+ throw new IllegalConfigurationException("Group name is not specified");
+ }
+ if (attributes.get(HOST_PORT) == null)
+ {
+ throw new IllegalConfigurationException("Host and port attribute is not specified");
+ }
+ if (attributes.get(HELPER_HOST_PORT) == null)
+ {
+ throw new IllegalConfigurationException("Helper host and port attribute is not specified");
+ }
+ return attributes;
+ }
+
+ @Override
+ public String getName()
+ {
+ return (String)getAttribute(NAME);
+ }
+
+ @Override
+ public String setName(String currentName, String desiredName)
+ throws IllegalStateException, AccessControlException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public State getDesiredState()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public State getActualState()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addChangeListener(ConfigurationChangeListener listener)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean removeChangeListener(ConfigurationChangeListener listener)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
+
+ @Override
+ public void setDurable(boolean durable) throws IllegalStateException,
+ AccessControlException, IllegalArgumentException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LifetimePolicy getLifetimePolicy()
+ {
+ return LifetimePolicy.PERMANENT;
+ }
+
+ @Override
+ public LifetimePolicy setLifetimePolicy(LifetimePolicy expected,
+ LifetimePolicy desired) throws IllegalStateException,
+ AccessControlException, IllegalArgumentException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getTimeToLive()
+ {
+ return 0;
+ }
+
+ @Override
+ public long setTimeToLive(long expected, long desired)
+ throws IllegalStateException, AccessControlException,
+ IllegalArgumentException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ return ReplicationNode.AVAILABLE_ATTRIBUTES;
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object getAttribute(String attributeName)
+ {
+ if (ReplicationNode.ID.equals(attributeName))
+ {
+ return getId();
+ }
+ else if (ReplicationNode.LIFETIME_POLICY.equals(attributeName))
+ {
+ return getLifetimePolicy();
+ }
+ else if (ReplicationNode.DURABLE.equals(attributeName))
+ {
+ return isDurable();
+ }
+ return super.getAttribute(attributeName);
+ }
+
+ @Override
+ public Object setAttribute(String name, Object expected, Object desired)
+ throws IllegalStateException, AccessControlException,
+ IllegalArgumentException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Statistics getStatistics()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <C extends ConfiguredObject> C createChild(Class<C> childClass,
+ Map<String, Object> attributes, ConfiguredObject... otherParents)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setAttributes(Map<String, Object> attributes)
+ throws IllegalStateException, AccessControlException,
+ IllegalArgumentException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ protected VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ @Override
+ protected boolean setState(State currentState, State desiredState)
+ {
+ if (desiredState == State.ACTIVE || desiredState == State.STOPPED)
+ {
+ return true;
+ }
+ return false;
+ }
+
+}
Copied: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java (from r1550805, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java&r1=1550805&r2=1552591&rev=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java Fri Dec 20 10:53:01 2013
@@ -20,7 +20,32 @@
*/
package org.apache.qpid.server.store.berkeleydb.replication;
-public interface RemoteReplicationNodeFactory
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.ReplicationNodeFactory;
+import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade;
+
+public class LocalReplicationNodeFactory implements ReplicationNodeFactory
{
- RemoteReplicationNode create(String groupName, String nodeName, String host, int port);
+
+ /**
+ * Identifies the kind of replication node this factory produces.
+ *
+ * @return {@link ReplicatedEnvironmentFacade#TYPE}
+ */
+ @Override
+ public String getType()
+ {
+ return ReplicatedEnvironmentFacade.TYPE;
+ }
+
+ /**
+ * Builds a {@link LocalReplicationNode} for the given virtual host.
+ * The task executor handed to the node is obtained from the virtual host's parent {@link Broker}.
+ *
+ * @param id identifier passed through to the new node
+ * @param attributes attribute map passed through to the new node
+ * @param virtualHost the virtual host that owns the node
+ * @return the newly constructed local replication node
+ */
+ @Override
+ public ReplicationNode createInstance(UUID id,
+ Map<String, Object> attributes, VirtualHost virtualHost)
+ {
+ // TODO KW Temporary code
+ return new LocalReplicationNode(id, attributes, virtualHost,
+ virtualHost.getParent(Broker.class).getTaskExecutor());
+ }
+
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java Fri Dec 20 10:53:01 2013
@@ -1,220 +1,16 @@
package org.apache.qpid.server.store.berkeleydb.replication;
-import java.security.AccessControlException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.qpid.server.model.ConfigurationChangeListener;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.IllegalStateTransitionException;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.ReplicationNode;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.Statistics;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
/**
* Represents a remote replication node in a BDB group.
*/
-public class RemoteReplicationNode implements ReplicationNode
+public class RemoteReplicationNode extends AbstractReplicationNode
{
- private final UUID _id;
- private final String _groupName;
- private final String _nodeName;
- private final String _host;
- private final int _port;
- private final VirtualHost _virtualHost;
-
- public RemoteReplicationNode(String groupName, String nodeName, String host, int port, VirtualHost virtualHost)
- {
- super();
- _id = UUIDGenerator.generateReplicationNodeId(groupName, nodeName);
- _groupName = groupName;
- _nodeName = nodeName;
- _host = host;
- _port = port;
- _virtualHost = virtualHost;
- }
-
- @Override
- public UUID getId()
- {
- return _id;
- }
-
- @Override
- public String getName()
- {
- return _nodeName;
- }
-
- @Override
- public String setName(String currentName, String desiredName)
- throws IllegalStateException, AccessControlException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public State getDesiredState()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public State setDesiredState(State currentState, State desiredState)
- throws IllegalStateTransitionException, AccessControlException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public State getActualState()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void addChangeListener(ConfigurationChangeListener listener)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean removeChangeListener(ConfigurationChangeListener listener)
- {
- throw new UnsupportedOperationException();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <T extends ConfiguredObject> T getParent(Class<T> clazz)
- {
- if (clazz == VirtualHost.class)
- {
- return (T) _virtualHost;
- }
- throw new IllegalArgumentException();
- }
-
- @Override
- public boolean isDurable()
- {
- return true;
- }
-
- @Override
- public void setDurable(boolean durable) throws IllegalStateException,
- AccessControlException, IllegalArgumentException
+ public RemoteReplicationNode(String groupName, String nodeName, String hostPort, VirtualHost virtualHost)
{
- throw new UnsupportedOperationException();
+ super(groupName, nodeName, hostPort, virtualHost);
}
- @Override
- public LifetimePolicy getLifetimePolicy()
- {
- return LifetimePolicy.PERMANENT;
- }
-
- @Override
- public LifetimePolicy setLifetimePolicy(LifetimePolicy expected,
- LifetimePolicy desired) throws IllegalStateException,
- AccessControlException, IllegalArgumentException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getTimeToLive()
- {
- return 0;
- }
-
- @Override
- public long setTimeToLive(long expected, long desired)
- throws IllegalStateException, AccessControlException,
- IllegalArgumentException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Collection<String> getAttributeNames()
- {
- return ReplicationNode.AVAILABLE_ATTRIBUTES;
- }
-
- @Override
- public Object getAttribute(String name)
- {
- if (ReplicationNode.ID.equals(name))
- {
- return getId();
- }
- else if (ReplicationNode.NAME.equals(name))
- {
- return getName();
- }
- else if (ReplicationNode.LIFETIME_POLICY.equals(name))
- {
- return getLifetimePolicy();
- }
- else if (ReplicationNode.DURABLE.equals(name))
- {
- return isDurable();
- }
- else if (ReplicationNode.HOST_PORT.equals(name))
- {
- return _host + ":" + _port;
- }
- else if (ReplicationNode.GROUP_NAME.equals(name))
- {
- return _groupName;
- }
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<String, Object> getActualAttributes()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object setAttribute(String name, Object expected, Object desired)
- throws IllegalStateException, AccessControlException,
- IllegalArgumentException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Statistics getStatistics()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <C extends ConfiguredObject> C createChild(Class<C> childClass,
- Map<String, Object> attributes, ConfiguredObject... otherParents)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setAttributes(Map<String, Object> attributes)
- throws IllegalStateException, AccessControlException,
- IllegalArgumentException
- {
- throw new UnsupportedOperationException();
- }
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java Fri Dec 20 10:53:01 2013
@@ -22,5 +22,5 @@ package org.apache.qpid.server.store.ber
public interface RemoteReplicationNodeFactory
{
- RemoteReplicationNode create(String groupName, String nodeName, String host, int port);
+ RemoteReplicationNode create(String groupName, String nodeName, String hostPort);
}
Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ReplicationNodeFactory
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ReplicationNodeFactory?rev=1552591&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ReplicationNodeFactory (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ReplicationNodeFactory Fri Dec 20 10:53:01 2013
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.store.berkeleydb.replication.LocalReplicationNodeFactory
\ No newline at end of file
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java Fri Dec 20 10:53:01 2013
@@ -20,9 +20,18 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import static org.mockito.Mockito.any;
+import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC;
+import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY;
+import static org.apache.qpid.server.model.ReplicationNode.DURABILITY;
+import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME;
+import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.NAME;
+import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.io.File;
import java.util.ArrayList;
@@ -40,6 +49,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.replication.ReplicationGroupListener;
import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory;
@@ -68,12 +78,18 @@ public class ReplicatedEnvironmentFacade
private static final String TEST_NODE_NAME = "testNodeName";
private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT;
private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT;
- private static final Durability TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY");
+ private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString();
private static final boolean TEST_DESIGNATED_PRIMARY = true;
private static final boolean TEST_COALESCING_SYNC = true;
private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
- private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = mock(RemoteReplicationNodeFactory.class);;
+ private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = mock(RemoteReplicationNodeFactory.class);
+ /**
+ * Delegates to the superclass set-up; no additional fixture state is created here.
+ */
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ @Override
public void tearDown() throws Exception
{
try
@@ -159,7 +175,7 @@ public class ReplicatedEnvironmentFacade
ReplicationGroupListener listener = mock(ReplicationGroupListener.class);
replicatedEnvironmentFacade.setReplicationGroupListener(listener);
verify(listener).onReplicationNodeRecovered(any(RemoteReplicationNode.class));
- verify(_remoteReplicationNodeFactory).create(TEST_GROUP_NAME, nodeName2, host, port);
+ verify(_remoteReplicationNodeFactory).create(TEST_GROUP_NAME, nodeName2, node2NodeHostPort);
}
public void testRemoveNodeFromGroup() throws Exception
@@ -340,7 +356,7 @@ public class ReplicatedEnvironmentFacade
ReplicatedEnvironmentFacade ref = null;
try
{
- ref = createReplicatedEnvironmentFacade(nodeName, nodePath, TEST_NODE_HOST_PORT, false);
+ ref = createReplicatedEnvironmentFacade(nodePath, nodeName, TEST_NODE_HOST_PORT, false);
assertEquals("Unexpected state " + ref.getFacadeState(), ReplicatedEnvironmentFacade.State.OPENING, ref.getFacadeState());
final CountDownLatch nodeAwaitLatch = new CountDownLatch(1);
@@ -415,7 +431,7 @@ public class ReplicatedEnvironmentFacade
private ReplicatedEnvironmentFacade join(String nodeName, String nodePath, String nodeHostPort, boolean designatedPrimary,
final CountDownLatch nodeAwaitLatch, final State expectedState)
{
- ReplicatedEnvironmentFacade ref = createReplicatedEnvironmentFacade(nodeName, nodePath, nodeHostPort, designatedPrimary);
+ ReplicatedEnvironmentFacade ref = createReplicatedEnvironmentFacade(nodePath, nodeName, nodeHostPort, designatedPrimary);
if (expectedState == State.REPLICA)
{
@@ -435,17 +451,11 @@ public class ReplicatedEnvironmentFacade
return ref;
}
- private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodeName, String nodePath, String nodeHostPort,
+ private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodePath, String nodeName, String nodeHostPort,
boolean designatedPrimary)
{
- Map<String, String> repConfig = new HashMap<String, String>();
- repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
- repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
-
- ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(getName(), nodePath, TEST_GROUP_NAME, nodeName,
- nodeHostPort, TEST_NODE_HELPER_HOST_PORT, TEST_DURABILITY, designatedPrimary, TEST_COALESCING_SYNC,
- Collections.<String, String> emptyMap(), repConfig, _remoteReplicationNodeFactory);
- return ref;
+ ReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary);
+ return new ReplicatedEnvironmentFacade(getName(), nodePath, node, _remoteReplicationNodeFactory);
}
private ReplicatedEnvironmentFacade[] startClusterSequentially(int nodeNumber) throws InterruptedException
@@ -472,4 +482,24 @@ public class ReplicatedEnvironmentFacade
environmentFacade.openDatabases(new String[] { databaseName }, dbConfig);
return dbConfig;
}
+
+ /**
+ * Creates a Mockito mock of {@link ReplicationNode} whose attributes describe a test group member.
+ * Group name, helper address, durability and coalescing-sync come from the TEST_* constants;
+ * the replication parameters set both the replica-ack and insufficient-replicas timeouts to "2 s".
+ *
+ * @param nodeName value returned for the node name (both getName() and the NAME attribute)
+ * @param nodeHostPort value returned for the HOST_PORT attribute
+ * @param designatedPrimary value returned for the DESIGNATED_PRIMARY attribute
+ * @return the stubbed mock
+ */
+ private ReplicationNode createReplicationNodeMock(String nodeName, String nodeHostPort, boolean designatedPrimary)
+ {
+ Map<String, String> replicationParameters = new HashMap<String, String>();
+ replicationParameters.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
+ replicationParameters.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
+
+ ReplicationNode mockNode = mock(ReplicationNode.class);
+
+ // Per-node values supplied by the caller.
+ when(mockNode.getName()).thenReturn(nodeName);
+ when(mockNode.getAttribute(NAME)).thenReturn(nodeName);
+ when(mockNode.getAttribute(HOST_PORT)).thenReturn(nodeHostPort);
+ when(mockNode.getAttribute(DESIGNATED_PRIMARY)).thenReturn(designatedPrimary);
+
+ // Values shared by every node in the test group.
+ when(mockNode.getAttribute(GROUP_NAME)).thenReturn(TEST_GROUP_NAME);
+ when(mockNode.getAttribute(HELPER_HOST_PORT)).thenReturn(TEST_NODE_HELPER_HOST_PORT);
+ when(mockNode.getAttribute(DURABILITY)).thenReturn(TEST_DURABILITY);
+ when(mockNode.getAttribute(COALESCING_SYNC)).thenReturn(TEST_COALESCING_SYNC);
+ when(mockNode.getAttribute(REPLICATION_PARAMETERS)).thenReturn(replicationParameters);
+ return mockNode;
+ }
+
}
Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java?rev=1552591&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java Fri Dec 20 10:53:01 2013
@@ -0,0 +1,205 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.ConfigurationEntry;
+import org.apache.qpid.server.configuration.RecovererProvider;
+import org.apache.qpid.server.configuration.startup.VirtualHostRecoverer;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+
+/**
+ * Creates virtual hosts from an XML configuration file through {@link VirtualHostRecoverer}
+ * and checks that the resulting host, message store and (for HA) replication node
+ * reflect the values written into that file.
+ */
+public class VirtualHostTest extends QpidTestCase
+{
+
+ private Broker _broker;
+ private StatisticsGatherer _statisticsGatherer;
+ private RecovererProvider _recovererProvider;
+ private File _configFile;
+ private File _bdbStorePath;
+ private VirtualHost _host;
+
+ /**
+ * Installs a test log actor, prepares a broker mock whose task executor reports that the
+ * calling thread is the executor thread, and picks a unique store path for the test.
+ */
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+
+ _broker = BrokerTestHelper.createBrokerMock();
+ TaskExecutor taskExecutor = mock(TaskExecutor.class);
+ when(taskExecutor.isTaskExecutorThread()).thenReturn(true);
+ when(_broker.getTaskExecutor()).thenReturn(taskExecutor);
+
+ _recovererProvider = mock(RecovererProvider.class);
+ _statisticsGatherer = mock(StatisticsGatherer.class);
+
+ _bdbStorePath = new File(TMP_FOLDER, getTestName() + "." + System.currentTimeMillis());
+ _bdbStorePath.deleteOnExit();
+ }
+
+ /**
+ * Stops the host (if one was created), deletes the generated configuration file and store
+ * directory, and removes the log actor installed by {@link #setUp()}.
+ * The nested finally blocks guarantee that a failure in one clean-up step does not prevent
+ * the later ones, in particular {@code CurrentActor.remove()}, from running.
+ */
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ if (_host != null)
+ {
+ _host.setDesiredState(_host.getActualState(), State.STOPPED);
+ }
+ }
+ finally
+ {
+ try
+ {
+ if (_configFile != null)
+ {
+ _configFile.delete();
+ }
+ if (_bdbStorePath != null)
+ {
+ FileUtils.delete(_bdbStorePath, true);
+ }
+ }
+ finally
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ }
+ }
+ }
+
+ /**
+ * A non-HA BDB host built from configuration exposes the configured name, type, store type
+ * and store path, and passes the envConfig setting through to the JE environment.
+ */
+ public void testCreateBdbVirtualHostFromConfigurationFile()
+ {
+ String hostName = getName();
+ long logFileMax = 2000000;
+ _host = createHostFromConfiguration(hostName, logFileMax);
+ _host.setDesiredState(State.INITIALISING, State.ACTIVE);
+ assertEquals("Unexpected host name", hostName, _host.getName());
+ assertEquals("Unexpected host type", StandardVirtualHostFactory.TYPE, _host.getType());
+ assertEquals("Unexpected store type", BDBMessageStore.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE));
+ assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
+
+ BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
+ EnvironmentConfig envConfig = messageStore.getEnvironmentFacade().getEnvironment().getConfig();
+ assertEquals("Unexpected JE log file max", String.valueOf(logFileMax), envConfig.getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
+
+ }
+
+ /**
+ * A BDB HA host built from configuration exposes the configured host attributes, publishes a
+ * replication node child carrying the highAvailability settings, and passes the repConfig
+ * setting through to the JE replicated environment.
+ */
+ public void testCreateBdbHaVirtualHostFromConfigurationFile()
+ {
+ String hostName = getName();
+
+ String repStreamTimeout = "2 h";
+ String nodeName = "node";
+ String groupName = "group";
+ String nodeHostPort = "localhost:" + findFreePort();
+ // The node acts as its own helper.
+ String helperHostPort = nodeHostPort;
+ String durability = "NO_SYNC,SYNC,NONE";
+ _host = createHaHostFromConfiguration(hostName, groupName, nodeName, nodeHostPort, helperHostPort, durability, repStreamTimeout);
+ _host.setDesiredState(State.INITIALISING, State.ACTIVE);
+ assertEquals("Unexpected host name", hostName, _host.getName());
+ assertEquals("Unexpected host type", BDBHAVirtualHostFactory.TYPE, _host.getType());
+ assertEquals("Unexpected store type", ReplicatedEnvironmentFacade.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE));
+ assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
+
+ ReplicationNode localNode = _host.getChildren(ReplicationNode.class).iterator().next();
+
+ assertEquals("Unexpected node name", nodeName, localNode.getName());
+ assertEquals("Unexpected group name", groupName, localNode.getAttribute(ReplicationNode.GROUP_NAME));
+ assertEquals("Unexpected node host port", nodeHostPort, localNode.getAttribute(ReplicationNode.HOST_PORT));
+ assertEquals("Unexpected helper host port", helperHostPort, localNode.getAttribute(ReplicationNode.HELPER_HOST_PORT));
+ assertEquals("Unexpected durability", durability, localNode.getAttribute(ReplicationNode.DURABILITY));
+
+ BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
+ ReplicatedEnvironment environment = (ReplicatedEnvironment) messageStore.getEnvironmentFacade().getEnvironment();
+ ReplicationConfig envConfig = environment.getRepConfig();
+ assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, envConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
+ }
+
+ /**
+ * Wraps the attributes in a configuration entry with a random id and recovers a virtual host
+ * from it against the mocked broker.
+ */
+ private VirtualHost createHost(Map<String, Object> attributes)
+ {
+ ConfigurationEntry entry = new ConfigurationEntry(UUID.randomUUID(), VirtualHost.class.getSimpleName(), attributes,
+ Collections.<UUID> emptySet(), null);
+
+ return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker);
+ }
+
+ /**
+ * Writes an XML configuration for a plain BDB store with one envConfig entry (JE log file max)
+ * and creates the host from it.
+ */
+ private VirtualHost createHostFromConfiguration(String hostName, long logFileMax)
+ {
+ String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">"
+ + "<store><class>" + BDBMessageStore.class.getName() + "</class>"
+ + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>"
+ + "<envConfig><name>" + EnvironmentConfig.LOG_FILE_MAX + "</name><value>" + logFileMax + "</value></envConfig>"
+ + "</store>"
+ + "</" + hostName + "></virtualhost></virtualhosts>";
+ Map<String, Object> attributes = writeConfigAndGenerateAttributes(content);
+ return createHost(attributes);
+ }
+
+ /**
+ * Writes an XML configuration for a BDB HA host (highAvailability section plus one repConfig
+ * entry for the replication stream timeout) and creates the host from it.
+ */
+ private VirtualHost createHaHostFromConfiguration(String hostName, String groupName, String nodeName, String nodeHostPort, String helperHostPort, String durability, String repStreamTimeout)
+ {
+ // Each comma in the durability value is written as "\," into the XML.
+ // NOTE(review): presumably this stops the configuration parser splitting the value into a list - confirm.
+ String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">"
+ + "<type>" + BDBHAVirtualHostFactory.TYPE + "</type>"
+ + "<store><class>" + BDBMessageStore.class.getName() + "</class>"
+ + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>"
+ + "<highAvailability>"
+ + "<groupName>" + groupName + "</groupName>"
+ + "<nodeName>" + nodeName + "</nodeName>"
+ + "<nodeHostPort>" + nodeHostPort + "</nodeHostPort>"
+ + "<helperHostPort>" + helperHostPort + "</helperHostPort>"
+ + "<durability>" + durability.replaceAll(",", "\\\\,") + "</durability>"
+ + "</highAvailability>"
+ + "<repConfig><name>" + ReplicationConfig.REP_STREAM_TIMEOUT + "</name><value>" + repStreamTimeout + "</value></repConfig>"
+ + "</store>"
+ + "</" + hostName + "></virtualhost></virtualhosts>";
+ Map<String, Object> attributes = writeConfigAndGenerateAttributes(content);
+ return createHost(attributes);
+ }
+
+ /**
+ * Writes the XML content to a temporary file (remembered in {@code _configFile} so that
+ * tearDown can delete it) and returns host attributes naming the test and pointing at that file.
+ */
+ private Map<String, Object> writeConfigAndGenerateAttributes(String content)
+ {
+ _configFile = TestFileUtils.createTempFile(this, ".virtualhost.xml", content);
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(VirtualHost.NAME, getName());
+ attributes.put(VirtualHost.CONFIG_PATH, _configFile.getAbsolutePath());
+ return attributes;
+ }
+}
+
+
\ No newline at end of file
Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java?rev=1552591&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java Fri Dec 20 10:53:01 2013
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ * Tests the attribute handling performed when a {@link LocalReplicationNode} is constructed:
+ * defaults, mandatory attributes, invalid values and overridden defaults.
+ */
+public class LocalReplicationNodeTest extends QpidTestCase
+{
+
+ private static final Object INVALID_VALUE = new Object();
+ private UUID _id;
+ private VirtualHost _virtualHost;
+ private TaskExecutor _taskExecutor;
+
+ /**
+ * Creates a fresh node id and mocked collaborators for every test.
+ */
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ // Assign an id explicitly; without this every node under test would be built with a null id.
+ _id = UUID.randomUUID();
+ _taskExecutor = mock(TaskExecutor.class);
+ _virtualHost = mock(VirtualHost.class);
+ }
+
+ /**
+ * With only the attributes from createValidAttributes() supplied, the node reports those values
+ * and the value from {@code LocalReplicationNode.DEFAULTS} for every defaulted attribute.
+ */
+ public void testCreateLocalReplicationNodeWithoutDefaultParametersAndValidParameters()
+ {
+ Map<String, Object> attributes = createValidAttributes();
+
+ LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor);
+
+ assertNodeAttributes(attributes, node);
+
+ for (Map.Entry<String, Object> attributeEntry : LocalReplicationNode.DEFAULTS.entrySet())
+ {
+ assertEquals("Unexpected attribute value for attribute name " + attributeEntry.getKey(), attributeEntry.getValue(), node.getAttribute(attributeEntry.getKey()));
+ }
+ }
+
+ /**
+ * Removing any single attribute supplied by createValidAttributes() must make construction fail
+ * with {@link IllegalConfigurationException}.
+ */
+ public void testCreateLocalReplicationNodeWithoutDefaultParametersAndMissedParameters()
+ {
+ Map<String, Object> attributes = createValidAttributes();
+
+ for (Map.Entry<String, Object> attributeEntry : attributes.entrySet())
+ {
+ String name = attributeEntry.getKey();
+ // Work on a copy so the map being iterated is never modified.
+ Map<String, Object> incompleteAttributes = new HashMap<String, Object>(attributes);
+ incompleteAttributes.remove(name);
+ try
+ {
+ new LocalReplicationNode(_id, incompleteAttributes, _virtualHost, _taskExecutor);
+ fail("Node creation should fails when attribute " + name + " is missed");
+ }
+ catch(IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+ }
+
+ /**
+ * Replacing a non-String attribute with an arbitrary object must make construction fail
+ * with {@link IllegalConfigurationException}.
+ */
+ public void testCreateLocalReplicationNodeWithoutDefaultParametersAndInvalidParameters()
+ {
+
+ Map<String, Object> attributes = createValidAttributes();
+
+ for (Map.Entry<String, Object> attributeEntry : attributes.entrySet())
+ {
+ String name = attributeEntry.getKey();
+ Object value = attributeEntry.getValue();
+ // NOTE(review): createValidAttributes() currently supplies only String values, so this
+ // guard skips every entry and the body below is never reached; add a non-String
+ // attribute to the fixture if this test is meant to exercise validation.
+ if (!(value instanceof String))
+ {
+ Map<String, Object> invalidAttributes = new HashMap<String, Object>(attributes);
+ invalidAttributes.put(name, INVALID_VALUE);
+ try
+ {
+ // Construct from the map containing the invalid value, not the valid original.
+ new LocalReplicationNode(_id, invalidAttributes, _virtualHost, _taskExecutor);
+ fail("Node creation should fails when attribute " + name + " is invalid");
+ }
+ catch(IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+ }
+ }
+
+ /**
+ * Explicitly supplied durability, coalescing-sync and designated-primary values are reported
+ * back by the node unchanged.
+ */
+ public void testCreateLocalReplicationNodeWithOverriddenDefaultParameters()
+ {
+ Map<String, Object> attributes = createValidAttributes();
+ attributes.put(ReplicationNode.DURABILITY, "SYNC,SYNC,NONE");
+ attributes.put(ReplicationNode.COALESCING_SYNC, false);
+ attributes.put(ReplicationNode.DESIGNATED_PRIMARY, true);
+
+ LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor);
+
+ assertNodeAttributes(attributes, node);
+ }
+
+ /**
+ * Returns a new mutable map holding the name, group name, host:port and helper host:port
+ * attributes used as the baseline by all tests. All values are Strings.
+ */
+ private Map<String, Object> createValidAttributes()
+ {
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(ReplicationNode.NAME, "testNode");
+ attributes.put(ReplicationNode.GROUP_NAME, "testGroup");
+ attributes.put(ReplicationNode.HOST_PORT, "localhost:5000");
+ attributes.put(ReplicationNode.HELPER_HOST_PORT, "localhost:5001");
+ return attributes;
+ }
+
+ /**
+ * Asserts that the node returns the expected value for every attribute in the given map.
+ */
+ private void assertNodeAttributes(Map<String, Object> expectedAttributes,
+ LocalReplicationNode node)
+ {
+ for (Map.Entry<String, Object> attributeEntry : expectedAttributes.entrySet())
+ {
+ assertEquals("Unexpected attribute value for attribute name " + attributeEntry.getKey(), attributeEntry.getValue(), node.getAttribute(attributeEntry.getKey()));
+ }
+ }
+
+}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/pom.xml?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/pom.xml Fri Dec 20 10:53:01 2013
@@ -42,6 +42,13 @@
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>${mockito-version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-systests</artifactId>
<version>0.26-SNAPSHOT</version>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/VirtualHostRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/VirtualHostRecoverer.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/VirtualHostRecoverer.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/VirtualHostRecoverer.java Fri Dec 20 10:53:01 2013
@@ -21,14 +21,21 @@
package org.apache.qpid.server.configuration.startup;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
import org.apache.qpid.server.configuration.ConfigurationEntry;
import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer;
import org.apache.qpid.server.configuration.RecovererProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
+import org.apache.qpid.server.plugin.ReplicationNodeFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.util.MapValueConverter;
public class VirtualHostRecoverer implements ConfiguredObjectRecoverer<VirtualHost>
{
@@ -45,7 +52,51 @@ public class VirtualHostRecoverer implem
{
Broker broker = RecovererHelper.verifyOnlyBrokerIsParent(parents);
- return new VirtualHostAdapter(entry.getId(), entry.getAttributes(), broker, _brokerStatisticsGatherer, broker.getTaskExecutor());
+ Map<String, Object> attributes = entry.getAttributes();
+ VirtualHostAdapter virtualHostAdapter = new VirtualHostAdapter(entry.getId(), attributes, broker, _brokerStatisticsGatherer, broker.getTaskExecutor());
+
+ // TODO temporary code to bridge from VH attributes to LocalReplicationNode - will be moved into a new ReplicationNodeRecoverer
+ if (attributes.containsKey(VirtualHost.TYPE))
+ {
+ String type = MapValueConverter.getStringAttribute(VirtualHost.TYPE, attributes);
+ ReplicationNodeFactory replicationNodeFactory = ReplicationNodeFactory.FACTORIES.get(type);
+
+ // Nothing to bridge unless a factory is registered for this virtual host type.
+ if (replicationNodeFactory != null)
+ {
+ // No identifier is supplied for the node here; null is handed to the factory.
+ UUID uuid = null;
+
+ // These four attributes are always copied, even when the virtual host value is null.
+ Map<String, Object> replicationNodeAttributes = new HashMap<String, Object>();
+ replicationNodeAttributes.put(ReplicationNode.NAME, attributes.get("haNodeName"));
+ replicationNodeAttributes.put(ReplicationNode.GROUP_NAME, attributes.get("haGroupName"));
+ replicationNodeAttributes.put(ReplicationNode.HOST_PORT, attributes.get("haNodeAddress"));
+ replicationNodeAttributes.put(ReplicationNode.HELPER_HOST_PORT, attributes.get("haHelperAddress"));
+
+ // Remaining attributes are copied only when set: { virtual host key, replication node key }.
+ String[][] optionalAttributes = {
+ { "haDurability", ReplicationNode.DURABILITY },
+ { "haDesignatedPrimary", ReplicationNode.DESIGNATED_PRIMARY },
+ { "haCoalescingSync", ReplicationNode.COALESCING_SYNC },
+ { "bdbEnvironmentConfig", ReplicationNode.PARAMETERS },
+ { "haReplicationConfig", ReplicationNode.REPLICATION_PARAMETERS }
+ };
+ for (String[] optionalAttribute : optionalAttributes)
+ {
+ Object value = attributes.get(optionalAttribute[0]);
+ if (value != null)
+ {
+ replicationNodeAttributes.put(optionalAttribute[1], value);
+ }
+ }
+
+ // Hand over the translated map rather than the raw virtual host attributes.
+ ReplicationNode node = replicationNodeFactory.createInstance(uuid, replicationNodeAttributes, virtualHostAdapter);
+ virtualHostAdapter.onReplicationNodeRecovered(node);
+ }
+ }
+ return virtualHostAdapter;
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Fri Dec 20 10:53:01 2013
@@ -120,8 +120,8 @@ public final class VirtualHostAdapter ex
super(id, null, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES, false), taskExecutor, false);
_broker = broker;
_brokerStatisticsGatherer = brokerStatisticsGatherer;
- validateAttributes();
addParent(Broker.class, broker);
+ validateAttributes();
}
private void validateAttributes()
@@ -163,7 +163,7 @@ public final class VirtualHostAdapter ex
// pre-load the configuration in order to validate
try
{
- createVirtualHostConfiguration(name);
+ createVirtualHostConfiguration(name, null);
}
catch(ConfigurationException e)
{
@@ -577,6 +577,7 @@ public final class VirtualHostAdapter ex
return _statistics;
}
+ @SuppressWarnings("unchecked")
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
@@ -596,6 +597,10 @@ public final class VirtualHostAdapter ex
{
return (Collection<C>) getAliases();
}
+ else if (clazz == ReplicationNode.class)
+ {
+ return (Collection<C>)getReplicationNodes();
+ }
else
{
return Collections.emptySet();
@@ -629,6 +634,9 @@ public final class VirtualHostAdapter ex
{
throw new UnsupportedOperationException();
}
+
+ // TODO KW change add child to add the replication node?
+
throw new IllegalArgumentException("Cannot create a child of class " + childClass.getSimpleName());
}
@@ -1131,7 +1139,7 @@ public final class VirtualHostAdapter ex
String virtualHostName = getName();
try
{
- VirtualHostConfiguration configuration = createVirtualHostConfiguration(virtualHostName);
+ VirtualHostConfiguration configuration = createVirtualHostConfiguration(virtualHostName, this);
String type = configuration.getType();
final VirtualHostFactory factory = VirtualHostFactory.FACTORIES.get(type);
if(factory == null)
@@ -1171,12 +1179,13 @@ public final class VirtualHostAdapter ex
}
}
- private VirtualHostConfiguration createVirtualHostConfiguration(String virtualHostName) throws ConfigurationException
+ private VirtualHostConfiguration createVirtualHostConfiguration(String virtualHostName, ReplicationGroupListener listener) throws ConfigurationException
{
VirtualHostConfiguration configuration;
String configurationFile = (String)getAttribute(CONFIG_PATH);
if (configurationFile == null)
{
+ LOGGER.debug("Creating virtual host configuration from the attributes");
final MyConfiguration basicConfiguration = new MyConfiguration();
PropertiesConfiguration config = new PropertiesConfiguration();
final String type = (String) getAttribute(TYPE);
@@ -1198,6 +1207,10 @@ public final class VirtualHostAdapter ex
}
else
{
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Creating virtual host configuration from configuration file " + configurationFile);
+ }
if (!new File(configurationFile).exists())
{
throw new IllegalConfigurationException("Configuration file '" + configurationFile + "' does not exist");
@@ -1213,7 +1226,15 @@ public final class VirtualHostAdapter ex
changeAttribute(entry.getKey(), getAttribute(entry.getKey()), entry.getValue());
}
}
-
+ ReplicationNode replicationNode = factory.createReplicationNode(configuration.getConfig(), this);
+ if (listener != null && replicationNode != null)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Replication node " + replicationNode);
+ }
+ listener.onReplicationNodeRecovered(replicationNode);
+ }
}
return configuration;
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/PluggableFactoryLoader.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/PluggableFactoryLoader.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/PluggableFactoryLoader.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/PluggableFactoryLoader.java Fri Dec 20 10:53:01 2013
@@ -31,11 +31,17 @@ public class PluggableFactoryLoader<T ex
private final Map<String, T> _factoriesMap;
private final Set<String> _types;
+
public PluggableFactoryLoader(Class<T> factoryClass)
{
+ this(factoryClass, true);
+ }
+
+ public PluggableFactoryLoader(Class<T> factoryClass, boolean atLeastOnce)
+ {
Map<String, T> fm = new HashMap<String, T>();
QpidServiceLoader<T> qpidServiceLoader = new QpidServiceLoader<T>();
- Iterable<T> factories = qpidServiceLoader.atLeastOneInstanceOf(factoryClass);
+ Iterable<T> factories = atLeastOnce? qpidServiceLoader.atLeastOneInstanceOf(factoryClass) : qpidServiceLoader.instancesOf(factoryClass);
for (T factory : factories)
{
String descriptiveType = factory.getType();
Copied: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ReplicationNodeFactory.java (from r1550805, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ReplicationNodeFactory.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ReplicationNodeFactory.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java&r1=1550805&r2=1552591&rev=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ReplicationNodeFactory.java Fri Dec 20 10:53:01 2013
@@ -18,9 +18,19 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.replication;
+package org.apache.qpid.server.plugin;
-public interface RemoteReplicationNodeFactory
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.VirtualHost;
+
+/**
+ * Pluggable factory that creates {@link ReplicationNode} instances for a virtual host.
+ */
+public interface ReplicationNodeFactory extends Pluggable
{
- RemoteReplicationNode create(String groupName, String nodeName, String host, int port);
+
+ /**
+ * Loader holding the discovered factories. It is constructed with the second argument
+ * set to false; presumably this lets a broker without any replication node factory
+ * still load (TODO confirm against PluggableFactoryLoader).
+ */
+ PluggableFactoryLoader<ReplicationNodeFactory> FACTORIES = new PluggableFactoryLoader<ReplicationNodeFactory>(ReplicationNodeFactory.class, false);
+
+ /**
+ * Creates a replication node.
+ *
+ * @param id identifier for the new node
+ * @param attributes attributes of the new node; presumably keyed by the ReplicationNode
+ * attribute name constants (TODO confirm against implementations)
+ * @param virtualHost the virtual host the node is being created for
+ * @return the created replication node
+ */
+ ReplicationNode createInstance(UUID id, Map<String, Object> attributes, VirtualHost virtualHost);
+
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java Fri Dec 20 10:53:01 2013
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsGatherer;
@@ -49,6 +50,8 @@ public interface VirtualHostFactory exte
Map<String,Object> convertVirtualHostConfiguration(Configuration configuration);
+ ReplicationNode createReplicationNode(Configuration configuration, org.apache.qpid.server.model.VirtualHost virtualHost);
+
static final class TYPES
{
private TYPES()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org