You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/16 17:14:01 UTC

svn commit: r1558844 - in /qpid/branches/java-broker-bdb-ha/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ bdbstore/src/test/java/org/apache/qpid/se...

Author: kwall
Date: Thu Jan 16 16:14:00 2014
New Revision: 1558844

URL: http://svn.apache.org/r1558844
Log:
QPID-5409:  Add functionality to monitor for additions/removals from the group.  Add functionality
for RemoteReplicationNodes to monitor themselves in order that their attributes reflect the state.

Also changed commit thread wrapper to abort any remaining commit futures on stop.

Removed:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/AbstractReplicationNode.java
Modified:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/replication/ReplicationGroupListener.java

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java?rev=1558844&r1=1558843&r2=1558844&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java Thu Jan 16 16:14:00 2014
@@ -46,12 +46,17 @@ public class CommitThreadWrapper
         _commitThread.start();
     }
 
-    public void stopCommitThread() throws InterruptedException
+    public void stopCommitThread(RuntimeException e) throws InterruptedException
     {
-        _commitThread.close();
+        _commitThread.close(e);
         _commitThread.join();
     }
 
+    public void stopCommitThread() throws InterruptedException
+    {
+        stopCommitThread(new RuntimeException("Stopping commit thread"));
+    }
+
     public StoreFuture commit(Transaction tx, boolean syncCommit)
     {
         BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
@@ -65,7 +70,7 @@ public class CommitThreadWrapper
 
         private final CommitThread _commitThread;
         private final Transaction _tx;
-        private DatabaseException _databaseException;
+        private RuntimeException _databaseException;
         private boolean _complete;
         private boolean _syncCommit;
 
@@ -87,7 +92,7 @@ public class CommitThreadWrapper
             notifyAll();
         }
 
-        public synchronized void abort(DatabaseException databaseException)
+        public synchronized void abort(RuntimeException databaseException)
         {
             _complete = true;
             _databaseException = databaseException;
@@ -269,7 +274,10 @@ public class CommitThreadWrapper
 
         public void addJob(BDBCommitFuture commit, final boolean sync)
         {
-
+            if (_stopped.get())
+            {
+                throw new IllegalStateException("Commit thread is stopped");
+            }
             _jobQueue.add(commit);
             if(sync)
             {
@@ -280,11 +288,16 @@ public class CommitThreadWrapper
             }
         }
 
-        public void close()
+        public void close(RuntimeException e)
         {
             synchronized (_lock)
             {
                 _stopped.set(true);
+                BDBCommitFuture commit = null;
+                while ((commit = _jobQueue.poll()) != null)
+                {
+                    commit.abort(e);
+                }
                 _lock.notifyAll();
             }
         }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java?rev=1558844&r1=1558843&r2=1558844&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java Thu Jan 16 16:14:00 2014
@@ -20,7 +20,14 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import static org.apache.qpid.server.model.ReplicationNode.*;
+import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC;
+import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY;
+import static org.apache.qpid.server.model.ReplicationNode.DURABILITY;
+import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME;
+import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.PARAMETERS;
+import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
 
 import java.io.File;
 import java.net.InetSocketAddress;
@@ -35,6 +42,9 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.log4j.Logger;
@@ -115,9 +125,6 @@ public class ReplicatedEnvironmentFacade
     public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
     public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
 
-    private volatile ReplicatedEnvironment _environment;
-    private CommitThreadWrapper _commitThreadWrapper;
-
     private final String _groupName;
     private final String _nodeName;
     private final String _nodeHostPort;
@@ -125,18 +132,21 @@ public class ReplicatedEnvironmentFacade
     private final Durability _durability;
     private final boolean _designatedPrimary;
     private final boolean _coalescingSync;
-    private volatile StateChangeListener _stateChangeListener;
     private final String _environmentPath;
     private final Map<String, String> _environmentParameters;
     private final Map<String, String> _replicationEnvironmentParameters;
     private final String _name;
-    private final ExecutorService _executor = Executors.newFixedThreadPool(1);
+    private final ExecutorService _restartEnvironmentExecutor = Executors.newFixedThreadPool(1);
+    private final ScheduledExecutorService _groupChangeExecutor;
     private final AtomicReference<State> _state = new AtomicReference<State>(State.INITIAL);
-
     private final ConcurrentMap<String, Database> _databases = new ConcurrentHashMap<String, Database>();
+    private final ConcurrentMap<String, org.apache.qpid.server.model.ReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<String, org.apache.qpid.server.model.ReplicationNode>();
+    private final RemoteReplicationNodeFactory _remoteReplicationNodeFactory;
 
+    private volatile CommitThreadWrapper _commitThreadWrapper;
+    private volatile StateChangeListener _stateChangeListener;
+    private volatile ReplicatedEnvironment _environment;
     private ReplicationGroupListener _replicationGroupListener;
-    private final RemoteReplicationNodeFactory _remoteReplicationNodeFactory;
     private long _joinTime;
     private String _lastKnownReplicationTransactionId;
 
@@ -157,10 +167,22 @@ public class ReplicatedEnvironmentFacade
         _environmentParameters = (Map<String, String>)replicationNode.getAttribute(PARAMETERS);
         _replicationEnvironmentParameters = (Map<String, String>)replicationNode.getAttribute(REPLICATION_PARAMETERS);
 
+        _groupChangeExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory()
+        {
+            @Override
+            public Thread newThread(Runnable r)
+            {
+                return new Thread(r, "GroupChangeLearner_" + _groupName);
+            }
+        });
         _remoteReplicationNodeFactory = remoteReplicationNodeFactory;
         _state.set(State.OPENING);
+        //TODO: add ability to alter the execution period
+        _groupChangeExecutor.scheduleWithFixedDelay(new GroupChangeLearner(), 1, 1, TimeUnit.SECONDS);
+
         _environment = createEnvironment();
-        startCommitThread(_name, _environment);
+        populateExistingRemoteReplicationNodes();
+        _commitThreadWrapper = startCommitThread(_name, _environment);
     }
 
     @Override
@@ -198,7 +220,8 @@ public class ReplicatedEnvironmentFacade
             try
             {
                 LOGGER.debug("Closing replicated environment facade");
-                _executor.shutdownNow();
+                _restartEnvironmentExecutor.shutdownNow();
+                _groupChangeExecutor.shutdownNow();
                 stopCommitThread();
                 closeDatabases();
                 closeEnvironment();
@@ -211,25 +234,25 @@ public class ReplicatedEnvironmentFacade
     }
 
     @Override
-    public AMQStoreException handleDatabaseException(String contextMessage, DatabaseException e)
+    public AMQStoreException handleDatabaseException(String contextMessage, final DatabaseException dbe)
     {
-        boolean restart = (e instanceof InsufficientReplicasException || e instanceof InsufficientReplicasException);
+        boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException);
         if (restart)
         {
             if (_state.compareAndSet(State.OPEN, State.RESTARTING))
             {
                 if (LOGGER.isDebugEnabled())
                 {
-                    LOGGER.debug("Environment restarting due to exception " + e.getMessage(), e);
+                    LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe);
                 }
-                _executor.execute(new Runnable()
+                _restartEnvironmentExecutor.execute(new Runnable()
                 {
                     @Override
                     public void run()
                     {
                         try
                         {
-                            restartEnvironment();
+                            restartEnvironment(dbe);
                         }
                         catch (Exception e)
                         {
@@ -244,7 +267,7 @@ public class ReplicatedEnvironmentFacade
                 LOGGER.info("Cannot restart environment because of facade state: " + _state.get());
             }
         }
-        return new AMQStoreException(contextMessage, e);
+        return new AMQStoreException(contextMessage, dbe);
     }
 
     @Override
@@ -411,7 +434,6 @@ public class ReplicatedEnvironmentFacade
         try
         {
             createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
-
         }
         catch (OperationFailureException ofe)
         {
@@ -475,7 +497,7 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    private void notifyExistingRemoteReplicationNodes(ReplicationGroupListener listener)
+    private void populateExistingRemoteReplicationNodes()
     {
         ReplicationGroup group = _environment.getGroup();
         Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes());
@@ -486,13 +508,19 @@ public class ReplicatedEnvironmentFacade
             String discoveredNodeName = replicationNode.getName();
             if (!discoveredNodeName.equals(localNodeName))
             {
-                // TODO remote replication nodes should be cached
-                RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(group.getName(),
-                                                                             replicationNode.getName(),
-                                                                             replicationNode.getHostName() + ":" + replicationNode.getPort());
-                listener.onReplicationNodeRecovered(remoteNode);
+                RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName());
+
+                _remoteReplicationNodes.put(replicationNode.getName(), remoteNode);
             }
         }
+     }
+
+    private void notifyExistingRemoteReplicationNodes(ReplicationGroupListener listener)
+    {
+        for (org.apache.qpid.server.model.ReplicationNode value : _remoteReplicationNodes.values())
+        {
+            listener.onReplicationNodeRecovered(value);
+        }
     }
 
     private ReplicationGroupAdmin createReplicationGroupAdmin()
@@ -525,12 +553,30 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    private void startCommitThread(String name, Environment environment)
+    private CommitThreadWrapper startCommitThread(String name, Environment environment)
     {
+        CommitThreadWrapper commitThreadWrapper = null;
         if (_coalescingSync)
         {
-            _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, environment);
-            _commitThreadWrapper.startCommitThread();
+            commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, environment);
+            commitThreadWrapper.startCommitThread();
+        }
+        return commitThreadWrapper;
+    }
+
+    private void stopCommitThread(RuntimeException dbe)
+    {
+        if (_coalescingSync)
+        {
+            try
+            {
+                _commitThreadWrapper.stopCommitThread(dbe);
+            }
+            catch (InterruptedException e)
+            {
+                LOGGER.warn("Stopping of commit thread is interrupted", e);
+                Thread.interrupted();
+            }
         }
     }
 
@@ -550,11 +596,11 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    private void restartEnvironment() throws AMQStoreException
+    private void restartEnvironment(DatabaseException dbe) throws AMQStoreException
     {
         LOGGER.info("Restarting environment");
 
-        stopCommitThread();
+        stopCommitThread(dbe);
 
         Set<String> databaseNames = new HashSet<String>(_databases.keySet());
         closeEnvironmentSafely();
@@ -566,7 +612,7 @@ public class ReplicatedEnvironmentFacade
         // TODO Alex and I think this should be removed.
         openDatabases(databaseNames.toArray(new String[databaseNames.size()]), dbConfig);
 
-        startCommitThread(_name, _environment);
+        _commitThreadWrapper = startCommitThread(_name, _environment);
 
         _environment.setStateChangeListener(this);
 
@@ -697,6 +743,72 @@ public class ReplicatedEnvironmentFacade
         return environment;
     }
 
+    private final class GroupChangeLearner implements Runnable
+    {
+        @Override
+        public void run()
+        {
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Checking for changes in the group " + _groupName);
+            }
+
+            ReplicatedEnvironment env = _environment;
+            ReplicationGroupListener replicationGroupListener = _replicationGroupListener;
+            if (env != null && env.isValid())
+            {
+                ReplicationGroup group = env.getGroup();
+                Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes());
+                String localNodeName = getNodeName();
+
+                Map<String, org.apache.qpid.server.model.ReplicationNode> removalMap = new HashMap<String, org.apache.qpid.server.model.ReplicationNode>(_remoteReplicationNodes);
+                for (ReplicationNode replicationNode : nodes)
+                {
+                    String discoveredNodeName = replicationNode.getName();
+                    if (!discoveredNodeName.equals(localNodeName))
+                    {
+                        if (!_remoteReplicationNodes.containsKey(discoveredNodeName))
+                        {
+                            if (LOGGER.isDebugEnabled())
+                            {
+                                LOGGER.debug("Remote replication node added '" + replicationNode + "' to '" + _groupName + "'");
+                            }
+
+                            RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName());
+
+                            _remoteReplicationNodes.put(discoveredNodeName, remoteNode);
+                            if (replicationGroupListener != null)
+                            {
+                                replicationGroupListener.onReplicationNodeAddedToGroup(remoteNode);
+                            }
+                        }
+                        else
+                        {
+                            removalMap.remove(discoveredNodeName);
+                        }
+                    }
+                }
+
+                if (!removalMap.isEmpty())
+                {
+                    for (Map.Entry<String, org.apache.qpid.server.model.ReplicationNode> replicationNodeEntry : removalMap.entrySet())
+                    {
+                        String replicationNodeName = replicationNodeEntry.getKey();
+                        if (LOGGER.isDebugEnabled())
+                        {
+                            LOGGER.debug("Remote replication node removed '" + replicationNodeName + "' from '" + _groupName + "'");
+                        }
+                        _remoteReplicationNodes.remove(replicationNodeName);
+                        if (replicationGroupListener != null)
+                        {
+                            replicationGroupListener.onReplicationNodeRemovedFromGroup(replicationNodeEntry.getValue());
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     private class LoggingAsyncExceptionListener implements ExceptionListener
     {
         @Override

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java?rev=1558844&r1=1558843&r2=1558844&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java Thu Jan 16 16:14:00 2014
@@ -21,9 +21,13 @@
 package org.apache.qpid.server.store.berkeleydb;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.store.berkeleydb.replication.LocalReplicationNode;
 import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
@@ -61,7 +65,7 @@ public class ReplicatedEnvironmentFacade
         return facade;
     }
 
-    private static class RemoteReplicationNodeFactoryImpl implements RemoteReplicationNodeFactory
+    static class RemoteReplicationNodeFactoryImpl implements RemoteReplicationNodeFactory
     {
         private VirtualHost _virtualHost;
 
@@ -71,9 +75,13 @@ public class ReplicatedEnvironmentFacade
         }
 
         @Override
-        public RemoteReplicationNode create(String groupName, String nodeName, String hostPort)
+        public RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName)
         {
-            return new RemoteReplicationNode(groupName, nodeName, hostPort, _virtualHost);
+            Map<String, Object> attributes = new HashMap<String, Object>();
+            attributes.put(ReplicationNode.NAME, replicationNode.getName());
+            attributes.put(ReplicationNode.GROUP_NAME, groupName);
+            attributes.put(ReplicationNode.HOST_PORT, replicationNode.getHostName() + ":" + replicationNode.getPort());
+            return new RemoteReplicationNode(replicationNode, groupName, _virtualHost, _virtualHost.getTaskExecutor());
         }
     }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java?rev=1558844&r1=1558843&r2=1558844&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java Thu Jan 16 16:14:00 2014
@@ -1,16 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.store.berkeleydb.replication;
 
+import java.io.IOException;
+import java.security.AccessControlException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.adapter.AbstractAdapter;
+import org.apache.qpid.server.model.adapter.NoStatistics;
+
+import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.util.DbPing;
+import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException;
 
 /**
  * Represents a remote replication node in a BDB group.
  */
-public class RemoteReplicationNode extends AbstractReplicationNode
+public class RemoteReplicationNode extends AbstractAdapter implements ReplicationNode, Runnable
 {
+    private static final Logger LOGGER = Logger.getLogger(RemoteReplicationNode.class);
+
+    //TODO: add attributes for setting the intervals below
+    private static final int DEFAULT_SOCKET_TIMEOUT = 10000;
+    private static final long DEFAULT_STATE_UPDATE_INTERVAL = 1000; //TODO: set it to bigger value
+
+    // TODO: needs to be shared between all remote nodes
+    private final ScheduledExecutorService _updateStateExecutor;
+    private final com.sleepycat.je.rep.ReplicationNode _replicationNode;
+    private final String _hostPort;
+    private final String _groupName;
 
-    public RemoteReplicationNode(String groupName, String nodeName, String hostPort, VirtualHost virtualHost)
+    private volatile String _role;
+    private volatile long _joinTime;
+    private volatile long _lastTransactionId;
+
+    public RemoteReplicationNode(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName, VirtualHost virtualHost, TaskExecutor taskExecutor)
     {
-        super(groupName, nodeName, hostPort, virtualHost);
+        super(UUIDGenerator.generateReplicationNodeId(groupName, replicationNode.getName()), null, null, taskExecutor);
+        addParent(VirtualHost.class, virtualHost);
+        _groupName = groupName;
+        _hostPort = replicationNode.getHostName() + ":" + replicationNode.getPort();
+        _replicationNode = replicationNode;
+        _updateStateExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory()
+        {
+            @Override
+            public Thread newThread(Runnable r)
+            {
+                return new Thread(r, "Remote node state updater " + getName() + "-" + getAttribute(GROUP_NAME));
+            }
+        });
+
+        //TODO: add attribute for update interval
+        long stateUpdateInterval = DEFAULT_STATE_UPDATE_INTERVAL;
+        _updateStateExecutor.schedule(this, stateUpdateInterval, TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -19,4 +93,163 @@ public class RemoteReplicationNode exten
         return false;
     }
 
+    @Override
+    public String getName()
+    {
+        return (String)getAttribute(NAME);
+    }
+
+    @Override
+    public String setName(String currentName, String desiredName) throws IllegalStateException, AccessControlException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public State getActualState()
+    {
+        return State.UNAVAILABLE;
+    }
+
+    @Override
+    public boolean isDurable()
+    {
+        return true;
+    }
+
+    @Override
+    public void setDurable(boolean durable) throws IllegalStateException, AccessControlException, IllegalArgumentException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public LifetimePolicy getLifetimePolicy()
+    {
+        return LifetimePolicy.PERMANENT;
+    }
+
+    @Override
+    public LifetimePolicy setLifetimePolicy(LifetimePolicy expected, LifetimePolicy desired) throws IllegalStateException,
+            AccessControlException, IllegalArgumentException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getTimeToLive()
+    {
+        return 0;
+    }
+
+    @Override
+    public long setTimeToLive(long expected, long desired) throws IllegalStateException, AccessControlException,
+            IllegalArgumentException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Statistics getStatistics()
+    {
+        return NoStatistics.getInstance();
+    }
+
+    @Override
+    public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
+    {
+        return Collections.emptySet();
+    }
+
+    @Override
+    protected boolean setState(State currentState, State desiredState)
+    {
+        if (desiredState == State.STOPPED)
+        {
+            _updateStateExecutor.shutdown();
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+
+    @Override
+    public Object getAttribute(String name)
+    {
+        if (ROLE.equals(name))
+        {
+            return _role;
+        }
+        else if (JOIN_TIME.equals(name))
+        {
+            return _joinTime;
+        }
+        else if (LAST_KNOWN_REPLICATION_TRANSACTION_ID.equals(name))
+        {
+            return _lastTransactionId;
+        }
+        else if (NAME.equals(name))
+        {
+            return _replicationNode.getName();
+        }
+        else if (GROUP_NAME.equals(name))
+        {
+            return _groupName;
+        }
+        else if (HOST_PORT.equals(name))
+        {
+            return _hostPort;
+        }
+        return super.getAttribute(name);
+    }
+
+    private void updateNodeState()
+    {
+        DbPing ping = new DbPing(_replicationNode, _groupName, DEFAULT_SOCKET_TIMEOUT);
+        String oldRole = _role;
+        long oldJoinTime = _joinTime;
+        long oldTransactionId = _lastTransactionId;
+
+        try
+        {
+            NodeState state = ping.getNodeState();
+            _role = state.getNodeState().name();
+            _joinTime = state.getJoinTime();
+            _lastTransactionId = state.getCurrentTxnEndVLSN();
+        }
+        catch (IOException e)
+        {
+            _role = com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN.name();
+            LOGGER.warn("Cannot connect to node " + _replicationNode.getName() + " from " + _groupName, e);
+        }
+        catch (ServiceConnectFailedException e)
+        {
+            _role = com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN.name();
+            LOGGER.warn("Cannot retrieve the node details for node " + _replicationNode.getName() + " from " + _groupName, e);
+        }
+
+        if (!_role.equals(oldRole))
+        {
+            attributeSet(ROLE, oldRole, _role);
+        }
+
+        if (_joinTime != oldJoinTime)
+        {
+            attributeSet(JOIN_TIME, oldJoinTime, _joinTime);
+        }
+
+        if (_lastTransactionId != oldTransactionId)
+        {
+            attributeSet(LAST_KNOWN_REPLICATION_TRANSACTION_ID, oldTransactionId, _lastTransactionId);
+        }
+    }
+
+    @Override
+    public void run()
+    {
+        updateNodeState();
+    }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java?rev=1558844&r1=1558843&r2=1558844&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java Thu Jan 16 16:14:00 2014
@@ -22,5 +22,5 @@ package org.apache.qpid.server.store.ber
 
 public interface RemoteReplicationNodeFactory
 {
-    RemoteReplicationNode create(String groupName, String nodeName, String hostPort);
+    RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode jeNode, String groupName);
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java?rev=1558844&r1=1558843&r2=1558844&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java Thu Jan 16 16:14:00 2014
@@ -47,9 +47,11 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.replication.ReplicationGroupListener;
 import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
 import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory;
@@ -71,6 +73,25 @@ import com.sleepycat.je.rep.StateChangeL
 
 public class ReplicatedEnvironmentFacadeTest extends EnvironmentFacadeTestCase
 {
+    /** Listener whose callbacks all do nothing; tests subclass it and override only the events they need. */
+    private static class NoopReplicationGroupListener implements ReplicationGroupListener
+    {
+        @Override
+        public void onReplicationNodeRecovered(ReplicationNode node)
+        {
+        }
+
+        @Override
+        public void onReplicationNodeAddedToGroup(ReplicationNode node)
+        {
+        }
+
+        @Override
+        public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+        {
+        }
+    }
+
     private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
     private static final TimeUnit WAIT_STATE_CHANGE_TIME_UNIT = TimeUnit.SECONDS;
     private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
@@ -82,7 +103,8 @@ public class ReplicatedEnvironmentFacade
     private static final boolean TEST_DESIGNATED_PRIMARY = true;
     private static final boolean TEST_COALESCING_SYNC = true;
     private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
-    private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = mock(RemoteReplicationNodeFactory.class);
+    private VirtualHost _virtualHost = mock(VirtualHost.class);
+    private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = new ReplicatedEnvironmentFacadeFactory.RemoteReplicationNodeFactoryImpl(_virtualHost);
 
     public void setUp() throws Exception
     {
@@ -162,20 +184,144 @@ public class ReplicatedEnvironmentFacade
 
     public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception
     {
-        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getEnvironmentFacade();
+        getEnvironmentFacade();
         String nodeName2 = TEST_NODE_NAME + "_2";
         String host = "localhost";
         int port = getNextAvailable(TEST_NODE_PORT + 1);
         String node2NodeHostPort = host + ":" + port;
-        joinReplica(nodeName2, node2NodeHostPort);
+        ReplicatedEnvironmentFacade replicatedEnvironmentFacade2 = joinReplica(nodeName2, node2NodeHostPort);
 
-        List<Map<String, String>> groupMembers = replicatedEnvironmentFacade.getGroupMembers();
-        assertEquals("Unexpected number of nodes at start of test", 2, groupMembers.size());
+        List<Map<String, String>> groupMembers = replicatedEnvironmentFacade2.getGroupMembers();
+        assertEquals("Unexpected number of nodes", 2, groupMembers.size());
 
         ReplicationGroupListener listener = mock(ReplicationGroupListener.class);
-        replicatedEnvironmentFacade.setReplicationGroupListener(listener);
+        replicatedEnvironmentFacade2.setReplicationGroupListener(listener);
         verify(listener).onReplicationNodeRecovered(any(RemoteReplicationNode.class));
-        verify(_remoteReplicationNodeFactory).create(TEST_GROUP_NAME, nodeName2, node2NodeHostPort);
+    }
+
+    public void testReplicationGroupListenerHearsNodeAdded() throws Exception
+    {
+        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getEnvironmentFacade();
+        // Precondition: the group is expected to have a single member at this point.
+        List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
+        assertEquals("Unexpected number of nodes at start of test", 1, initialGroupMembers.size());
+        // The listener counts "added" events and releases the latch on the first one.
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicInteger invocationCount = new AtomicInteger();
+        ReplicationGroupListener listener = new NoopReplicationGroupListener()
+        {
+            @Override
+            public void onReplicationNodeAddedToGroup(ReplicationNode node)
+            {
+                invocationCount.getAndIncrement();
+                latch.countDown();
+            }
+        };
+        replicatedEnvironmentFacade.setReplicationGroupListener(listener);
+        // Add a second node; the registered listener is expected to be told about it.
+        String node2Name = TEST_NODE_NAME + "_2";
+        String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
+        joinReplica(node2Name, node2NodeHostPort);
+
+        assertTrue("Listener not fired within timeout", latch.await(5, TimeUnit.SECONDS));
+
+        List<Map<String, String>> groupMembers = replicatedEnvironmentFacade.getGroupMembers();
+        assertEquals("Unexpected number of nodes", 2, groupMembers.size());
+
+        assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
+    }
+
+    public void testReplicationGroupListenerHearsNodeRemoved() throws Exception
+    {
+        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getEnvironmentFacade();
+        String node2Name = TEST_NODE_NAME + "_2";
+        String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
+        joinReplica(node2Name, node2NodeHostPort);
+        // Precondition: both nodes are expected to be group members before the listener is registered.
+        List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
+        assertEquals("Unexpected number of nodes at start of test", 2, initialGroupMembers.size());
+        // nodeAddedLatch opens on either a "recovered" or an "added" event; only removals are counted.
+        final CountDownLatch nodeDeletedLatch = new CountDownLatch(1);
+        final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
+        final AtomicInteger invocationCount = new AtomicInteger();
+        ReplicationGroupListener listener = new NoopReplicationGroupListener()
+        {
+            @Override
+            public void onReplicationNodeRecovered(ReplicationNode node)
+            {
+                nodeAddedLatch.countDown();
+            }
+
+            @Override
+            public void onReplicationNodeAddedToGroup(ReplicationNode node)
+            {
+                nodeAddedLatch.countDown();
+            }
+
+            @Override
+            public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+            {
+                invocationCount.getAndIncrement();
+                nodeDeletedLatch.countDown();
+            }
+        };
+
+        replicatedEnvironmentFacade.setReplicationGroupListener(listener);
+
+        // Need to await the listener hearing the addition of the node to the model.
+        assertTrue("Node add not fired within timeout", nodeAddedLatch.await(5, TimeUnit.SECONDS));
+
+        // Now remove the node and ensure we hear the event
+        replicatedEnvironmentFacade.removeNodeFromGroup(node2Name);
+
+        assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(5, TimeUnit.SECONDS));
+
+        List<Map<String, String>> groupMembers = replicatedEnvironmentFacade.getGroupMembers();
+        assertEquals("Unexpected number of nodes after node removal", 1, groupMembers.size());
+
+        assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
+    }
+
+    public void testMasterHearsRemoteNodeRoles() throws Exception
+    {
+        // Joins a second node and checks that the first node's view of it ends up reporting the REPLICA role.
+        final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
+        final AtomicReference<ReplicationNode> nodeRef = new AtomicReference<ReplicationNode>();
+        ReplicationGroupListener listener = new NoopReplicationGroupListener()
+        {
+            @Override
+            public void onReplicationNodeAddedToGroup(ReplicationNode node)
+            {
+                nodeRef.set(node); // keep the node handed to the listener so its attributes can be inspected below
+                nodeAddedLatch.countDown();
+            }
+        };
+
+        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getEnvironmentFacade();
+        replicatedEnvironmentFacade.setReplicationGroupListener(listener);
+
+        String node2Name = TEST_NODE_NAME + "_2";
+        String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
+        joinReplica(node2Name, node2NodeHostPort);
+
+        List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
+        assertEquals("Unexpected number of nodes at start of test", 2, initialGroupMembers.size());
+
+        assertTrue("Node add not fired within timeout", nodeAddedLatch.await(5, TimeUnit.SECONDS));
+
+        RemoteReplicationNode remoteNode = (RemoteReplicationNode)nodeRef.get();
+        assertEquals("Unexpected node name", node2Name, remoteNode.getName());
+
+        // Need to poll to await the remote node updating itself
+        long timeout = System.currentTimeMillis() + 5000;
+        while(!State.REPLICA.name().equals(remoteNode.getAttribute(ReplicationNode.ROLE)) && System.currentTimeMillis() < timeout)
+        {
+            Thread.sleep(200);
+        }
+
+        assertEquals("Unexpected node role (after waiting)", State.REPLICA.name(), remoteNode.getAttribute(ReplicationNode.ROLE));
+        assertNotNull("Replica node " + ReplicationNode.JOIN_TIME + " attribute is not set", remoteNode.getAttribute(ReplicationNode.JOIN_TIME));
+        assertNotNull("Replica node " + ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID + " attribute is not set", remoteNode.getAttribute(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID));
     }
 
     public void testRemoveNodeFromGroup() throws Exception

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1558844&r1=1558843&r2=1558844&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Thu Jan 16 16:14:00 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.model;
 
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.MessageStore;
@@ -172,4 +173,6 @@ public interface VirtualHost extends Con
     MessageStore getMessageStore();
 
     String getType();
+
+    TaskExecutor getTaskExecutor();
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1558844&r1=1558843&r2=1558844&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Thu Jan 16 16:14:00 2014
@@ -116,9 +116,12 @@ public final class VirtualHostAdapter ex
 
     private final List<ReplicationNode> _replicationNodes = new ArrayList<ReplicationNode>();
 
+    private final TaskExecutor _taskExecutor;
+
     public VirtualHostAdapter(UUID id, Map<String, Object> attributes, Broker broker, StatisticsGatherer brokerStatisticsGatherer, TaskExecutor taskExecutor)
     {
         super(id, null, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES, false), taskExecutor, false);
+        _taskExecutor = taskExecutor;
         _broker = broker;
         _brokerStatisticsGatherer = brokerStatisticsGatherer;
         addParent(Broker.class, broker);
@@ -1260,6 +1263,12 @@ public final class VirtualHostAdapter ex
     }
 
     @Override
+    public TaskExecutor getTaskExecutor()
+    {
+        return _taskExecutor; // the executor that was handed to the constructor
+    }
+
+    @Override
     protected void changeAttributes(Map<String, Object> attributes)
     {
         // TODO: a hack to change a virtual host state only
@@ -1322,6 +1331,18 @@ public final class VirtualHostAdapter ex
         _replicationNodes.add(node);
     }
 
+    @Override
+    public void onReplicationNodeAddedToGroup(ReplicationNode node)
+    {
+        _replicationNodes.add(node); // NOTE(review): _replicationNodes is a plain ArrayList - confirm listener callbacks cannot race with readers
+    }
+
+    @Override
+    public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+    {
+        _replicationNodes.remove(node); // List.remove(Object) matches via equals(); NOTE(review): confirm the instance passed here equals the one added earlier
+    }
+
     public void recoverChild(ConfiguredObject configuredObject)
     {
         if (configuredObject instanceof ReplicationNode)

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/replication/ReplicationGroupListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/replication/ReplicationGroupListener.java?rev=1558844&r1=1558843&r2=1558844&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/replication/ReplicationGroupListener.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/replication/ReplicationGroupListener.java Thu Jan 16 16:14:00 2014
@@ -28,7 +28,7 @@ public interface ReplicationGroupListene
      * Fired when a remote replication node is added to a group.  This event happens
      * exactly once just after a new replication node is created.
      */
-    //void onReplicationNodeAddedToGroup(ReplicationNode node);
+    void onReplicationNodeAddedToGroup(ReplicationNode node);
 
     /**
      * Fired exactly once for each existing remote node.  Used to inform the application
@@ -40,7 +40,7 @@ public interface ReplicationGroupListene
      * Fired when a remote replication node is (permanently) removed from group.  This event
      * happens exactly once just after the existing replication node is deleted.
      */
-    //void onReplicationNodeRemovedFromGroup(ReplicationNode node);
+    void onReplicationNodeRemovedFromGroup(ReplicationNode node);
 
     /**
      * Fired when a remote replication node (that is already a member of the group) joins



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org