You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2013/12/20 11:53:02 UTC

svn commit: r1552591 [1/2] - in /qpid/branches/java-broker-bdb-ha/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ bdbstore/src/main/resources/META-IN...

Author: kwall
Date: Fri Dec 20 10:53:01 2013
New Revision: 1552591

URL: http://svn.apache.org/r1552591
Log:
QPID-5411: More LocalReplicationNode work.

Added:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/AbstractReplicationNode.java
      - copied, changed from r1550805, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java
      - copied, changed from r1550805, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ReplicationNodeFactory
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ReplicationNodeFactory.java
      - copied, changed from r1550805, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java
Removed:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
Modified:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/pom.xml
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/VirtualHostRecoverer.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/PluggableFactoryLoader.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java Fri Dec 20 10:53:01 2013
@@ -23,12 +23,18 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+
 import org.apache.commons.configuration.Configuration;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
 import org.apache.qpid.server.plugin.VirtualHostFactory;
 import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.store.MessageStoreConstants;
+import org.apache.qpid.server.store.berkeleydb.replication.LocalReplicationNode;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
@@ -155,4 +161,53 @@ public class BDBHAVirtualHostFactory imp
         }
         return attributes;
     }
+
+    @Override
+    public ReplicationNode createReplicationNode(Configuration configuration, org.apache.qpid.server.model.VirtualHost virtualHost)
+    {
+        Configuration storeConfiguration = configuration.subset("store");
+
+        String nodeName = storeConfiguration.getString("highAvailability.nodeName");
+        String groupName = storeConfiguration.getString("highAvailability.groupName");
+        Map<String, Object> attributes = new HashMap<String, Object>();
+        attributes.put(ReplicationNode.NAME, nodeName);
+        attributes.put(ReplicationNode.GROUP_NAME, groupName);
+        attributes.put(ReplicationNode.HOST_PORT, storeConfiguration.getString("highAvailability.nodeHostPort"));
+        attributes.put(ReplicationNode.HELPER_HOST_PORT, storeConfiguration.getString("highAvailability.helperHostPort"));
+
+        String durability = storeConfiguration.getString("highAvailability.durability");
+        if (durability != null)
+        {
+            attributes.put(ReplicationNode.DURABILITY, durability);
+        }
+
+        String designatedPrimary = storeConfiguration.getString("highAvailability.designatedPrimary");
+        if (designatedPrimary != null)
+        {
+            attributes.put(ReplicationNode.DESIGNATED_PRIMARY, designatedPrimary);
+        }
+
+        String coalescingSync = storeConfiguration.getString("highAvailability.coalescingSync");
+        if (coalescingSync != null)
+        {
+            attributes.put(ReplicationNode.COALESCING_SYNC, coalescingSync);
+        }
+
+        Map<String, String> envAttributes = getEnvironmentMap(storeConfiguration, "envConfig");
+        if (envAttributes != null && envAttributes.size() > 0)
+        {
+            attributes.put(ReplicationNode.PARAMETERS, envAttributes);
+        }
+
+        Map<String, String> repAttributes = getEnvironmentMap(storeConfiguration, "repConfig");
+        if (repAttributes != null && repAttributes.size() > 0)
+        {
+            attributes.put(ReplicationNode.REPLICATION_PARAMETERS, repAttributes);
+        }
+
+        Broker broker = virtualHost.getParent(Broker.class);
+        UUID uuid = UUIDGenerator.generateReplicationNodeId(groupName, nodeName);
+        return new LocalReplicationNode(uuid, attributes, virtualHost, broker.getTaskExecutor());
+    }
+
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java Fri Dec 20 10:53:01 2013
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
+import static org.apache.qpid.server.model.ReplicationNode.*;
+
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -136,34 +138,32 @@ public class ReplicatedEnvironmentFacade
     private ReplicationGroupListener _replicationGroupListener;
     private final RemoteReplicationNodeFactory _remoteReplicationNodeFactory;
 
-
+    @SuppressWarnings("unchecked")
     public ReplicatedEnvironmentFacade(String name, String environmentPath,
-            String groupName, String nodeName, String nodeHostPort,
-            String helperHostPort, Durability durability,
-            Boolean designatedPrimary, Boolean coalescingSync,
-            Map<String, String> envConfigMap,
-            Map<String, String> replicationConfig, RemoteReplicationNodeFactory remoteReplicationNodeFactory)
+            org.apache.qpid.server.model.ReplicationNode replicationNode,
+            RemoteReplicationNodeFactory remoteReplicationNodeFactory)
     {
-        _name = name;
+         _name = name;
         _environmentPath = environmentPath;
-        _groupName = groupName;
-        _nodeName = nodeName;
-        _nodeHostPort = nodeHostPort;
-        _helperHostPort = helperHostPort;
-        _durability = durability;
-        _designatedPrimary = designatedPrimary;
-        _coalescingSync = coalescingSync;
-        _environmentParameters = envConfigMap;
-        _replicationEnvironmentParameters = replicationConfig;
+        _groupName = (String)replicationNode.getAttribute(GROUP_NAME);
+        _nodeName = replicationNode.getName();
+        _nodeHostPort = (String)replicationNode.getAttribute(HOST_PORT);;
+        _helperHostPort = (String)replicationNode.getAttribute(HELPER_HOST_PORT);
+        _durability = Durability.parse((String)replicationNode.getAttribute(DURABILITY));
+        _designatedPrimary = (Boolean)replicationNode.getAttribute(DESIGNATED_PRIMARY);
+        _coalescingSync = (Boolean)replicationNode.getAttribute(COALESCING_SYNC);
+        _environmentParameters = (Map<String, String>)replicationNode.getAttribute(PARAMETERS);
+        _replicationEnvironmentParameters = (Map<String, String>)replicationNode.getAttribute(REPLICATION_PARAMETERS);
 
         _remoteReplicationNodeFactory = remoteReplicationNodeFactory;
         _state.set(State.OPENING);
-        _environment = createEnvironment(environmentPath, groupName, nodeName, nodeHostPort, helperHostPort, durability,
-                designatedPrimary, _environmentParameters, _replicationEnvironmentParameters);
+        _environment = createEnvironment();
         startCommitThread(_name, _environment);
     }
 
 
+
+
     @Override
     public StoreFuture commit(final Transaction tx, final boolean syncCommit) throws AMQStoreException
     {
@@ -473,7 +473,7 @@ public class ReplicatedEnvironmentFacade
                 // TODO remote replication nodes should be cached
                 RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(group.getName(),
                                                                              replicationNode.getName(),
-                                                                             replicationNode.getHostName(), replicationNode.getPort());
+                                                                             replicationNode.getHostName() + ":" + replicationNode.getPort());
                 listener.onReplicationNodeRecovered(remoteNode);
             }
         }
@@ -543,8 +543,7 @@ public class ReplicatedEnvironmentFacade
         Set<String> databaseNames = new HashSet<String>(_databases.keySet());
         closeEnvironmentSafely();
 
-        _environment = createEnvironment(_environmentPath, _groupName, _nodeName, _nodeHostPort, _helperHostPort, _durability,
-                _designatedPrimary, _environmentParameters, _replicationEnvironmentParameters);
+        _environment = createEnvironment();
 
         DatabaseConfig dbConfig = new DatabaseConfig();
         dbConfig.setTransactional(true);
@@ -610,37 +609,35 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    private ReplicatedEnvironment createEnvironment(String environmentPath, String groupName, String nodeName, String nodeHostPort,
-            String helperHostPort, Durability durability, boolean designatedPrimary, Map<String, String> environmentParameters,
-            Map<String, String> replicationEnvironmentParameters)
+    private ReplicatedEnvironment createEnvironment()
     {
         if (LOGGER.isInfoEnabled())
         {
             LOGGER.info("Creating environment");
-            LOGGER.info("Environment path " + environmentPath);
-            LOGGER.info("Group name " + groupName);
-            LOGGER.info("Node name " + nodeName);
-            LOGGER.info("Node host port " + nodeHostPort);
-            LOGGER.info("Helper host port " + helperHostPort);
-            LOGGER.info("Durability " + durability);
+            LOGGER.info("Environment path " + _environmentPath);
+            LOGGER.info("Group name " + _groupName);
+            LOGGER.info("Node name " + _nodeName);
+            LOGGER.info("Node host port " + _nodeHostPort);
+            LOGGER.info("Helper host port " + _helperHostPort);
+            LOGGER.info("Durability " + _durability);
             LOGGER.info("Coalescing sync " + _coalescingSync);
-            LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary);
+            LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary);
         }
 
         Map<String, String> replicationEnvironmentSettings = new HashMap<String, String>(REPCONFIG_DEFAULTS);
-        if (replicationEnvironmentParameters != null && !replicationEnvironmentParameters.isEmpty())
+        if (_replicationEnvironmentParameters != null && !_replicationEnvironmentParameters.isEmpty())
         {
-            replicationEnvironmentSettings.putAll(replicationEnvironmentParameters);
+            replicationEnvironmentSettings.putAll(_replicationEnvironmentParameters);
         }
         Map<String, String> environmentSettings = new HashMap<String, String>(EnvironmentFacade.ENVCONFIG_DEFAULTS);
-        if (environmentParameters != null && !environmentParameters.isEmpty())
+        if (_environmentParameters != null && !_environmentParameters.isEmpty())
         {
-            environmentSettings.putAll(environmentParameters);
+            environmentSettings.putAll(_environmentParameters);
         }
 
-        final ReplicationConfig replicationConfig = new ReplicationConfig(groupName, nodeName, nodeHostPort);
-        replicationConfig.setHelperHosts(helperHostPort);
-        replicationConfig.setDesignatedPrimary(designatedPrimary);
+        final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort);
+        replicationConfig.setHelperHosts(_helperHostPort);
+        replicationConfig.setDesignatedPrimary(_designatedPrimary);
 
         for (Map.Entry<String, String> configItem : replicationEnvironmentSettings.entrySet())
         {
@@ -655,7 +652,7 @@ public class ReplicatedEnvironmentFacade
         envConfig.setAllowCreate(true);
         envConfig.setTransactional(true);
         envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
-        envConfig.setDurability(durability);
+        envConfig.setDurability(_durability);
 
         for (Map.Entry<String, String> configItem : environmentSettings.entrySet())
         {
@@ -667,9 +664,10 @@ public class ReplicatedEnvironmentFacade
         }
 
         ReplicatedEnvironment environment = null;
+        File environmentPathFile = new File(_environmentPath);
         try
         {
-            environment = new ReplicatedEnvironment(new File(environmentPath), replicationConfig, envConfig);
+            environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
         }
         catch (final InsufficientLogException ile)
         {
@@ -678,7 +676,7 @@ public class ReplicatedEnvironmentFacade
             NetworkRestoreConfig config = new NetworkRestoreConfig();
             config.setRetainLogFiles(false);
             restore.execute(ile, config);
-            environment = new ReplicatedEnvironment(new File(environmentPath), replicationConfig, envConfig);
+            environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
         }
         return environment;
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java Fri Dec 20 10:53:01 2013
@@ -20,112 +20,38 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Collection;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.ReplicationNode;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
 import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory;
 
 import com.sleepycat.je.Durability;
-import com.sleepycat.je.Durability.ReplicaAckPolicy;
 import com.sleepycat.je.Durability.SyncPolicy;
 
 public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
 {
-
-    private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC,
-            ReplicaAckPolicy.SIMPLE_MAJORITY);
-
-    @SuppressWarnings("unchecked")
     @Override
     public EnvironmentFacade createEnvironmentFacade(String name, String storeLocation, VirtualHost virtualHost)
     {
-        // Mandatory configuration
-        String groupName = getValidatedStringAttribute(virtualHost, "haGroupName");
-        String nodeName = getValidatedStringAttribute(virtualHost, "haNodeName");
-        String nodeHostPort = getValidatedStringAttribute(virtualHost, "haNodeAddress");
-        String helperHostPort = getValidatedStringAttribute(virtualHost, "haHelperAddress");
-
-        // Optional configuration
-        Durability durability = null;
-        String durabilitySetting = getStringAttribute(virtualHost, "haDurability", null);
-        if (durabilitySetting == null)
-        {
-            durability = DEFAULT_DURABILITY;
-        }
-        else
-        {
-            durability = Durability.parse(durabilitySetting);
-        }
-        Boolean designatedPrimary = getBooleanAttribute(virtualHost, "haDesignatedPrimary", Boolean.FALSE);
-        Boolean coalescingSync = getBooleanAttribute(virtualHost, "haCoalescingSync", Boolean.TRUE);
-
-        Map<String, String> replicationConfig = null;
-        Object repConfigAttr = virtualHost.getAttribute("haReplicationConfig");
-        if (repConfigAttr instanceof Map)
+        Collection<ReplicationNode> replicationNodes = virtualHost.getChildren(ReplicationNode.class);
+        if (replicationNodes == null || replicationNodes.size() != 1)
         {
-            replicationConfig = new HashMap<String, String>((Map<String, String>) repConfigAttr);
+            throw new IllegalStateException("Expected exactly one replication node but got " + (replicationNodes==null ? 0 :replicationNodes.size()) + " nodes");
         }
+        ReplicationNode localNode = replicationNodes.iterator().next();
+        String durability = (String)localNode.getAttribute(ReplicationNode.DURABILITY);
+        Boolean coalescingSync = (Boolean)localNode.getAttribute(ReplicationNode.COALESCING_SYNC);
 
-        if (coalescingSync && durability.getLocalSync() == SyncPolicy.SYNC)
+        if (coalescingSync && Durability.parse(durability).getLocalSync() == SyncPolicy.SYNC)
         {
             throw new IllegalConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC
                     + "! Please set highAvailability.coalescingSync to false in store configuration.");
         }
 
-        Map<String, String> envConfigMap = null;
-        Object bdbEnvConfigAttr = virtualHost.getAttribute("bdbEnvironmentConfig");
-        if (bdbEnvConfigAttr instanceof Map)
-        {
-            envConfigMap = new HashMap<String, String>((Map<String, String>) bdbEnvConfigAttr);
-        }
-
-        return new ReplicatedEnvironmentFacade(name, storeLocation, groupName, nodeName, nodeHostPort, helperHostPort, durability,
-                designatedPrimary, coalescingSync, envConfigMap, replicationConfig, new RemoteReplicationNodeFactoryImpl(virtualHost));
-    }
-
-    private String getValidatedStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName)
-    {
-        Object attrValue = virtualHost.getAttribute(attributeName);
-        if (attrValue != null)
-        {
-            return attrValue.toString();
-        }
-        else
-        {
-            throw new IllegalConfigurationException("BDB HA configuration key not found. Please specify configuration attribute: "
-                    + attributeName);
-        }
-    }
-
-    private String getStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, String defaultVal)
-    {
-        Object attrValue = virtualHost.getAttribute(attributeName);
-        if (attrValue != null)
-        {
-            return attrValue.toString();
-        }
-        return defaultVal;
-    }
-
-    private boolean getBooleanAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, boolean defaultVal)
-    {
-        Object attrValue = virtualHost.getAttribute(attributeName);
-        if (attrValue != null)
-        {
-            if (attrValue instanceof Boolean)
-            {
-                return ((Boolean) attrValue).booleanValue();
-            }
-            else if (attrValue instanceof String)
-            {
-                return Boolean.parseBoolean((String) attrValue);
-            }
-
-        }
-        return defaultVal;
+        return new ReplicatedEnvironmentFacade(name, storeLocation, localNode, new RemoteReplicationNodeFactoryImpl(virtualHost));
     }
 
     private static class RemoteReplicationNodeFactoryImpl implements RemoteReplicationNodeFactory
@@ -138,9 +64,9 @@ public class ReplicatedEnvironmentFacade
         }
 
         @Override
-        public RemoteReplicationNode create(String groupName, String nodeName, String host, int port)
+        public RemoteReplicationNode create(String groupName, String nodeName, String hostPort)
         {
-            return new RemoteReplicationNode(groupName, nodeName, host, port, _virtualHost);
+            return new RemoteReplicationNode(groupName, nodeName, hostPort, _virtualHost);
         }
     }
 }

Copied: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/AbstractReplicationNode.java (from r1550805, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/AbstractReplicationNode.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/AbstractReplicationNode.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java&r1=1550805&r2=1552591&rev=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/AbstractReplicationNode.java Fri Dec 20 10:53:01 2013
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.store.berkeleydb.replication;
 
 import java.security.AccessControlException;
@@ -15,27 +35,22 @@ import org.apache.qpid.server.model.Stat
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.model.VirtualHost;
 
-/**
- * Represents a remote replication node in a BDB group.
- */
-public class RemoteReplicationNode implements ReplicationNode
+public abstract class AbstractReplicationNode implements ReplicationNode
 {
 
     private final UUID _id;
     private final String _groupName;
     private final String _nodeName;
-    private final String _host;
-    private final int _port;
+    private final String _hostPort;
     private final VirtualHost _virtualHost;
 
-    public RemoteReplicationNode(String groupName, String nodeName, String host, int port, VirtualHost virtualHost)
+    public AbstractReplicationNode(String groupName, String nodeName, String hostPort, VirtualHost virtualHost)
     {
         super();
         _id = UUIDGenerator.generateReplicationNodeId(groupName, nodeName);
         _groupName = groupName;
         _nodeName = nodeName;
-        _host = host;
-        _port = port;
+        _hostPort = hostPort;
         _virtualHost = virtualHost;
     }
 
@@ -168,7 +183,7 @@ public class RemoteReplicationNode imple
         }
         else if (ReplicationNode.HOST_PORT.equals(name))
         {
-            return _host + ":" + _port;
+            return _hostPort;
         }
         else if (ReplicationNode.GROUP_NAME.equals(name))
         {
@@ -217,4 +232,16 @@ public class RemoteReplicationNode imple
     {
         throw new UnsupportedOperationException();
     }
+
+    protected VirtualHost getVirtualHost()
+    {
+        return _virtualHost;
+    }
+
+    @Override
+    public String toString()
+    {
+        return this.getClass().getSimpleName() + " [id=" + _id + ", name=" + _nodeName + "]";
+    }
+
 }

Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java?rev=1552591&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java Fri Dec 20 10:53:01 2013
@@ -0,0 +1,268 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.lang.reflect.Type;
+import java.security.AccessControlException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.adapter.AbstractAdapter;
+import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.util.ParameterizedTypeImpl;
+
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Durability.ReplicaAckPolicy;
+import com.sleepycat.je.Durability.SyncPolicy;
+
+public class LocalReplicationNode extends AbstractAdapter implements ReplicationNode
+{
+
+    private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC,
+            ReplicaAckPolicy.SIMPLE_MAJORITY);
+
+    @SuppressWarnings("serial")
+    static final Map<String, Object> DEFAULTS = new HashMap<String, Object>()
+    {{
+        put(DURABILITY, DEFAULT_DURABILITY.toString());
+        put(COALESCING_SYNC, true);
+        put(DESIGNATED_PRIMARY, false);
+        //TODO: add defaults for parameters and replicatedParameters
+    }};
+
+    @SuppressWarnings("serial")
+    private static final Map<String, Type> ATTRIBUTE_TYPES = new HashMap<String, Type>()
+    {{
+        put(ID, UUID.class);
+        put(NAME, String.class);
+        put(GROUP_NAME, String.class);
+        put(HOST_PORT, String.class);
+        put(HELPER_HOST_PORT, String.class);
+        put(DURABILITY, String.class);
+        put(COALESCING_SYNC, Boolean.class);
+        put(DESIGNATED_PRIMARY, Boolean.class);
+        put(PRIORITY, Integer.class);
+        put(QUORUM_OVERRIDE, Integer.class);
+        put(ROLE, String.class);
+        put(JOIN_TIME, Long.class);
+        put(PARAMETERS, new ParameterizedTypeImpl(Map.class, String.class, String.class));
+        put(REPLICATION_PARAMETERS, new ParameterizedTypeImpl(Map.class, String.class, String.class));
+        put(STORE_PATH, String.class);
+    }};
+
+    private final VirtualHost _virtualHost;
+
+    //TODO: add state management
+    public LocalReplicationNode(UUID id, Map<String, Object> attributes, VirtualHost virtualHost, TaskExecutor taskExecutor)
+    {
+        super(id, DEFAULTS, validateAttributes(MapValueConverter.convert(attributes, ATTRIBUTE_TYPES)), taskExecutor);
+        _virtualHost = virtualHost;
+        addParent(VirtualHost.class, virtualHost);
+        validateAttributes(attributes);
+    }
+
+    private static Map<String, Object> validateAttributes(Map<String, Object> attributes)
+    {
+        if (attributes.get(NAME) == null)
+        {
+            throw new IllegalConfigurationException("Name is not specified");
+        }
+        if (attributes.get(GROUP_NAME) == null)
+        {
+            throw new IllegalConfigurationException("Group name is not specified");
+        }
+        if (attributes.get(HOST_PORT) == null)
+        {
+            throw new IllegalConfigurationException("Host and port attribute is not specified");
+        }
+        if (attributes.get(HELPER_HOST_PORT) == null)
+        {
+            throw new IllegalConfigurationException("Helper host and port attribute is not specified");
+        }
+        return attributes;
+    }
+
+    @Override
+    public String getName()
+    {
+        return (String)getAttribute(NAME);
+    }
+
+    @Override
+    public String setName(String currentName, String desiredName)
+            throws IllegalStateException, AccessControlException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public State getDesiredState()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public State getActualState()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void addChangeListener(ConfigurationChangeListener listener)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean removeChangeListener(ConfigurationChangeListener listener)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isDurable()
+    {
+        return true;
+    }
+
+    @Override
+    public void setDurable(boolean durable) throws IllegalStateException,
+            AccessControlException, IllegalArgumentException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public LifetimePolicy getLifetimePolicy()
+    {
+        return LifetimePolicy.PERMANENT;
+    }
+
+    @Override
+    public LifetimePolicy setLifetimePolicy(LifetimePolicy expected,
+            LifetimePolicy desired) throws IllegalStateException,
+            AccessControlException, IllegalArgumentException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getTimeToLive()
+    {
+        return 0;
+    }
+
+    @Override
+    public long setTimeToLive(long expected, long desired)
+            throws IllegalStateException, AccessControlException,
+            IllegalArgumentException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Collection<String> getAttributeNames()
+    {
+        return ReplicationNode.AVAILABLE_ATTRIBUTES;
+    }
+
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Object getAttribute(String attributeName)
+    {
+        if (ReplicationNode.ID.equals(attributeName))
+        {
+            return getId();
+        }
+        else if (ReplicationNode.LIFETIME_POLICY.equals(attributeName))
+        {
+            return getLifetimePolicy();
+        }
+        else if (ReplicationNode.DURABLE.equals(attributeName))
+        {
+            return isDurable();
+        }
+        return super.getAttribute(attributeName);
+    }
+
+    @Override
+    public Object setAttribute(String name, Object expected, Object desired)
+            throws IllegalStateException, AccessControlException,
+            IllegalArgumentException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Statistics getStatistics()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <C extends ConfiguredObject> C createChild(Class<C> childClass,
+            Map<String, Object> attributes, ConfiguredObject... otherParents)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setAttributes(Map<String, Object> attributes)
+            throws IllegalStateException, AccessControlException,
+            IllegalArgumentException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    protected VirtualHost getVirtualHost()
+    {
+        return _virtualHost;
+    }
+
+    @Override
+    protected boolean setState(State currentState, State desiredState)
+    {
+        if (desiredState == State.ACTIVE || desiredState == State.STOPPED)
+        {
+            return true;
+        }
+        return false;
+    }
+
+}

Copied: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java (from r1550805, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java&r1=1550805&r2=1552591&rev=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java Fri Dec 20 10:53:01 2013
@@ -20,7 +20,32 @@
  */
 package org.apache.qpid.server.store.berkeleydb.replication;
 
-public interface RemoteReplicationNodeFactory
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.ReplicationNodeFactory;
+import org.apache.qpid.server.store.berkeleydb.ReplicatedEnvironmentFacade;
+
+public class LocalReplicationNodeFactory implements ReplicationNodeFactory
 {
-    RemoteReplicationNode create(String groupName, String nodeName, String host, int port);
+
+    @Override
+    public String getType()
+    {
+        return ReplicatedEnvironmentFacade.TYPE;
+    }
+
+    @Override
+    public ReplicationNode createInstance(UUID id,
+            Map<String, Object> attributes, VirtualHost virtualHost)
+    {
+        // TODO KW Temporary code 
+        Broker broker = virtualHost.getParent(Broker.class);
+
+        return new LocalReplicationNode(id, attributes, virtualHost, broker.getTaskExecutor());
+    }
+
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java Fri Dec 20 10:53:01 2013
@@ -1,220 +1,16 @@
 package org.apache.qpid.server.store.berkeleydb.replication;
 
-import java.security.AccessControlException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.qpid.server.model.ConfigurationChangeListener;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.IllegalStateTransitionException;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.ReplicationNode;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.Statistics;
-import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.model.VirtualHost;
 
 /**
  * Represents a remote replication node in a BDB group.
  */
-public class RemoteReplicationNode implements ReplicationNode
+public class RemoteReplicationNode extends AbstractReplicationNode
 {
 
-    private final UUID _id;
-    private final String _groupName;
-    private final String _nodeName;
-    private final String _host;
-    private final int _port;
-    private final VirtualHost _virtualHost;
-
-    public RemoteReplicationNode(String groupName, String nodeName, String host, int port, VirtualHost virtualHost)
-    {
-        super();
-        _id = UUIDGenerator.generateReplicationNodeId(groupName, nodeName);
-        _groupName = groupName;
-        _nodeName = nodeName;
-        _host = host;
-        _port = port;
-        _virtualHost = virtualHost;
-    }
-
-    @Override
-    public UUID getId()
-    {
-        return _id;
-    }
-
-    @Override
-    public String getName()
-    {
-        return _nodeName;
-    }
-
-    @Override
-    public String setName(String currentName, String desiredName)
-            throws IllegalStateException, AccessControlException
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public State getDesiredState()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public State setDesiredState(State currentState, State desiredState)
-            throws IllegalStateTransitionException, AccessControlException
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public State getActualState()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void addChangeListener(ConfigurationChangeListener listener)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean removeChangeListener(ConfigurationChangeListener listener)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public <T extends ConfiguredObject> T getParent(Class<T> clazz)
-    {
-        if (clazz == VirtualHost.class)
-        {
-            return (T) _virtualHost;
-        }
-        throw new IllegalArgumentException();
-    }
-
-    @Override
-    public boolean isDurable()
-    {
-        return true;
-    }
-
-    @Override
-    public void setDurable(boolean durable) throws IllegalStateException,
-            AccessControlException, IllegalArgumentException
+    public RemoteReplicationNode(String groupName, String nodeName, String hostPort, VirtualHost virtualHost)
     {
-        throw new UnsupportedOperationException();
+        super(groupName, nodeName, hostPort, virtualHost);
     }
 
-    @Override
-    public LifetimePolicy getLifetimePolicy()
-    {
-        return LifetimePolicy.PERMANENT;
-    }
-
-    @Override
-    public LifetimePolicy setLifetimePolicy(LifetimePolicy expected,
-            LifetimePolicy desired) throws IllegalStateException,
-            AccessControlException, IllegalArgumentException
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long getTimeToLive()
-    {
-        return 0;
-    }
-
-    @Override
-    public long setTimeToLive(long expected, long desired)
-            throws IllegalStateException, AccessControlException,
-            IllegalArgumentException
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Collection<String> getAttributeNames()
-    {
-        return ReplicationNode.AVAILABLE_ATTRIBUTES;
-    }
-
-    @Override
-    public Object getAttribute(String name)
-    {
-        if (ReplicationNode.ID.equals(name))
-        {
-            return getId();
-        }
-        else if (ReplicationNode.NAME.equals(name))
-        {
-            return getName();
-        }
-        else if (ReplicationNode.LIFETIME_POLICY.equals(name))
-        {
-            return getLifetimePolicy();
-        }
-        else if (ReplicationNode.DURABLE.equals(name))
-        {
-            return isDurable();
-        }
-        else if (ReplicationNode.HOST_PORT.equals(name))
-        {
-            return _host + ":" + _port;
-        }
-        else if (ReplicationNode.GROUP_NAME.equals(name))
-        {
-            return _groupName;
-        }
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Map<String, Object> getActualAttributes()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Object setAttribute(String name, Object expected, Object desired)
-            throws IllegalStateException, AccessControlException,
-            IllegalArgumentException
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Statistics getStatistics()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public <C extends ConfiguredObject> C createChild(Class<C> childClass,
-            Map<String, Object> attributes, ConfiguredObject... otherParents)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void setAttributes(Map<String, Object> attributes)
-            throws IllegalStateException, AccessControlException,
-            IllegalArgumentException
-    {
-        throw new UnsupportedOperationException();
-    }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java Fri Dec 20 10:53:01 2013
@@ -22,5 +22,5 @@ package org.apache.qpid.server.store.ber
 
 public interface RemoteReplicationNodeFactory
 {
-    RemoteReplicationNode create(String groupName, String nodeName, String host, int port);
+    RemoteReplicationNode create(String groupName, String nodeName, String hostPort);
 }

Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ReplicationNodeFactory
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ReplicationNodeFactory?rev=1552591&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ReplicationNodeFactory (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ReplicationNodeFactory Fri Dec 20 10:53:01 2013
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.store.berkeleydb.replication.LocalReplicationNodeFactory
\ No newline at end of file

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java Fri Dec 20 10:53:01 2013
@@ -20,9 +20,18 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import static org.mockito.Mockito.any;
+import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC;
+import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY;
+import static org.apache.qpid.server.model.ReplicationNode.DURABILITY;
+import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME;
+import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.NAME;
+import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -40,6 +49,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.model.ReplicationNode;
 import org.apache.qpid.server.replication.ReplicationGroupListener;
 import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
 import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory;
@@ -68,12 +78,18 @@ public class ReplicatedEnvironmentFacade
     private static final String TEST_NODE_NAME = "testNodeName";
     private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT;
     private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT;
-    private static final Durability TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY");
+    private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString();
     private static final boolean TEST_DESIGNATED_PRIMARY = true;
     private static final boolean TEST_COALESCING_SYNC = true;
     private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
-    private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = mock(RemoteReplicationNodeFactory.class);;
+    private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = mock(RemoteReplicationNodeFactory.class);
 
+    public void setUp() throws Exception
+    {
+        super.setUp();
+    }
+
+    @Override
     public void tearDown() throws Exception
     {
         try
@@ -159,7 +175,7 @@ public class ReplicatedEnvironmentFacade
         ReplicationGroupListener listener = mock(ReplicationGroupListener.class);
         replicatedEnvironmentFacade.setReplicationGroupListener(listener);
         verify(listener).onReplicationNodeRecovered(any(RemoteReplicationNode.class));
-        verify(_remoteReplicationNodeFactory).create(TEST_GROUP_NAME, nodeName2, host, port);
+        verify(_remoteReplicationNodeFactory).create(TEST_GROUP_NAME, nodeName2, node2NodeHostPort);
     }
 
     public void testRemoveNodeFromGroup() throws Exception
@@ -340,7 +356,7 @@ public class ReplicatedEnvironmentFacade
         ReplicatedEnvironmentFacade ref = null;
         try
         {
-            ref = createReplicatedEnvironmentFacade(nodeName, nodePath, TEST_NODE_HOST_PORT, false);
+            ref = createReplicatedEnvironmentFacade(nodePath, nodeName, TEST_NODE_HOST_PORT, false);
             assertEquals("Unexpected state " + ref.getFacadeState(), ReplicatedEnvironmentFacade.State.OPENING, ref.getFacadeState());
 
             final CountDownLatch nodeAwaitLatch = new CountDownLatch(1);
@@ -415,7 +431,7 @@ public class ReplicatedEnvironmentFacade
     private ReplicatedEnvironmentFacade join(String nodeName, String nodePath, String nodeHostPort, boolean designatedPrimary,
             final CountDownLatch nodeAwaitLatch, final State expectedState)
     {
-        ReplicatedEnvironmentFacade ref = createReplicatedEnvironmentFacade(nodeName, nodePath, nodeHostPort, designatedPrimary);
+        ReplicatedEnvironmentFacade ref = createReplicatedEnvironmentFacade(nodePath, nodeName, nodeHostPort, designatedPrimary);
 
         if (expectedState == State.REPLICA)
         {
@@ -435,17 +451,11 @@ public class ReplicatedEnvironmentFacade
         return ref;
     }
 
-    private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodeName, String nodePath, String nodeHostPort,
+    private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodePath, String nodeName, String nodeHostPort,
             boolean designatedPrimary)
     {
-        Map<String, String> repConfig = new HashMap<String, String>();
-        repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
-        repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
-
-        ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(getName(), nodePath, TEST_GROUP_NAME, nodeName,
-                nodeHostPort, TEST_NODE_HELPER_HOST_PORT, TEST_DURABILITY, designatedPrimary, TEST_COALESCING_SYNC,
-                Collections.<String, String> emptyMap(), repConfig, _remoteReplicationNodeFactory);
-        return ref;
+        ReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary);
+        return new ReplicatedEnvironmentFacade(getName(), nodePath, node, _remoteReplicationNodeFactory);
     }
 
     private ReplicatedEnvironmentFacade[] startClusterSequentially(int nodeNumber) throws InterruptedException
@@ -472,4 +482,24 @@ public class ReplicatedEnvironmentFacade
         environmentFacade.openDatabases(new String[] { databaseName }, dbConfig);
         return dbConfig;
     }
+
+    private ReplicationNode createReplicationNodeMock(String nodeName, String nodeHostPort, boolean designatedPrimary)
+    {
+        ReplicationNode node =  mock(ReplicationNode.class);
+        when(node.getAttribute(NAME)).thenReturn(nodeName);
+        when(node.getName()).thenReturn(nodeName);
+        when(node.getAttribute(HOST_PORT)).thenReturn(nodeHostPort);
+        when(node.getAttribute(DESIGNATED_PRIMARY)).thenReturn(designatedPrimary);
+        when(node.getAttribute(GROUP_NAME)).thenReturn(TEST_GROUP_NAME);
+        when(node.getAttribute(HELPER_HOST_PORT)).thenReturn(TEST_NODE_HELPER_HOST_PORT);
+        when(node.getAttribute(DURABILITY)).thenReturn(TEST_DURABILITY);
+        when(node.getAttribute(COALESCING_SYNC)).thenReturn(TEST_COALESCING_SYNC);
+
+        Map<String, String> repConfig = new HashMap<String, String>();
+        repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
+        repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
+        when(node.getAttribute(REPLICATION_PARAMETERS)).thenReturn(repConfig);
+        return node;
+    }
+
 }

Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java?rev=1552591&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java Fri Dec 20 10:53:01 2013
@@ -0,0 +1,205 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.ConfigurationEntry;
+import org.apache.qpid.server.configuration.RecovererProvider;
+import org.apache.qpid.server.configuration.startup.VirtualHostRecoverer;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+
+public class VirtualHostTest extends QpidTestCase
+{
+
+    private Broker _broker;
+    private StatisticsGatherer _statisticsGatherer;
+    private RecovererProvider _recovererProvider;
+    private File _configFile;
+    private File _bdbStorePath;
+    private VirtualHost _host;
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+
+        _broker = BrokerTestHelper.createBrokerMock();
+        TaskExecutor taslExecutor = mock(TaskExecutor.class);
+        when(taslExecutor.isTaskExecutorThread()).thenReturn(true);
+        when(_broker.getTaskExecutor()).thenReturn(taslExecutor);
+
+        _recovererProvider = mock(RecovererProvider.class);
+        _statisticsGatherer = mock(StatisticsGatherer.class);
+
+        _bdbStorePath = new File(TMP_FOLDER, getTestName() + "." + System.currentTimeMillis());
+        _bdbStorePath.deleteOnExit();
+    }
+
+    @Override
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            if (_host != null)
+            {
+                _host.setDesiredState(_host.getActualState(), State.STOPPED);
+            }
+        }
+        finally
+        {
+            if (_configFile != null)
+            {
+                _configFile.delete();
+            }
+            if (_bdbStorePath != null)
+            {
+                FileUtils.delete(_bdbStorePath, true);
+            }
+            super.tearDown();
+            CurrentActor.remove();
+        }
+    }
+
+    public void testCreateBdbVirtualHostFromConfigurationFile()
+    {
+        String hostName = getName();
+        long logFileMax = 2000000;
+        _host = createHostFromConfiguration(hostName, logFileMax);
+        _host.setDesiredState(State.INITIALISING, State.ACTIVE);
+        assertEquals("Unexpected host name", hostName, _host.getName());
+        assertEquals("Unexpected host type", StandardVirtualHostFactory.TYPE, _host.getType());
+        assertEquals("Unexpected store type", BDBMessageStore.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE));
+        assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
+
+        BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
+        EnvironmentConfig envConfig = messageStore.getEnvironmentFacade().getEnvironment().getConfig();
+        assertEquals("Unexpected JE log file max", String.valueOf(logFileMax), envConfig.getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
+
+    }
+
+    public void testCreateBdbHaVirtualHostFromConfigurationFile()
+    {
+        String hostName = getName();
+
+        String repStreamTimeout = "2 h";
+        String nodeName = "node";
+        String groupName = "group";
+        String nodeHostPort = "localhost:" + findFreePort();
+        String helperHostPort = nodeHostPort;
+        String durability = "NO_SYNC,SYNC,NONE";
+        _host = createHaHostFromConfiguration(hostName, groupName, nodeName, nodeHostPort, helperHostPort, durability, repStreamTimeout);
+        _host.setDesiredState(State.INITIALISING, State.ACTIVE);
+        assertEquals("Unexpected host name", hostName, _host.getName());
+        assertEquals("Unexpected host type", BDBHAVirtualHostFactory.TYPE, _host.getType());
+        assertEquals("Unexpected store type", ReplicatedEnvironmentFacade.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE));
+        assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
+
+        ReplicationNode localNode = _host.getChildren(ReplicationNode.class).iterator().next();
+
+        assertEquals(nodeName, localNode.getName());
+        assertEquals(groupName, localNode.getAttribute(ReplicationNode.GROUP_NAME));
+        assertEquals(nodeHostPort, localNode.getAttribute(ReplicationNode.HOST_PORT));
+        assertEquals(helperHostPort, localNode.getAttribute(ReplicationNode.HELPER_HOST_PORT));
+        assertEquals(durability, localNode.getAttribute(ReplicationNode.DURABILITY));
+
+        BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
+        ReplicatedEnvironment environment = (ReplicatedEnvironment) messageStore.getEnvironmentFacade().getEnvironment();
+        ReplicationConfig envConfig = environment.getRepConfig();
+        assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, envConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
+    }
+
+    private VirtualHost createHost(Map<String, Object> attributes)
+    {
+        ConfigurationEntry entry = new ConfigurationEntry(UUID.randomUUID(), VirtualHost.class.getSimpleName(), attributes,
+                Collections.<UUID> emptySet(), null);
+
+        return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker);
+    }
+
+    private VirtualHost createHostFromConfiguration(String hostName, long logFileMax)
+    {
+        String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">"
+                        + "<store><class>" + BDBMessageStore.class.getName() + "</class>"
+                        + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>"
+                        + "<envConfig><name>" + EnvironmentConfig.LOG_FILE_MAX + "</name><value>" + logFileMax + "</value></envConfig>"
+                        + "</store>"
+                        + "</" + hostName + "></virtualhost></virtualhosts>";
+        Map<String, Object> attributes = writeConfigAndGenerateAttributes(content);
+        return createHost(attributes);
+    }
+
+    private VirtualHost createHaHostFromConfiguration(String hostName, String groupName, String nodeName, String nodeHostPort, String helperHostPort, String durability, String repStreamTimeout)
+    {
+        String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">"
+                        + "<type>" + BDBHAVirtualHostFactory.TYPE + "</type>"
+                        + "<store><class>" + BDBMessageStore.class.getName() + "</class>"
+                        + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>"
+                        + "<highAvailability>"
+                        + "<groupName>" + groupName + "</groupName>"
+                        + "<nodeName>" + nodeName + "</nodeName>"
+                        + "<nodeHostPort>" + nodeHostPort + "</nodeHostPort>"
+                        + "<helperHostPort>" + helperHostPort + "</helperHostPort>"
+                        + "<durability>" + durability.replaceAll(",", "\\\\,") + "</durability>"
+                        + "</highAvailability>"
+                        + "<repConfig><name>" + ReplicationConfig.REP_STREAM_TIMEOUT + "</name><value>" + repStreamTimeout + "</value></repConfig>"
+                        + "</store>"
+                        + "</" + hostName + "></virtualhost></virtualhosts>";
+        Map<String, Object> attributes = writeConfigAndGenerateAttributes(content);
+        return createHost(attributes);
+    }
+
+    private Map<String, Object> writeConfigAndGenerateAttributes(String content)
+    {
+        _configFile = TestFileUtils.createTempFile(this, ".virtualhost.xml", content);
+        Map<String, Object> attributes = new HashMap<String, Object>();
+        attributes.put(VirtualHost.NAME, getName());
+        attributes.put(VirtualHost.CONFIG_PATH, _configFile.getAbsolutePath());
+        return attributes;
+    }
+}
+
+    
\ No newline at end of file

Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java?rev=1552591&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java Fri Dec 20 10:53:01 2013
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class LocalReplicationNodeTest extends QpidTestCase
+{
+
+    private static final Object INVALID_VALUE = new Object();
+    private UUID _id;
+    private VirtualHost _virtualHost;
+    private TaskExecutor _taskExecutor;
+    
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _taskExecutor = mock(TaskExecutor.class);
+        _virtualHost = mock(VirtualHost.class);
+    }
+
+    public void testCreateLocalReplicationNodeWithoutDefaultParametersAndValidParameters()
+    {
+        Map<String, Object> attributes = createValidAttributes();
+
+        LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor);
+
+        assertNodeAttributes(attributes, node);
+
+        for (Map.Entry<String, Object> attributeEntry : LocalReplicationNode.DEFAULTS.entrySet())
+        {
+            assertEquals("Unexpected attribute value for attribute name " + attributeEntry.getKey(), attributeEntry.getValue(), node.getAttribute(attributeEntry.getKey()));
+        }
+    }
+
+    public void testCreateLocalReplicationNodeWithoutDefaultParametersAndMissedParameters()
+    {
+        Map<String, Object> attributes = createValidAttributes();
+
+        for (Map.Entry<String, Object> attributeEntry : attributes.entrySet())
+        {
+            String name = attributeEntry.getKey();
+            Map<String, Object> incompleteAttributes = new HashMap<String, Object>(attributes);
+            incompleteAttributes.remove(name);
+            try
+            {
+                new LocalReplicationNode(_id, incompleteAttributes, _virtualHost, _taskExecutor);
+                fail("Node creation should fails when attribute " + name + " is missed");
+            }
+            catch(IllegalConfigurationException e)
+            {
+                // pass
+            }
+        }
+    }
+
+    public void testCreateLocalReplicationNodeWithoutDefaultParametersAndInvalidParameters()
+    {
+ 
+        Map<String, Object> attributes = createValidAttributes();
+
+        for (Map.Entry<String, Object> attributeEntry : attributes.entrySet())
+        {
+            String name = attributeEntry.getKey();
+            Object value = attributeEntry.getValue();
+            if (!(value instanceof String))
+            {
+                Map<String, Object> invalidAttributes = new HashMap<String, Object>(attributes);
+                invalidAttributes.put(name, INVALID_VALUE);
+                try
+                {
+                    new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor);
+                    fail("Node creation should fails when attribute " + name + " is invalid");
+                }
+                catch(IllegalConfigurationException e)
+                {
+                    // pass
+                }
+            }
+        }
+    }
+
+    public void testCreateLocalReplicationNodeWithOverriddenDefaultParameters()
+    {
+        Map<String, Object> attributes = createValidAttributes();
+        attributes.put(ReplicationNode.DURABILITY, "SYNC,SYNC,NONE");
+        attributes.put(ReplicationNode.COALESCING_SYNC, false);
+        attributes.put(ReplicationNode.DESIGNATED_PRIMARY, true);
+
+        LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor);
+
+        assertNodeAttributes(attributes, node);
+    }
+
+    private Map<String, Object> createValidAttributes()
+    {
+        Map<String, Object> attributes = new HashMap<String, Object>();
+        attributes.put(ReplicationNode.NAME, "testNode");
+        attributes.put(ReplicationNode.GROUP_NAME, "testGroup");
+        attributes.put(ReplicationNode.HOST_PORT, "localhost:5000");
+        attributes.put(ReplicationNode.HELPER_HOST_PORT, "localhost:5001");
+        return attributes;
+    }
+
+    private void assertNodeAttributes(Map<String, Object> expectedAttributes,
+            LocalReplicationNode node)
+    {
+        for (Map.Entry<String, Object> attributeEntry : expectedAttributes.entrySet())
+        {
+            assertEquals("Unexpected attribute value for attribute name " + attributeEntry.getKey(), attributeEntry.getValue(), node.getAttribute(attributeEntry.getKey()));
+        }
+    }
+
+}

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/pom.xml?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/pom.xml Fri Dec 20 10:53:01 2013
@@ -42,6 +42,13 @@
     </dependency>
 
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito-version}</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-systests</artifactId>
       <version>0.26-SNAPSHOT</version>

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/VirtualHostRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/VirtualHostRecoverer.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/VirtualHostRecoverer.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/VirtualHostRecoverer.java Fri Dec 20 10:53:01 2013
@@ -21,14 +21,21 @@
 package org.apache.qpid.server.configuration.startup;
 
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
 import org.apache.qpid.server.configuration.ConfigurationEntry;
 import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer;
 import org.apache.qpid.server.configuration.RecovererProvider;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ReplicationNode;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
+import org.apache.qpid.server.plugin.ReplicationNodeFactory;
 import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.util.MapValueConverter;
 
 public class VirtualHostRecoverer implements ConfiguredObjectRecoverer<VirtualHost>
 {
@@ -45,7 +52,54 @@ public class VirtualHostRecoverer implem
     {
         Broker broker = RecovererHelper.verifyOnlyBrokerIsParent(parents);
 
-        return new VirtualHostAdapter(entry.getId(), entry.getAttributes(), broker, _brokerStatisticsGatherer, broker.getTaskExecutor());
+        Map<String, Object> attributes = entry.getAttributes();
+        VirtualHostAdapter virtualHostAdapter = new VirtualHostAdapter(entry.getId(), attributes, broker, _brokerStatisticsGatherer, broker.getTaskExecutor());
+
+        // TODO temporary code to bridge from VH attributes to LocalReplicationNode - will be move into a new ReplicationNodeRecoverer
+        if (attributes.containsKey(VirtualHost.TYPE))
+        {
+            String type = MapValueConverter.getStringAttribute(VirtualHost.TYPE, attributes);
+            ReplicationNodeFactory replicationNodeFactory = ReplicationNodeFactory.FACTORIES.get(type);
+            
+            UUID uuid = null;
+            Map<String, Object> replicationNodeAttributes = new HashMap<String, Object>();
+            replicationNodeAttributes.put(ReplicationNode.NAME, attributes.get("haNodeName"));
+            replicationNodeAttributes.put(ReplicationNode.GROUP_NAME, attributes.get("haGroupName"));
+            replicationNodeAttributes.put(ReplicationNode.HOST_PORT, attributes.get("haNodeAddress"));
+            replicationNodeAttributes.put(ReplicationNode.HELPER_HOST_PORT, attributes.get("haHelperAddress"));
+
+            if (attributes.get("haDurability") != null)
+            {
+                replicationNodeAttributes.put(ReplicationNode.DURABILITY, attributes.get("haDurability"));
+            }
+
+            if (attributes.get("haDesignatedPrimary") != null)
+            {
+                replicationNodeAttributes.put(ReplicationNode.DESIGNATED_PRIMARY, attributes.get("haDesignatedPrimary"));
+            }
+
+            if (attributes.get("haCoalescingSync") != null)
+            {
+                replicationNodeAttributes.put(ReplicationNode.COALESCING_SYNC, attributes.get("haCoalescingSync"));
+            }
+
+            if (attributes.get("bdbEnvironmentConfig") != null)
+            {
+                replicationNodeAttributes.put(ReplicationNode.PARAMETERS, attributes.get("bdbEnvironmentConfig"));
+            }
+
+            if (attributes.get("haReplicationConfig") != null)
+            {
+                replicationNodeAttributes.put(ReplicationNode.REPLICATION_PARAMETERS, attributes.get("haReplicationConfig"));
+            }
+
+            if (replicationNodeFactory != null)
+            {
+                ReplicationNode node = replicationNodeFactory.createInstance(uuid , attributes, virtualHostAdapter);
+                virtualHostAdapter.onReplicationNodeRecovered(node);
+            }
+        }
+        return virtualHostAdapter;
     }
 
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Fri Dec 20 10:53:01 2013
@@ -120,8 +120,8 @@ public final class VirtualHostAdapter ex
         super(id, null, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES, false), taskExecutor, false);
         _broker = broker;
         _brokerStatisticsGatherer = brokerStatisticsGatherer;
-        validateAttributes();
         addParent(Broker.class, broker);
+        validateAttributes();
     }
 
     private void validateAttributes()
@@ -163,7 +163,7 @@ public final class VirtualHostAdapter ex
         // pre-load the configuration in order to validate
         try
         {
-            createVirtualHostConfiguration(name);
+            createVirtualHostConfiguration(name, null);
         }
         catch(ConfigurationException e)
         {
@@ -577,6 +577,7 @@ public final class VirtualHostAdapter ex
         return _statistics;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
     {
@@ -596,6 +597,10 @@ public final class VirtualHostAdapter ex
         {
             return (Collection<C>) getAliases();
         }
+        else if (clazz == ReplicationNode.class)
+        {
+            return (Collection<C>)getReplicationNodes();
+        }
         else
         {
             return Collections.emptySet();
@@ -629,6 +634,9 @@ public final class VirtualHostAdapter ex
         {
             throw new UnsupportedOperationException();
         }
+        
+        // TODO KW change add child to add the replication node?
+        
         throw new IllegalArgumentException("Cannot create a child of class " + childClass.getSimpleName());
     }
 
@@ -1131,7 +1139,7 @@ public final class VirtualHostAdapter ex
         String virtualHostName = getName();
         try
         {
-            VirtualHostConfiguration configuration = createVirtualHostConfiguration(virtualHostName);
+            VirtualHostConfiguration configuration = createVirtualHostConfiguration(virtualHostName, this);
             String type = configuration.getType();
             final VirtualHostFactory factory = VirtualHostFactory.FACTORIES.get(type);
             if(factory == null)
@@ -1171,12 +1179,13 @@ public final class VirtualHostAdapter ex
         }
     }
 
-    private VirtualHostConfiguration createVirtualHostConfiguration(String virtualHostName) throws ConfigurationException
+    private VirtualHostConfiguration createVirtualHostConfiguration(String virtualHostName, ReplicationGroupListener listener) throws ConfigurationException
     {
         VirtualHostConfiguration configuration;
         String configurationFile = (String)getAttribute(CONFIG_PATH);
         if (configurationFile == null)
         {
+            LOGGER.debug("Creating virtual host configuration from the attributes");
             final MyConfiguration basicConfiguration = new MyConfiguration();
             PropertiesConfiguration config = new PropertiesConfiguration();
             final String type = (String) getAttribute(TYPE);
@@ -1198,6 +1207,10 @@ public final class VirtualHostAdapter ex
         }
         else
         {
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Creating virtual host configuration from configuration file " + configurationFile);
+            }
             if (!new File(configurationFile).exists())
             {
                 throw new IllegalConfigurationException("Configuration file '" + configurationFile + "' does not exist");
@@ -1213,7 +1226,15 @@ public final class VirtualHostAdapter ex
                     changeAttribute(entry.getKey(), getAttribute(entry.getKey()), entry.getValue());
                 }
             }
-
+            ReplicationNode replicationNode = factory.createReplicationNode(configuration.getConfig(), this);
+            if (listener != null && replicationNode != null)
+            {
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Replication node " + replicationNode);
+                }
+                listener.onReplicationNodeRecovered(replicationNode);
+            }
         }
         return configuration;
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/PluggableFactoryLoader.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/PluggableFactoryLoader.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/PluggableFactoryLoader.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/PluggableFactoryLoader.java Fri Dec 20 10:53:01 2013
@@ -31,11 +31,17 @@ public class PluggableFactoryLoader<T ex
     private final Map<String, T> _factoriesMap;
     private final Set<String> _types;
 
+    
     public PluggableFactoryLoader(Class<T> factoryClass)
     {
+        this(factoryClass, true);
+    }
+
+    public PluggableFactoryLoader(Class<T> factoryClass, boolean atLeastOnce)
+    {
         Map<String, T> fm = new HashMap<String, T>();
         QpidServiceLoader<T> qpidServiceLoader = new QpidServiceLoader<T>();
-        Iterable<T> factories = qpidServiceLoader.atLeastOneInstanceOf(factoryClass);
+        Iterable<T> factories = atLeastOnce? qpidServiceLoader.atLeastOneInstanceOf(factoryClass) : qpidServiceLoader.instancesOf(factoryClass);
         for (T factory : factories)
         {
             String descriptiveType = factory.getType();

Copied: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ReplicationNodeFactory.java (from r1550805, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ReplicationNodeFactory.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ReplicationNodeFactory.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java&r1=1550805&r2=1552591&rev=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ReplicationNodeFactory.java Fri Dec 20 10:53:01 2013
@@ -18,9 +18,19 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.store.berkeleydb.replication;
+package org.apache.qpid.server.plugin;
 
-public interface RemoteReplicationNodeFactory
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.VirtualHost;
+
+public interface ReplicationNodeFactory extends Pluggable
 {
-    RemoteReplicationNode create(String groupName, String nodeName, String host, int port);
+
+    PluggableFactoryLoader<ReplicationNodeFactory> FACTORIES = new PluggableFactoryLoader<ReplicationNodeFactory>(ReplicationNodeFactory.class, false);
+
+    ReplicationNode createInstance(UUID id, Map<String, Object> attributes, VirtualHost virtualHost);
+
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java?rev=1552591&r1=1552590&r2=1552591&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java Fri Dec 20 10:53:01 2013
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.commons.configuration.Configuration;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.model.ReplicationNode;
 import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.stats.StatisticsGatherer;
@@ -49,6 +50,8 @@ public interface VirtualHostFactory exte
 
     Map<String,Object> convertVirtualHostConfiguration(Configuration configuration);
 
+    ReplicationNode createReplicationNode(Configuration configuration, org.apache.qpid.server.model.VirtualHost virtualHost);
+
     static final class TYPES
     {
         private TYPES()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org