You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/02/28 17:12:40 UTC

[activemq-artemis] branch master updated: ARTEMIS-2637 Making UDP client discovery resilient

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cac669  ARTEMIS-2637 Making UDP client discovery resilient
     new ffb8df7  This closes #2994
0cac669 is described below

commit 0cac669840d22ed6b9b89b5bdecc91f0c9d53126
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Feb 27 17:59:39 2020 -0500

    ARTEMIS-2637 Making UDP client discovery resilient
    
    In case there is a hardware, firewal or any other thing making the UDP connection to go deaf
    we will now reopen the connection in an attempt to go over possible issues.
    
    This is also improving locking around DiscoveryGroup initial connection.
---
 .../artemis/core/client/ActiveMQClientLogger.java  |   5 +
 .../core/client/impl/ServerLocatorImpl.java        | 139 +++++++++++++++++----
 .../artemis/core/cluster/DiscoveryGroup.java       |  60 ++++++++-
 .../NettyHAClientTopologyWithDiscoveryTest.java    | 135 ++++++++++++++++++++
 4 files changed, 311 insertions(+), 28 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
index 700b863..9eebed8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
@@ -420,6 +420,11 @@ public interface ActiveMQClientLogger extends BasicLogger {
            format = Message.Format.MESSAGE_FORMAT)
    void unableToCheckEpollAvailabilitynoClass();
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 212077, value = "Timed out waiting to receive initial broadcast from cluster. Retry {0} of {1}",
+           format = Message.Format.MESSAGE_FORMAT)
+   void broadcastTimeout(int retry, int maxretry);
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT)
    void onMessageError(@Cause Throwable e);
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index d8bccd2..f06361a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
 import org.apache.activemq.artemis.api.core.Interceptor;
@@ -63,10 +64,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
 import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.remoting.Connector;
 import org.apache.activemq.artemis.uri.ServerLocatorParser;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
-import org.apache.activemq.artemis.utils.ClassloadingUtil;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.activemq.artemis.utils.*;
 import org.apache.activemq.artemis.utils.actors.Actor;
 import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
 import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores;
@@ -122,6 +120,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
 
    private boolean compressLargeMessage;
 
+   /** This specifies serverLocator.connect was used,
+    *  which means it's a cluster connection.
+    *  We should not use retries */
+   private volatile transient boolean disableDiscoveryRetries = false;
+
    // if the system should shutdown the pool when shutting down
    private transient boolean shutdownPool;
 
@@ -133,6 +136,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
 
    private transient ConnectionLoadBalancingPolicy loadBalancingPolicy;
 
+   private final Object discoveryGroupGuardian = new Object();
+
    // Settable attributes:
 
    private boolean cacheLargeMessagesClient;
@@ -211,6 +216,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
 
    private boolean useTopologyForLoadBalancing;
 
+   /** For tests only */
+   public DiscoveryGroup getDiscoveryGroup() {
+      return discoveryGroup;
+   }
+
    private final Exception traceException = new Exception();
 
    public static synchronized void clearThreadPools() {
@@ -230,7 +240,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
          ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
             @Override
             public ThreadFactory run() {
-               return new ActiveMQThreadFactory("ActiveMQ-client-factory-threads-" + System.identityHashCode(this), true, ClientSessionFactoryImpl.class.getClassLoader());
+               return new ActiveMQThreadFactory("ActiveMQ-client-factory-threads-" + System.identityHashCode(this), true, ServerLocatorImpl.class.getClassLoader());
             }
          });
 
@@ -299,13 +309,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
 
             instantiateLoadBalancingPolicy();
 
-            if (discoveryGroupConfiguration != null) {
-               discoveryGroup = createDiscoveryGroup(nodeID, discoveryGroupConfiguration);
+            startDiscovery();
 
-               discoveryGroup.registerListener(this);
-
-               discoveryGroup.start();
-            }
          } catch (Exception e) {
             state = null;
             throw ActiveMQClientMessageBundle.BUNDLE.failedToInitialiseSessionFactory(e);
@@ -313,6 +318,20 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       }
    }
 
+   private void startDiscovery() throws ActiveMQException {
+      if (discoveryGroupConfiguration != null) {
+         try {
+            discoveryGroup = createDiscoveryGroup(nodeID, discoveryGroupConfiguration);
+
+            discoveryGroup.registerListener(this);
+
+            discoveryGroup.start();
+         } catch (Exception e) {
+            throw new ActiveMQInternalErrorException(e.getMessage(), e);
+         }
+      }
+   }
+
    private static DiscoveryGroup createDiscoveryGroup(String nodeID,
                                                       DiscoveryGroupConfiguration config) throws Exception {
       return new DiscoveryGroup(nodeID, config.getName(), config.getRefreshTimeout(), config.getBroadcastEndpointFactory(), null);
@@ -633,6 +652,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
    }
 
    private ClientSessionFactoryInternal connect(final boolean skipWarnings) throws ActiveMQException {
+      // if we used connect, we should control UDP reconnections at a different path.
+      // and this belongs to a cluster connection, not client
+      disableDiscoveryRetries = true;
       ClientSessionFactoryInternal returnFactory = null;
 
       synchronized (this) {
@@ -752,14 +774,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
 
       flushTopology();
 
-      if (this.getNumInitialConnectors() == 0 && discoveryGroup != null) {
-         // Wait for an initial broadcast to give us at least one node in the cluster
-         long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
-         boolean ok = discoveryGroup.waitForBroadcast(timeout);
-
-         if (!ok) {
-            throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutInInitialBroadcast();
-         }
+      if (discoveryGroupConfiguration != null) {
+         executeDiscovery();
       }
 
       ClientSessionFactoryInternal factory = null;
@@ -826,6 +842,77 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return factory;
    }
 
+   private void executeDiscovery() throws ActiveMQException {
+      boolean discoveryOK = false;
+      boolean retryDiscovery = false;
+      int tryNumber = 0;
+
+      do {
+
+         discoveryOK = checkOnDiscovery();
+
+         retryDiscovery = (initialConnectAttempts > 0 && tryNumber++ < initialConnectAttempts) && !disableDiscoveryRetries;
+
+         if (!discoveryOK) {
+
+            if (retryDiscovery) {
+               ActiveMQClientLogger.LOGGER.broadcastTimeout(tryNumber, initialConnectAttempts);
+            } else {
+               throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutInInitialBroadcast();
+            }
+         }
+      }
+      while (!discoveryOK && retryDiscovery);
+
+      if (!discoveryOK) {
+         // I don't think the code would ever get to this situation, since there's an exception thrown on the previous loop
+         // however I will keep this just in case
+         throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutInInitialBroadcast();
+      }
+
+   }
+
+   private boolean checkOnDiscovery() throws ActiveMQException {
+
+      synchronized (discoveryGroupGuardian) {
+
+         // notice: in case you have many threads waiting to get on checkOnDiscovery, only one will perform the actual discovery
+         //         while subsequent calls will have numberOfInitialConnectors > 0
+         if (this.getNumInitialConnectors() == 0 && discoveryGroupConfiguration != null) {
+            try {
+
+               long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
+               if (!discoveryGroup.waitForBroadcast(timeout)) {
+
+                  if (logger.isDebugEnabled()) {
+                     String threadDump = ThreadDumpUtil.threadDump("Discovery timeout, printing thread dump");
+                     logger.debug(threadDump);
+                  }
+
+                  // if disableDiscoveryRetries = true, it means this is a Bridge or a Cluster Connection Bridge
+                  // which has a different mechanism of retry
+                  // and we should ignore UDP restarts here.
+                  if (!disableDiscoveryRetries) {
+                     if (discoveryGroup != null) {
+                        discoveryGroup.stop();
+                     }
+
+                     logger.debug("Restarting discovery");
+
+                     startDiscovery();
+                  }
+
+                  return false;
+               }
+            } catch (Exception e) {
+               throw new ActiveMQInternalErrorException(e.getMessage(), e);
+            }
+         }
+      }
+
+      return true;
+   }
+
    public void flushTopology() {
       if (updateArrayActor != null) {
          updateArrayActor.flush(10, TimeUnit.SECONDS);
@@ -1682,8 +1769,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
             while (!isClosed()) {
                retryNumber++;
                for (Connector conn : connectors) {
-                  if (logger.isDebugEnabled()) {
-                     logger.debug(this + "::Submitting connect towards " + conn);
+                  if (logger.isTraceEnabled()) {
+                     logger.trace(this + "::Submitting connect towards " + conn);
                   }
 
                   ClientSessionFactory csf = conn.tryConnect();
@@ -1717,8 +1804,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
                         }
                      });
 
-                     if (logger.isDebugEnabled()) {
-                        logger.debug("Returning " + csf +
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Returning " + csf +
                                         " after " +
                                         retryNumber +
                                         " retries on StaticConnector " +
@@ -1740,7 +1827,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
          } catch (RejectedExecutionException e) {
             if (isClosed() || skipWarnings)
                return null;
-            logger.debug("Rejected execution", e);
+            logger.trace("Rejected execution", e);
             throw e;
          } catch (Exception e) {
             if (isClosed() || skipWarnings)
@@ -1809,7 +1896,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
 
          public ClientSessionFactory tryConnect() throws ActiveMQException {
             if (logger.isDebugEnabled()) {
-               logger.debug(this + "::Trying to connect to " + factory);
+               logger.trace(this + "::Trying to connect to " + factory);
             }
             try {
                ClientSessionFactoryInternal factoryToUse = factory;
@@ -1824,7 +1911,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
                }
                return factoryToUse;
             } catch (ActiveMQException e) {
-               logger.debug(this + "::Exception on establish connector initial connection", e);
+               logger.trace(this + "::Exception on establish connector initial connection", e);
                return null;
             }
          }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
index b57042b..86b28d2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
@@ -16,12 +16,15 @@
  */
 package org.apache.activemq.artemis.core.cluster;
 
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@@ -35,6 +38,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationService;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.jboss.logging.Logger;
 
@@ -102,13 +106,22 @@ public final class DiscoveryGroup implements ActiveMQComponent {
          return;
       }
 
+      if (logger.isDebugEnabled()) logger.debug("Starting Discovery Group for " + name);
+
       endpoint.openClient();
 
       started = true;
 
-      thread = new Thread(new DiscoveryRunnable(), "activemq-discovery-group-thread-" + name);
+      ThreadFactory tfactory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
+         @Override
+         public ThreadFactory run() {
+            return new ActiveMQThreadFactory("DiscoveryGroup-" + System.identityHashCode(this), "activemq-discovery-group-thread-" + name, true, DiscoveryGroup.class.getClassLoader());
+         }
+      });
+
+      thread = tfactory.newThread(new DiscoveryRunnable());
 
-      thread.setDaemon(true);
+      if (logger.isDebugEnabled()) logger.debug("Starting daemon thread");
 
       thread.start();
 
@@ -136,6 +149,10 @@ public final class DiscoveryGroup implements ActiveMQComponent {
 
    @Override
    public void stop() {
+
+      if (logger.isDebugEnabled()) {
+         logger.debug("Stopping discovery. There's an exception just as a trace where it happened", new Exception("trace"));
+      }
       synchronized (this) {
          if (!started) {
             return;
@@ -150,6 +167,9 @@ public final class DiscoveryGroup implements ActiveMQComponent {
 
       try {
          endpoint.close(false);
+         if (logger.isDebugEnabled()) {
+            logger.debug("endpoing closed");
+         }
       } catch (Exception e1) {
          ActiveMQClientLogger.LOGGER.errorStoppingDiscoveryBroadcastEndpoint(endpoint, e1);
       }
@@ -167,6 +187,7 @@ public final class DiscoveryGroup implements ActiveMQComponent {
       }
 
       thread = null;
+      received = false;
 
       if (notificationService != null) {
          TypedProperties props = new TypedProperties();
@@ -255,8 +276,16 @@ public final class DiscoveryGroup implements ActiveMQComponent {
                      if (started) {
                         ActiveMQClientLogger.LOGGER.unexpectedNullDataReceived();
                      }
+
+                     if (logger.isDebugEnabled()) {
+                        logger.debug("Received broadcast data as null");
+                     }
                      break;
                   }
+
+                  if (logger.isDebugEnabled()) {
+                     logger.debug("receiving " + data.length);
+                  }
                } catch (Exception e) {
                   if (!started) {
                      return;
@@ -274,6 +303,10 @@ public final class DiscoveryGroup implements ActiveMQComponent {
                checkUniqueID(originatingNodeID, uniqueID);
 
                if (nodeID.equals(originatingNodeID)) {
+
+                  if (logger.isDebugEnabled()) {
+                     logger.debug("ignoring original NodeID" + originatingNodeID + " receivedID = " + nodeID);
+                  }
                   if (checkExpiration()) {
                      callListeners();
                   }
@@ -281,6 +314,10 @@ public final class DiscoveryGroup implements ActiveMQComponent {
                   continue;
                }
 
+               if (logger.isDebugEnabled()) {
+                  logger.debug("Received nodeID " + nodeID);
+               }
+
                int size = buffer.readInt();
 
                boolean changed = false;
@@ -295,6 +332,15 @@ public final class DiscoveryGroup implements ActiveMQComponent {
                   entriesRead[i] = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis());
                }
 
+
+               if (logger.isDebugEnabled()) {
+                  logger.debug("Received " + entriesRead.length + " discovery entry elements");
+                  for (DiscoveryEntry entryDisco : entriesRead) {
+                     logger.debug("" + entryDisco);
+                  }
+               }
+
+
                synchronized (DiscoveryGroup.this) {
                   for (DiscoveryEntry entry : entriesRead) {
                      if (connectors.put(originatingNodeID, entry) == null) {
@@ -303,6 +349,10 @@ public final class DiscoveryGroup implements ActiveMQComponent {
                   }
 
                   changed = changed || checkExpiration();
+
+                  if (logger.isDebugEnabled()) {
+                     logger.debug("changed = " + changed);
+                  }
                }
                //only call the listeners if we have changed
                //also make sure that we aren't stopping to avoid deadlock
@@ -313,12 +363,18 @@ public final class DiscoveryGroup implements ActiveMQComponent {
                         logger.trace(connector);
                      }
                   }
+                  if (logger.isDebugEnabled()) {
+                     logger.debug("Calling listeners");
+                  }
                   callListeners();
                }
 
                synchronized (waitLock) {
                   received = true;
 
+                  if (logger.isDebugEnabled()) {
+                     logger.debug("Calling notifyAll");
+                  }
                   waitLock.notifyAll();
                }
             } catch (Throwable e) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java
index 0066ae1..7458ebf 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java
@@ -16,6 +16,18 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.topology;
 
+import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class NettyHAClientTopologyWithDiscoveryTest extends HAClientTopologyWithDiscoveryTest {
 
    @Override
@@ -23,4 +35,127 @@ public class NettyHAClientTopologyWithDiscoveryTest extends HAClientTopologyWith
       return true;
    }
 
+
+
+   @Test
+   public void testRecoveryBadUDPWithRetry() throws Exception {
+      startServers(0);
+      ServerLocatorImpl serverLocator = (ServerLocatorImpl) createHAServerLocator();
+      serverLocator.setInitialConnectAttempts(10);
+      serverLocator.initialize();
+      serverLocator.getDiscoveryGroup().stop();
+
+
+      ClientSessionFactory factory = serverLocator.createSessionFactory();
+      ClientSession session = factory.createSession();
+      session.close();
+   }
+
+   @Test
+   public void testRecoveryBadUDPWithoutRetry() throws Exception {
+      startServers(0);
+      ServerLocatorImpl serverLocator = (ServerLocatorImpl) createHAServerLocator();
+      serverLocator.setInitialConnectAttempts(0);
+      serverLocator.initialize();
+      serverLocator.getDiscoveryGroup().stop();
+
+
+      boolean failure = false;
+      try {
+         ClientSessionFactory factory = serverLocator.createSessionFactory();
+         ClientSession session = factory.createSession();
+         session.close();
+         factory.close();
+      } catch (Exception e) {
+         e.printStackTrace();
+         failure = true;
+      }
+
+      Assert.assertTrue(failure);
+
+      ClientSessionFactory factory = serverLocator.createSessionFactory();
+      ClientSession session = factory.createSession();
+      session.close();
+      factory.close();
+
+   }
+
+   @Test
+   public void testNoServer() {
+      final ServerLocatorImpl serverLocator = (ServerLocatorImpl)ActiveMQClient.createServerLocatorWithHA(new DiscoveryGroupConfiguration().
+              setBroadcastEndpointFactory(new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress).
+                      setGroupPort(groupPort)).setDiscoveryInitialWaitTimeout(10)).setInitialConnectAttempts(0);
+      addServerLocator(serverLocator);
+      serverLocator.setInitialConnectAttempts(3);
+
+      try {
+         serverLocator.createSessionFactory();
+         Assert.fail("Exception was expected");
+      } catch (Exception e) {
+      }
+   }
+
+
+   @Test
+   public void testConnectWithMultiThread() throws Exception {
+      final AtomicInteger errors = new AtomicInteger(0);
+      int NUMBER_OF_THREADS = 100;
+      final CyclicBarrier barrier = new CyclicBarrier(NUMBER_OF_THREADS);
+      final ServerLocatorImpl serverLocator = (ServerLocatorImpl)ActiveMQClient.createServerLocatorWithHA(new DiscoveryGroupConfiguration().
+              setBroadcastEndpointFactory(new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress).
+                      setGroupPort(groupPort)).setDiscoveryInitialWaitTimeout(1000)).setInitialConnectAttempts(0);
+      serverLocator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
+      addServerLocator(serverLocator);
+
+      startServers(0);
+
+      try {
+
+         serverLocator.setInitialConnectAttempts(0);
+
+         Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+               try {
+                  barrier.await();
+
+                  ClientSessionFactory factory = serverLocator.createSessionFactory();
+                  ClientSession session = factory.createSession();
+                  session.close();
+                  factory.close();
+
+               } catch (Exception e) {
+                  e.printStackTrace();
+                  errors.incrementAndGet();
+               }
+            }
+         };
+
+
+         Thread[] threads = new Thread[NUMBER_OF_THREADS];
+
+         for (int i = 0; i < threads.length; i++) {
+            threads[i] = new Thread(runnable);
+            threads[i].start();
+         }
+
+         for (Thread t : threads) {
+            t.join();
+         }
+
+
+         Assert.assertEquals(0, errors.get());
+
+         serverLocator.close();
+
+         serverLocator.getDiscoveryGroup().stop();
+      } finally {
+         stopServers(0);
+      }
+   }
+
+
+
+
+
 }