You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/02/05 16:41:20 UTC

svn commit: r1564815 - in /qpid/branches/java-broker-bdb-ha/qpid/java: bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/ bdbstore/src/main/java/org/apache/qpi...

Author: kwall
Date: Wed Feb  5 15:41:20 2014
New Revision: 1564815

URL: http://svn.apache.org/r1564815
Log:
QPID-5409: Refactor RemoteReplicationNode to delegate Berkeley DB JE (com.sleepycat.je) calls to ReplicatedEnvironmentFacade

Modified:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java?rev=1564815&r1=1564814&r2=1564815&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java Wed Feb  5 15:41:20 2014
@@ -185,7 +185,7 @@ public class BDBHAMessageStoreManagerMBe
         {
             _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName);
         }
-        catch (AMQStoreException e)
+        catch (Exception e)
         {
             LOGGER.error("Failed to remove node " + nodeName + " from group", e);
             throw new JMException(e.getMessage());

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java?rev=1564815&r1=1564814&r2=1564815&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java Wed Feb  5 15:41:20 2014
@@ -47,6 +47,8 @@ import org.apache.qpid.server.store.berk
 import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
 import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
 
+import com.sleepycat.je.DatabaseException;
+
 public class BDBHAMessageStoreManagerMBeanTest extends TestCase
 {
     private static final String TEST_GROUP_NAME = "testGroupName";
@@ -168,7 +170,7 @@ public class BDBHAMessageStoreManagerMBe
 
     public void testRemoveNodeFromReplicationGroupWithError() throws Exception
     {
-        doThrow(new AMQStoreException("mocked exception")).when(_replicatedEnvironmentFacadee).removeNodeFromGroup(TEST_NODE_NAME);
+        doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacadee).removeNodeFromGroup(TEST_NODE_NAME);
 
         try
         {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java?rev=1564815&r1=1564814&r2=1564815&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java Wed Feb  5 15:41:20 2014
@@ -29,7 +29,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -47,8 +46,6 @@ import org.apache.qpid.server.util.MapVa
 
 import com.sleepycat.je.rep.NodeState;
 import com.sleepycat.je.rep.ReplicatedEnvironment;
-import com.sleepycat.je.rep.util.DbPing;
-import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
 import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException;
 
 /**
@@ -67,23 +64,21 @@ public class RemoteReplicationNode exten
     private final com.sleepycat.je.rep.ReplicationNode _replicationNode;
     private final String _hostPort;
     private final String _groupName;
-    private final DbPing _dbPing;
-    private final ReplicationGroupAdmin _replicationGroupAdmin;
+    private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
 
     private volatile String _role;
     private volatile long _joinTime;
     private volatile long _lastTransactionId;
 
-    public RemoteReplicationNode(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName, VirtualHost virtualHost,
-            TaskExecutor taskExecutor, DbPing dbPing, ReplicationGroupAdmin admin)
+    public RemoteReplicationNode(com.sleepycat.je.rep.ReplicationNode replicationNode, VirtualHost virtualHost,
+            TaskExecutor taskExecutor, ReplicatedEnvironmentFacade replicatedEnvironmentFacade)
     {
-        super(UUIDGenerator.generateReplicationNodeId(groupName, replicationNode.getName()), null, null, taskExecutor);
+        super(UUIDGenerator.generateReplicationNodeId(replicatedEnvironmentFacade.getGroupName(), replicationNode.getName()), null, null, taskExecutor);
         addParent(VirtualHost.class, virtualHost);
-        _groupName = groupName;
+        _groupName = replicatedEnvironmentFacade.getGroupName();
         _hostPort = replicationNode.getHostName() + ":" + replicationNode.getPort();
         _replicationNode = replicationNode;
-        _dbPing = dbPing;
-        _replicationGroupAdmin = admin;
+        _replicatedEnvironmentFacade = replicatedEnvironmentFacade;
     }
 
     @Override
@@ -175,8 +170,15 @@ public class RemoteReplicationNode exten
                 {
                     LOGGER.debug("Deleting node " + _groupName + ":" + getName());
                 }
-                _replicationGroupAdmin.removeMember(getName());
-                return true;
+                try
+                {
+                    _replicatedEnvironmentFacade.removeNodeFromGroup(getName());
+                    return true;
+                }
+                catch (Exception e)
+                {
+                    LOGGER.warn("Failure to remove node remotely", e);
+                }
             }
         }
         return false;
@@ -240,7 +242,8 @@ public class RemoteReplicationNode exten
 
         try
         {
-            NodeState state = _dbPing.getNodeState();
+            //TODO: updateNodeState is called from ReplicatedEnvironmentFacade to call getRemoteNodeState. Odd!!!
+            NodeState state = _replicatedEnvironmentFacade.getRemoteNodeState(_replicationNode);
             _role = state.getNodeState().name();
             _joinTime = state.getJoinTime();
             _lastTransactionId = state.getCurrentTxnEndVLSN();
@@ -248,12 +251,12 @@ public class RemoteReplicationNode exten
         catch (IOException e)
         {
             _role = com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN.name();
-            LOGGER.warn("Cannot connect to node " + _replicationNode.getName() + " from " + _groupName, e);
+            LOGGER.warn("Cannot connect to node " + _replicationNode.getName() + " from " + _groupName);
         }
         catch (ServiceConnectFailedException e)
         {
             _role = com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN.name();
-            LOGGER.warn("Cannot retrieve the node details for node " + _replicationNode.getName() + " from " + _groupName, e);
+            LOGGER.warn("Cannot retrieve the node details for node " + _replicationNode.getName() + " from " + _groupName);
         }
 
         if (!_role.equals(oldRole))
@@ -306,7 +309,7 @@ public class RemoteReplicationNode exten
                         LOGGER.debug("Trying to transfer master to " + nodeName);
                     }
 
-                    _replicationGroupAdmin.transferMaster(Collections.singleton(nodeName), ReplicatedEnvironmentFacade.MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true);
+                    _replicatedEnvironmentFacade.transferMasterAsynchronously(nodeName);
 
                     if (LOGGER.isDebugEnabled())
                     {
@@ -346,4 +349,9 @@ public class RemoteReplicationNode exten
         }
     }
 
+    com.sleepycat.je.rep.ReplicationNode getReplicationNode()
+    {
+        return _replicationNode;
+    }
+
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java?rev=1564815&r1=1564814&r2=1564815&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java Wed Feb  5 15:41:20 2014
@@ -22,7 +22,7 @@ package org.apache.qpid.server.store.ber
 
 public interface RemoteReplicationNodeFactory
 {
-    RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode jeNode, String groupName);
+    RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode jeNode, ReplicatedEnvironmentFacade environmentFacade);
 
     long getRemoteNodeMonitorInterval();
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1564815&r1=1564814&r2=1564815&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Wed Feb  5 15:41:20 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.ber
 import static org.apache.qpid.server.model.ReplicationNode.*;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -65,6 +66,8 @@ import com.sleepycat.je.rep.Insufficient
 import com.sleepycat.je.rep.InsufficientReplicasException;
 import com.sleepycat.je.rep.NetworkRestore;
 import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.RepInternal;
 import com.sleepycat.je.rep.ReplicatedEnvironment;
 import com.sleepycat.je.rep.ReplicationConfig;
 import com.sleepycat.je.rep.ReplicationGroup;
@@ -72,8 +75,12 @@ import com.sleepycat.je.rep.ReplicationM
 import com.sleepycat.je.rep.ReplicationNode;
 import com.sleepycat.je.rep.StateChangeEvent;
 import com.sleepycat.je.rep.StateChangeListener;
+import com.sleepycat.je.rep.util.DbPing;
 import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException;
+import com.sleepycat.je.rep.vlsn.VLSNRange;
 import com.sleepycat.je.utilint.PropUtil;
+import com.sleepycat.je.utilint.VLSN;
 
 public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
 {
@@ -88,6 +95,11 @@ public class ReplicatedEnvironmentFacade
 
     public static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT);
 
+    public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout";
+    private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000;
+
+    private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT);
+
     @SuppressWarnings("serial")
     private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
     {{
@@ -148,7 +160,6 @@ public class ReplicatedEnvironmentFacade
 
     private volatile ReplicatedEnvironment _environment;
     private long _joinTime;
-    private long _lastKnownReplicationTransactionId;
 
     public ReplicatedEnvironmentFacade(LocalReplicationNode replicationNode, RemoteReplicationNodeFactory remoteReplicationNodeFactory)
     {
@@ -449,22 +460,9 @@ public class ReplicatedEnvironmentFacade
         return members;
     }
 
-    public void removeNodeFromGroup(final String nodeName) throws AMQStoreException
+    public void removeNodeFromGroup(final String nodeName)
     {
-        try
-        {
-            createReplicationGroupAdmin().removeMember(nodeName);
-        }
-        catch (OperationFailureException ofe)
-        {
-            // TODO: I am not sure about the exception handing here
-            throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + ofe.getMessage(), ofe);
-        }
-        catch (DatabaseException e)
-        {
-            // TODO: I am not sure about the exception handing here
-            throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + e.getMessage(), e);
-        }
+        createReplicationGroupAdmin().removeMember(nodeName);
     }
 
     public void setDesignatedPrimary(final boolean isPrimary) throws AMQStoreException
@@ -517,8 +515,6 @@ public class ReplicatedEnvironmentFacade
 
     public void setPriority(int priority) throws AMQStoreException
     {
-        checkNotOpeningAndEnvironmentIsValid();
-
         try
         {
             final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
@@ -537,11 +533,11 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    private void checkNotOpeningAndEnvironmentIsValid()
+    private void checkIsOpenAndEnvironmentIsValid()
     {
-        if (_state.get() == State.OPENING)
+        if (_state.get() != State.OPEN)
         {
-            throw new IllegalStateException("Environment facade is in opening state");
+            throw new IllegalStateException("Environment facade is not in open state");
         }
 
         if (!_environment.isValid())
@@ -558,8 +554,6 @@ public class ReplicatedEnvironmentFacade
 
     public void setElectableGroupSizeOverride(int electableGroupOverride) throws AMQStoreException
     {
-        checkNotOpeningAndEnvironmentIsValid();
-
         try
         {
             final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
@@ -586,13 +580,27 @@ public class ReplicatedEnvironmentFacade
 
     public long getLastKnownReplicationTransactionId()
     {
-        return _lastKnownReplicationTransactionId;
+        if (_state.get() == State.OPEN)
+        {
+            VLSNRange range = RepInternal.getRepImpl(_environment).getVLSNIndex().getRange();
+            VLSN lastTxnEnd = range.getLastTxnEnd();
+            LOGGER.debug("VLSN Range is " + range );
+            return lastTxnEnd.getSequence();
+        }
+        else
+        {
+            return -1L;
+        }
     }
 
-    public void transferMasterToSelfAsynchronously() throws AMQStoreException
+    public void transferMasterToSelfAsynchronously()
     {
-        checkNotOpeningAndEnvironmentIsValid();
+        final String nodeName = getNodeName();
+        transferMasterAsynchronously(nodeName);
+    }
 
+    public void transferMasterAsynchronously(final String nodeName)
+    {
         _groupChangeExecutor.submit(new Runnable()
         {
             @Override
@@ -601,7 +609,7 @@ public class ReplicatedEnvironmentFacade
                 try
                 {
                     ReplicationGroupAdmin admin = createReplicationGroupAdmin();
-                    String newMaster = admin.transferMaster(Collections.singleton(getNodeName()), MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true);
+                    String newMaster = admin.transferMaster(Collections.singleton(nodeName), MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true);
                     if (LOGGER.isDebugEnabled())
                     {
                         LOGGER.debug("The mastership has been transfered to " + newMaster);
@@ -661,7 +669,7 @@ public class ReplicatedEnvironmentFacade
             String discoveredNodeName = replicationNode.getName();
             if (!discoveredNodeName.equals(localNodeName))
             {
-                RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName());
+                RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, this);
 
                 _remoteReplicationNodes.put(replicationNode.getName(), remoteNode);
             }
@@ -679,10 +687,15 @@ public class ReplicatedEnvironmentFacade
     private ReplicationGroupAdmin createReplicationGroupAdmin()
     {
         final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
-        helpers.addAll(_environment.getRepConfig().getHelperSockets());
+        for (RemoteReplicationNode node : _remoteReplicationNodes.values())
+        {
+            helpers.add(node.getReplicationNode().getSocketAddress());
+        }
 
-        final ReplicationConfig repConfig = _environment.getRepConfig();
-        helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort()));
+        //TODO: refactor this into a method on LocalReplicationNode
+        String hostPort = (String)_replicationNode.getAttribute(org.apache.qpid.server.model.ReplicationNode.HOST_PORT);
+        String[] tokens = hostPort.split(":");
+        helpers.add(new InetSocketAddress(tokens[0], Integer.parseInt(tokens[1])));
 
         return new ReplicationGroupAdmin((String)_replicationNode.getAttribute(GROUP_NAME), helpers);
     }
@@ -936,6 +949,15 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
+    public NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
+    {
+        if (repNode == null)
+        {
+            throw new IllegalArgumentException("Node cannot be null");
+        }
+        return new DbPing(repNode, (String)_replicationNode.getAttribute(GROUP_NAME), DB_PING_SOCKET_TIMEOUT).getNodeState();
+    }
+
     private final class GroupChangeLearner implements Runnable
     {
         @Override
@@ -968,7 +990,7 @@ public class ReplicatedEnvironmentFacade
                                 LOGGER.debug("Remote replication node added '" + replicationNode + "' to '" + groupName + "'");
                             }
 
-                            RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName());
+                            RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, ReplicatedEnvironmentFacade.this);
 
                             _remoteReplicationNodes.put(discoveredNodeName, remoteNode);
 
@@ -1113,5 +1135,4 @@ public class ReplicatedEnvironmentFacade
         }
 
     }
-
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java?rev=1564815&r1=1564814&r2=1564815&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java Wed Feb  5 15:41:20 2014
@@ -21,9 +21,6 @@
 package org.apache.qpid.server.store.berkeleydb.replication;
 
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.model.ReplicationNode;
@@ -33,8 +30,6 @@ import org.apache.qpid.server.store.berk
 
 import com.sleepycat.je.Durability;
 import com.sleepycat.je.Durability.SyncPolicy;
-import com.sleepycat.je.rep.util.DbPing;
-import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
 
 //TODO: Should LocalReplicationNode implement EnvironmentFacadeFactory instead of having this class?
 public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
@@ -79,19 +74,9 @@ public class ReplicatedEnvironmentFacade
         }
 
         @Override
-        public RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName)
+        public RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode replicationNode, ReplicatedEnvironmentFacade environmentFacade)
         {
-            Map<String, Object> attributes = new HashMap<String, Object>();
-            attributes.put(ReplicationNode.NAME, replicationNode.getName());
-            attributes.put(ReplicationNode.GROUP_NAME, groupName);
-            attributes.put(ReplicationNode.HOST_PORT, replicationNode.getHostName() + ":" + replicationNode.getPort());
-
-            Long monitorTimeout = (Long)_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT);
-            DbPing dbPing = new DbPing(replicationNode, groupName, monitorTimeout.intValue());
-
-            ReplicationGroupAdmin replicationGroupAdmin = new ReplicationGroupAdmin(groupName, Collections.singleton(replicationNode.getSocketAddress()));
-
-            return new RemoteReplicationNode(replicationNode, groupName, _virtualHost, _virtualHost.getTaskExecutor(), dbPing, replicationGroupAdmin);
+            return new RemoteReplicationNode(replicationNode, _virtualHost, _virtualHost.getTaskExecutor(), environmentFacade);
         }
 
         @Override

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java?rev=1564815&r1=1564814&r2=1564815&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java Wed Feb  5 15:41:20 2014
@@ -1,17 +1,26 @@
 package org.apache.qpid.server.store.berkeleydb.replication;
 
-import static org.mockito.Mockito.any;
+import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC;
+import static org.apache.qpid.server.model.ReplicationNode.DURABILITY;
+import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME;
+import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.JOIN_TIME;
+import static org.apache.qpid.server.model.ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID;
+import static org.apache.qpid.server.model.ReplicationNode.NAME;
+import static org.apache.qpid.server.model.ReplicationNode.PARAMETERS;
+import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
+import static org.apache.qpid.server.model.ReplicationNode.ROLE;
+import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
-import static org.apache.qpid.server.model.ReplicationNode.*;
+import static org.mockito.Mockito.when;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -21,8 +30,6 @@ import org.apache.qpid.test.utils.QpidTe
 import com.sleepycat.je.rep.NodeState;
 import com.sleepycat.je.rep.ReplicatedEnvironment.State;
 import com.sleepycat.je.rep.ReplicationNode;
-import com.sleepycat.je.rep.util.DbPing;
-import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
 
 public class RemoteReplicationNodeTest extends QpidTestCase
 {
@@ -34,8 +41,7 @@ public class RemoteReplicationNodeTest e
     private ReplicationNode _replicationNode;
     private String _nodeName;
     private int _port;
-    private DbPing _dbPing;
-    private ReplicationGroupAdmin _remoteReplicationAdmin;
+    private ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
 
     @Override
     protected void setUp() throws Exception
@@ -47,15 +53,15 @@ public class RemoteReplicationNodeTest e
         _replicationNode = mock(ReplicationNode.class);
         _virtualHost = mock(VirtualHost.class);
         _taskExecutor = mock(TaskExecutor.class);
-        _dbPing = mock(DbPing.class);
-        _remoteReplicationAdmin = mock(ReplicationGroupAdmin.class);
+        _replicatedEnvironmentFacade = mock(ReplicatedEnvironmentFacade.class);
+        when(_replicatedEnvironmentFacade.getGroupName()).thenReturn(_groupName);
 
         when(_taskExecutor.isTaskExecutorThread()).thenReturn(true);
         when(_replicationNode.getName()).thenReturn(_nodeName);
         when(_replicationNode.getHostName()).thenReturn("localhost");
         when(_replicationNode.getPort()).thenReturn(_port);
 
-        _node = new RemoteReplicationNode(_replicationNode, _groupName, _virtualHost, _taskExecutor, _dbPing, _remoteReplicationAdmin);
+        _node = new RemoteReplicationNode(_replicationNode, _virtualHost, _taskExecutor, _replicatedEnvironmentFacade);
     }
 
     public void testGetAttribute() throws Exception
@@ -73,14 +79,28 @@ public class RemoteReplicationNodeTest e
         assertEquals("Unexpected join time", joinTime, _node.getAttribute(JOIN_TIME));
     }
 
-    @SuppressWarnings("unchecked")
     public void testSetRoleAttribute() throws Exception
     {
         updateNodeState();
         _node.setAttributes(Collections.<String, Object>singletonMap(ROLE, State.MASTER.name()));
-        when(_remoteReplicationAdmin.transferMaster(any(Set.class), any(int.class), any(TimeUnit.class), any(boolean.class))).thenReturn(_nodeName);
 
-        verify(_remoteReplicationAdmin).transferMaster(any(Set.class), any(int.class), any(TimeUnit.class), any(boolean.class));
+        verify(_replicatedEnvironmentFacade).transferMasterAsynchronously(_nodeName);
+    }
+
+    public void testSetRoleAttributeDisallowedIfAlreadyMaster() throws Exception
+    {
+        updateNodeState(State.MASTER, System.currentTimeMillis(), 0L);
+        try
+        {
+            _node.setAttributes(Collections.<String, Object>singletonMap(ROLE, State.MASTER.name()));
+            fail("Exception not thrown");
+        }
+        catch (IllegalConfigurationException ice)
+        {
+            // pass
+        }
+
+        verify(_replicatedEnvironmentFacade, never()).transferMasterAsynchronously(_nodeName);
     }
 
     public void testSetImmutableAttributesThrowException() throws Exception
@@ -127,7 +147,7 @@ public class RemoteReplicationNodeTest e
     private void updateNodeState(State state, long joinTime, long currentTxnEndVLSN) throws Exception
     {
         NodeState nodeState = new NodeState(_nodeName, _groupName, state, null, null, joinTime, currentTxnEndVLSN, 2, 1, 0, null, 0.0);
-        when(_dbPing.getNodeState()).thenReturn(nodeState);
+        when(_replicatedEnvironmentFacade.getRemoteNodeState(_replicationNode)).thenReturn(nodeState);
         _node.updateNodeState();
     }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1564815&r1=1564814&r2=1564815&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java Wed Feb  5 15:41:20 2014
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.model.ReplicationNode;
@@ -62,6 +63,8 @@ import com.sleepycat.je.rep.StateChangeL
 
 public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
 {
+    private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacadeTest.class);
+
     private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
     private static final int LISTENER_TIMEOUT = 5;
     private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
@@ -92,7 +95,7 @@ public class ReplicatedEnvironmentFacade
         _storePath = TestFileUtils.createTestDirectory("bdb", true);
 
         when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL)).thenReturn(100L);
-        when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT)).thenReturn(100L);
+        setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100");
     }
 
     @Override
@@ -181,6 +184,13 @@ public class ReplicatedEnvironmentFacade
         assertEquals("Unexpected group name", TEST_NODE_NAME, createMaster().getNodeName());
     }
 
+    public void testLastKnownReplicationTransactionId() throws Exception
+    {
+        ReplicatedEnvironmentFacade master = createMaster();
+        long lastKnownReplicationTransactionId = master.getLastKnownReplicationTransactionId();
+        assertTrue("Unexpected LastKnownReplicationTransactionId " + lastKnownReplicationTransactionId, lastKnownReplicationTransactionId > 0);
+    }
+
     public void testGetNodeHostPort() throws Exception
     {
         assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, createMaster().getHostPort());
@@ -418,6 +428,8 @@ public class ReplicatedEnvironmentFacade
 
     public void testEnvironmentRestartOnInsufficientReplicas() throws Exception
     {
+        long startTime = System.currentTimeMillis();
+
         ReplicatedEnvironmentFacade master = createMaster();
 
         int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
@@ -429,7 +441,9 @@ public class ReplicatedEnvironmentFacade
         String replica2NodeName = TEST_NODE_NAME + "_2";
         String replica2NodeHostPort = "localhost:" + replica2Port;
         ReplicatedEnvironmentFacade replica2 = addReplica(replica2NodeName, replica2NodeHostPort);
-
+        
+        long setUpTime = System.currentTimeMillis();
+        LOGGER.debug("XXX Start Up Time " + (setUpTime - startTime));
         String databaseName = "test";
 
         DatabaseConfig dbConfig = createDatabase(master, databaseName);
@@ -438,6 +452,8 @@ public class ReplicatedEnvironmentFacade
         replica1.close();
         replica2.close();
 
+        long closeTime = System.currentTimeMillis();
+        LOGGER.debug("XXX Env close  Time " + (closeTime - setUpTime));
         Environment e = master.getEnvironment();
         Database db = master.getOpenDatabase(databaseName);
         try
@@ -449,17 +465,22 @@ public class ReplicatedEnvironmentFacade
         {
             master.handleDatabaseException(null, ex);
         }
+        long openDatabaseTime = System.currentTimeMillis();
+        LOGGER.debug("XXX Open db Time " + (openDatabaseTime - closeTime ));
 
         replica1 = addReplica(replica1NodeName, replica1NodeHostPort);
         replica2 = addReplica(replica2NodeName, replica2NodeHostPort);
 
+        long reopenTime = System.currentTimeMillis();
+        LOGGER.debug("XXX Restart Time " + (reopenTime - openDatabaseTime ));
         // Need to poll to await the remote node updating itself
         long timeout = System.currentTimeMillis() + 5000;
         while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout)
         {
             Thread.sleep(200);
         }
-
+        long recoverTime = System.currentTimeMillis();
+        LOGGER.debug("XXX Recover Time " + (recoverTime - reopenTime));
         assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(),
                 State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) );
 
@@ -659,7 +680,7 @@ public class ReplicatedEnvironmentFacade
         Map<String, String> repConfig = new HashMap<String, String>();
         repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
         repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
-        when(node.getAttribute(REPLICATION_PARAMETERS)).thenReturn(repConfig);
+        when(node.getActualAttribute(REPLICATION_PARAMETERS)).thenReturn(repConfig);
 
         when(node.getAttribute(STORE_PATH)).thenReturn(new File(_storePath, nodeName).getAbsolutePath());
         return node;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1564815&r1=1564814&r2=1564815&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Wed Feb  5 15:41:20 2014
@@ -92,7 +92,6 @@ public interface VirtualHost extends Con
 
     String QUIESCE_ON_MASTER_CHANGE             = "quiesceOnMasterChange";
     String REMOTE_REPLICATION_NODE_MONITOR_INTERVAL = "remoteReplicationNodeMonitorInterval";
-    String REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT = "remoteReplicationNodeMonitorTimeout";
 
     // Attributes
     public static final Collection<String> AVAILABLE_ATTRIBUTES =
@@ -130,8 +129,7 @@ public interface VirtualHost extends Con
                             QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
                             CONFIG_PATH,
                             QUIESCE_ON_MASTER_CHANGE,
-                            REMOTE_REPLICATION_NODE_MONITOR_INTERVAL,
-                            REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT));
+                            REMOTE_REPLICATION_NODE_MONITOR_INTERVAL));
 
     int CURRENT_CONFIG_VERSION = 3;
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1564815&r1=1564814&r2=1564815&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Wed Feb  5 15:41:20 2014
@@ -99,18 +99,15 @@ public final class VirtualHostAdapter ex
         put(CONFIG_PATH, String.class);
         put(DESIRED_STATE, State.class);
         put(REMOTE_REPLICATION_NODE_MONITOR_INTERVAL, Long.class);
-        put(REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT, Long.class);
         put(QUIESCE_ON_MASTER_CHANGE, Boolean.class);
     }});
 
     private static final long DEFAULT_REMOTE_REPLICATION_NODE_MONITOR_INTERVAL = 10000L;
-    private static final long DEFAULT_REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT = 1000L;
 
     @SuppressWarnings("serial")
     static final Map<String, Object> DEFAULTS = new HashMap<String, Object>()
     {{
         put(REMOTE_REPLICATION_NODE_MONITOR_INTERVAL, DEFAULT_REMOTE_REPLICATION_NODE_MONITOR_INTERVAL);
-        put(REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT, DEFAULT_REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT);
         put(QUIESCE_ON_MASTER_CHANGE, false);
     }};
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org