You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/03/14 17:10:27 UTC

svn commit: r1786930 - in /qpid/java/branches/6.1.x: ./ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/

Author: kwall
Date: Tue Mar 14 17:10:27 2017
New Revision: 1786930

URL: http://svn.apache.org/viewvc?rev=1786930&view=rev
Log:
QPID-7695: [HA] Prevent config thread hang if existing node is unresponsive.
Merged from trunk with command:

svn merge -c 1786342  ^/qpid/java/trunk

Modified:
    qpid/java/branches/6.1.x/   (props changed)
    qpid/java/branches/6.1.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    qpid/java/branches/6.1.x/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java

Propchange: qpid/java/branches/6.1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 14 17:10:27 2017
@@ -9,5 +9,5 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1766544,1766547,1766553,1766666,1766796-1766797,1766806,1767251,1767267-1767268,1767275,1767310,1767326,1767329,1767332,1767514,1767523,1767738,1767825,1767847-1767849,1767882,1767909,1767914,1768016-1768017,1768065,1768643,1768704,1768854,1768875,1768914,1768963,1768967,1768976,1769007,1769009,1769087,1769138-1769139,1769597,1769879,1770236,1770716,1772050,1772241,1772365,1772574,1773057,1774039,1774446,1774564,1774885,1775087,1775100,1777939,1780947,1782302,1782735,1785117,1785158,1785269-1785270,1785311,1785675,1785679,1785854,1785936,1785950,1786188-1786189,1786657,1786690
+/qpid/java/trunk:1766544,1766547,1766553,1766666,1766796-1766797,1766806,1767251,1767267-1767268,1767275,1767310,1767326,1767329,1767332,1767514,1767523,1767738,1767825,1767847-1767849,1767882,1767909,1767914,1768016-1768017,1768065,1768643,1768704,1768854,1768875,1768914,1768963,1768967,1768976,1769007,1769009,1769087,1769138-1769139,1769597,1769879,1770236,1770716,1772050,1772241,1772365,1772574,1773057,1774039,1774446,1774564,1774885,1775087,1775100,1777939,1780947,1782302,1782735,1785117,1785158,1785269-1785270,1785311,1785675,1785679,1785854,1785936,1785950,1786188-1786189,1786342,1786657,1786690
 /qpid/trunk/qpid:796646-796653

Modified: qpid/java/branches/6.1.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1786930&r1=1786929&r2=1786930&view=diff
==============================================================================
--- qpid/java/branches/6.1.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/java/branches/6.1.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Tue Mar 14 17:10:27 2017
@@ -210,7 +210,7 @@ public class ReplicatedEnvironmentFacade
     private final ListeningExecutorService _stateChangeExecutor;
 
     /**
-     * Executor used to learn about changes in the group.  Number of threads in the pool is maintained dynammically
+     * Executor used to learn about changes in the group.  Number of threads in the pool is maintained dynamically
      * to be number of nodes in the group + 1.   This gives us sufficient threads to 'ping' all the remote nodes in the
      * group (in parallel), and a thread for the coordination of the pings.  We also use the executor for the
      * transfer master operation.
@@ -1828,7 +1828,84 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    public static Collection<String> connectToHelperNodeAndCheckPermittedHosts(String nodeName, String hostPort, String groupName, String helperNodeName, String helperHostPort, int dbPingSocketTimeout)
+    /** Retrieves the helper node's permitted-node list on a separate thread, so an unresponsive helper cannot hang the caller, and verifies {@code hostPort} is in it. */
+    public static Collection<String> connectToHelperNodeAndCheckPermittedHosts(final String nodeName, final String hostPort, final String groupName, final String helperNodeName, final String helperHostPort, final int dbPingSocketTimeout)
+    {
+        // Created outside the try so the finally block never sees a null executor.
+        final ExecutorService executor = Executors.newSingleThreadExecutor(new DaemonThreadFactory(String.format(
+                "PermittedHostsCheck-%s-%s",
+                groupName,
+                nodeName)));
+        try
+        {
+            final Future<Collection<String>> future = executor.submit(new Callable<Collection<String>>()
+            {
+                @Override
+                public Collection<String> call() throws Exception
+                {
+                    return getPermittedHostsFromHelper(nodeName,
+                                                       groupName,
+                                                       helperNodeName,
+                                                       helperHostPort,
+                                                       dbPingSocketTimeout);
+                }
+            });
+
+            try
+            {
+                // 25% headroom over the (millisecond) socket timeout; a non-positive timeout means wait without limit.
+                final long timeout = (long) (dbPingSocketTimeout * 1.25);
+                final Collection<String> permittedNodes =
+                        dbPingSocketTimeout <= 0 ? future.get() : future.get(timeout, TimeUnit.MILLISECONDS);
+
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug(String.format("Node '%s' permits nodes: '%s'", helperNodeName, String.valueOf(permittedNodes)));
+                }
+                if (permittedNodes == null || !permittedNodes.contains(hostPort))
+                {
+                    throw new IllegalConfigurationException(String.format("Node using address '%s' is not permitted to join the group '%s'",
+                                                                          hostPort, groupName));
+                }
+                return permittedNodes;
+            }
+            catch (ExecutionException e)
+            {
+                // Unwrap so callers see the exception the helper query itself threw.
+                final Throwable cause = e.getCause();
+                if (cause instanceof RuntimeException)
+                {
+                    throw (RuntimeException) cause;
+                }
+                throw new RuntimeException(cause);
+            }
+            catch (TimeoutException e)
+            {
+                // Report the helper being contacted (not the joining node) and keep the cause.
+                future.cancel(true);
+                throw new ExternalServiceTimeoutException(String.format(
+                        "Task timed out trying to connect to existing node '%s' at '%s'", helperNodeName, helperHostPort), e);
+            }
+            catch (InterruptedException e)
+            {
+                // Restore the interrupt status and cancel the outstanding query before giving up.
+                Thread.currentThread().interrupt();
+                future.cancel(true);
+                throw new ExternalServiceException(String.format(
+                        "Task failed to connect to existing node '%s' at '%s'", helperNodeName, helperHostPort), e);
+            }
+        }
+        finally
+        {
+            executor.shutdown();
+        }
+    }
+
+    private static Collection<String> getPermittedHostsFromHelper(final String nodeName,
+                                                                  final String groupName,
+                                                                  final String helperNodeName,
+                                                                  final String helperHostPort,
+                                                                  final int dbPingSocketTimeout)
     {
         if (LOGGER.isDebugEnabled())
         {
@@ -1838,7 +1915,7 @@ public class ReplicatedEnvironmentFacade
         if (helperNodeName == null || "".equals(helperNodeName))
         {
             throw new IllegalConfigurationException(String.format("A helper node is not specified for node '%s'"
-                    + " joining the group '%s'", nodeName, groupName));
+                                                                  + " joining the group '%s'", nodeName, groupName));
         }
 
         Collection<String> permittedNodes = null;
@@ -1847,12 +1924,12 @@ public class ReplicatedEnvironmentFacade
             ReplicationNodeImpl node = new ReplicationNodeImpl(helperNodeName, helperHostPort);
             NodeState state = getRemoteNodeState(groupName, node, dbPingSocketTimeout);
             byte[] applicationState = state.getAppState();
-            permittedNodes = convertApplicationStateBytesToPermittedNodeList(applicationState);
+            return convertApplicationStateBytesToPermittedNodeList(applicationState);
         }
         catch (SocketTimeoutException ste)
         {
             throw new ExternalServiceTimeoutException(String.format("Timed out trying to connect to existing node '%s' at '%s'",
-                                    helperNodeName, helperHostPort), ste);
+                                                                    helperNodeName, helperHostPort), ste);
         }
         catch (IOException | ServiceConnectFailedException e)
         {
@@ -1863,7 +1940,7 @@ public class ReplicatedEnvironmentFacade
         {
             String message = String.format("Unexpected protocol exception '%s' encountered while retrieving state for node '%s' (%s) from group '%s'",
                                           e.getUnexpectedMessage(), helperNodeName, helperHostPort, groupName);
-            LOGGER.warn(message,  e);
+            LOGGER.warn(message, e);
             throw new ExternalServiceException(message, e) ;
         }
         catch (RuntimeException e)
@@ -1871,18 +1948,6 @@ public class ReplicatedEnvironmentFacade
             throw new ExternalServiceException(String.format("Cannot retrieve state for node '%s' (%s) from group '%s'",
                     helperNodeName, helperHostPort, groupName), e);
         }
-
-        if (LOGGER.isDebugEnabled())
-        {
-            LOGGER.debug(String.format("Attribute 'permittedNodes' on node '%s' is set to '%s'", helperNodeName, String.valueOf(permittedNodes)));
-        }
-
-        if (permittedNodes==null || !permittedNodes.contains(hostPort))
-        {
-            throw new IllegalConfigurationException(String.format("Node from '%s' is not permitted!", hostPort));
-        }
-
-        return permittedNodes;
     }
 
     private void registerAppStateMonitorIfPermittedNodesSpecified(final Set<String> permittedNodes)

Modified: qpid/java/branches/6.1.x/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java?rev=1786930&r1=1786929&r2=1786930&view=diff
==============================================================================
--- qpid/java/branches/6.1.x/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java (original)
+++ qpid/java/branches/6.1.x/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java Tue Mar 14 17:10:27 2017
@@ -444,7 +444,7 @@ public class BDBHAVirtualHostNodeTest ex
         }
         catch(IllegalConfigurationException e)
         {
-            assertEquals("Unexpected exception message", String.format("Node from '%s' is not permitted!", "localhost:" + node3PortNumber), e.getMessage());
+            assertEquals("Unexpected exception message", String.format("Node using address '%s' is not permitted to join the group '%s'", "localhost:" + node3PortNumber, groupName), e.getMessage());
         }
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org