You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/20 12:53:16 UTC

[36/50] ignite git commit: IGNITE-3153 TcpDiscoveryZookeeperIpFinder doesn't properly handle client reconnections

IGNITE-3153 TcpDiscoveryZookeeperIpFinder doesn't properly handle client reconnections


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7da0a02a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7da0a02a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7da0a02a

Branch: refs/heads/ignite-3341
Commit: 7da0a02a8aface7db7ac2485f22572f85ca67674
Parents: a4bbf0b
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu Jun 16 10:49:48 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Jun 16 10:49:48 2016 +0300

----------------------------------------------------------------------
 .../zk/TcpDiscoveryZookeeperIpFinder.java       | 65 +++++++++++---------
 .../tcp/ipfinder/zk/ZookeeperIpFinderTest.java  | 20 ++++--
 2 files changed, 51 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7da0a02a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
index bee4dab..238987b 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
@@ -39,6 +39,7 @@ import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter;
@@ -68,11 +69,8 @@ import org.codehaus.jackson.map.annotate.JsonRootName;
  *
  * @see <a href="http://zookeeper.apache.org">Apache ZooKeeper</a>
  * @see <a href="http://curator.apache.org">Apache Curator</a>
- *
- * @author Raul Kripalani
  */
 public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
-
     /** System property name to provide the ZK Connection String. */
     public static final String PROP_ZK_CONNECTION_STRING = "IGNITE_ZK_CONNECTION_STRING";
 
@@ -89,6 +87,10 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
     @GridToStringExclude
     private final AtomicBoolean initGuard = new AtomicBoolean();
 
+    /** Close guard. */
+    @GridToStringExclude
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
     /** Logger. */
     @LoggerResource
     private IgniteLogger log;
@@ -140,9 +142,11 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
             curator = CuratorFrameworkFactory.newClient(zkConnectionString, retryPolicy);
         }
 
-        if (curator.getState() != CuratorFrameworkState.STARTED)
+        if (curator.getState() == CuratorFrameworkState.LATENT)
             curator.start();
 
+        A.ensure(curator.getState() == CuratorFrameworkState.STARTED, "CuratorFramework can't be started.");
+
         discovery = ServiceDiscoveryBuilder.builder(IgniteInstanceDetails.class)
             .client(curator)
             .basePath(basePath)
@@ -152,8 +156,11 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
 
     /** {@inheritDoc} */
     @Override public void onSpiContextDestroyed() {
-        if (!initGuard.compareAndSet(true, false))
+        if (!closeGuard.compareAndSet(false, true)) {
+            U.warn(log, "ZooKeeper IP Finder can't be closed more than once.");
+
             return;
+        }
 
         log.info("Destroying ZooKeeper IP Finder.");
 
@@ -175,7 +182,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
 
         try {
             serviceInstances = discovery.queryForInstances(serviceName);
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             log.warning("Error while getting registered addresses from ZooKeeper IP Finder.", e);
             return Collections.emptyList();
         }
@@ -214,17 +222,18 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
 
             try {
                 ServiceInstance<IgniteInstanceDetails> si = ServiceInstance.<IgniteInstanceDetails>builder()
-                        .name(serviceName)
-                        .uriSpec(URI_SPEC)
-                        .address(addr.getAddress().getHostAddress())
-                        .port(addr.getPort())
-                        .build();
+                    .name(serviceName)
+                    .uriSpec(URI_SPEC)
+                    .address(addr.getAddress().getHostAddress())
+                    .port(addr.getPort())
+                    .build();
 
                 ourInstances.put(addr, si);
 
                 discovery.registerService(si);
 
-            } catch (Exception e) {
+            }
+            catch (Exception e) {
                 log.warning(String.format("Error while registering an address from ZooKeeper IP Finder " +
                     "[message=%s,addresses=%s]", e.getMessage(), addr), e);
             }
@@ -245,13 +254,14 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
             ServiceInstance<IgniteInstanceDetails> si = ourInstances.get(addr);
             if (si == null) {
                 log.warning("Asked to unregister address from ZooKeeper IP Finder, but no match was found in local " +
-                        "instance map for: " + addrs);
+                    "instance map for: " + addrs);
                 continue;
             }
 
             try {
                 discovery.unregisterService(si);
-            } catch (Exception e) {
+            }
+            catch (Exception e) {
                 log.warning("Error while unregistering an address from ZooKeeper IP Finder: " + addr, e);
             }
         }
@@ -272,7 +282,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
     }
 
     /**
-     * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework} is not being set explicitly.
+     * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework} is not being set
+     * explicitly.
      */
     public void setZkConnectionString(String zkConnectionString) {
         this.zkConnectionString = zkConnectionString;
@@ -286,8 +297,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
     }
 
     /**
-     * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being injected, or if
-     *                    using a system property.
+     * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being injected, or if using a
+     * system property.
      */
     public void setRetryPolicy(RetryPolicy retryPolicy) {
         this.retryPolicy = retryPolicy;
@@ -315,9 +326,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
     }
 
     /**
-     * @param serviceName Service name to use, as defined by Curator's {#link ServiceDiscovery} recipe. In physical
-     *                    ZK terms, it represents the node under {@link #basePath}, under which services will be
-     *                    registered.
+     * @param serviceName Service name to use, as defined by Curator's {@link ServiceDiscovery} recipe. In physical ZK
+     * terms, it represents the node under {@link #basePath}, under which services will be registered.
      */
     public void setServiceName(String serviceName) {
         this.serviceName = serviceName;
@@ -331,20 +341,19 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
     }
 
     /**
-     * @param allowDuplicateRegistrations Whether to register each node only once, or if duplicate registrations
-     *                                    are allowed. Nodes will attempt to register themselves, plus those they
-     *                                    know about. By default, duplicate registrations are not allowed, but you
-     *                                    might want to set this property to <tt>true</tt> if you have multiple
-     *                                    network interfaces or if you are facing troubles.
+     * @param allowDuplicateRegistrations Whether to register each node only once, or if duplicate registrations are
+     * allowed. Nodes will attempt to register themselves, plus those they know about. By default, duplicate
+     * registrations are not allowed, but you might want to set this property to <tt>true</tt> if you have multiple
+     * network interfaces or if you are facing troubles.
      */
     public void setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) {
         this.allowDuplicateRegistrations = allowDuplicateRegistrations;
     }
 
     /**
-     * Empty DTO for storing service instances details. Currently acting as a placeholder because Curator requires
-     * a payload type when registering and discovering nodes. May be enhanced in the future with further information
-     * to assist discovery.
+     * Empty DTO for storing service instances details. Currently acting as a placeholder because Curator requires a
+     * payload type when registering and discovering nodes. May be enhanced in the future with further information to
+     * assist discovery.
      *
      * @author Raul Kripalani
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7da0a02a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
index ec868ea..42f7a1d 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
@@ -32,8 +32,10 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 /**
@@ -42,7 +44,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
  * @author Raul Kripalani
  */
 public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
-
     /** ZK Cluster size. */
     private static final int ZK_CLUSTER_SIZE = 3;
 
@@ -121,9 +122,9 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
 
         // first node => configure with zkUrl; second node => configure with CuratorFramework; third and subsequent
         // shall be configured through system property
-        if (gridName.equals(getTestGridName(0))) {
+        if (gridName.equals(getTestGridName(0)))
             zkIpFinder.setZkConnectionString(zkCluster.getConnectString());
-        }
+
         else if (gridName.equals(getTestGridName(1))) {
             zkIpFinder.setCurator(CuratorFrameworkFactory.newClient(zkCluster.getConnectString(),
                 new ExponentialBackoffRetry(100, 5)));
@@ -361,10 +362,17 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
 
         // stop all grids
         stopAllGrids();
-        Thread.sleep(2000);
 
-        // check that all nodes are gone in ZK
-        assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    return 0 == zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size();
+                }
+                catch (Exception e) {
+                    return false;
+                }
+            }
+        }, 20000));
     }
 
     /**