You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/17 08:54:45 UTC
[36/41] ignite git commit: IGNITE-3153 TcpDiscoveryZookeeperIpFinder
doesn't properly handle client reconnections
IGNITE-3153 TcpDiscoveryZookeeperIpFinder doesn't properly handle client reconnections
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7da0a02a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7da0a02a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7da0a02a
Branch: refs/heads/ignite-3331
Commit: 7da0a02a8aface7db7ac2485f22572f85ca67674
Parents: a4bbf0b
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu Jun 16 10:49:48 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Jun 16 10:49:48 2016 +0300
----------------------------------------------------------------------
.../zk/TcpDiscoveryZookeeperIpFinder.java | 65 +++++++++++---------
.../tcp/ipfinder/zk/ZookeeperIpFinderTest.java | 20 ++++--
2 files changed, 51 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7da0a02a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
index bee4dab..238987b 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
@@ -39,6 +39,7 @@ import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter;
@@ -68,11 +69,8 @@ import org.codehaus.jackson.map.annotate.JsonRootName;
*
* @see <a href="http://zookeeper.apache.org">Apache ZooKeeper</a>
* @see <a href="http://curator.apache.org">Apache Curator</a>
- *
- * @author Raul Kripalani
*/
public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
-
/** System property name to provide the ZK Connection String. */
public static final String PROP_ZK_CONNECTION_STRING = "IGNITE_ZK_CONNECTION_STRING";
@@ -89,6 +87,10 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
@GridToStringExclude
private final AtomicBoolean initGuard = new AtomicBoolean();
+ /** Init guard. */
+ @GridToStringExclude
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
/** Logger. */
@LoggerResource
private IgniteLogger log;
@@ -140,9 +142,11 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
curator = CuratorFrameworkFactory.newClient(zkConnectionString, retryPolicy);
}
- if (curator.getState() != CuratorFrameworkState.STARTED)
+ if (curator.getState() == CuratorFrameworkState.LATENT)
curator.start();
+ A.ensure(curator.getState() == CuratorFrameworkState.STARTED, "CuratorFramework can't be started.");
+
discovery = ServiceDiscoveryBuilder.builder(IgniteInstanceDetails.class)
.client(curator)
.basePath(basePath)
@@ -152,8 +156,11 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
/** {@inheritDoc} */
@Override public void onSpiContextDestroyed() {
- if (!initGuard.compareAndSet(true, false))
+ if (!closeGuard.compareAndSet(false, true)) {
+ U.warn(log, "ZooKeeper IP Finder can't be closed more than once.");
+
return;
+ }
log.info("Destroying ZooKeeper IP Finder.");
@@ -175,7 +182,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
try {
serviceInstances = discovery.queryForInstances(serviceName);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.warning("Error while getting registered addresses from ZooKeeper IP Finder.", e);
return Collections.emptyList();
}
@@ -214,17 +222,18 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
try {
ServiceInstance<IgniteInstanceDetails> si = ServiceInstance.<IgniteInstanceDetails>builder()
- .name(serviceName)
- .uriSpec(URI_SPEC)
- .address(addr.getAddress().getHostAddress())
- .port(addr.getPort())
- .build();
+ .name(serviceName)
+ .uriSpec(URI_SPEC)
+ .address(addr.getAddress().getHostAddress())
+ .port(addr.getPort())
+ .build();
ourInstances.put(addr, si);
discovery.registerService(si);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.warning(String.format("Error while registering an address from ZooKeeper IP Finder " +
"[message=%s,addresses=%s]", e.getMessage(), addr), e);
}
@@ -245,13 +254,14 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
ServiceInstance<IgniteInstanceDetails> si = ourInstances.get(addr);
if (si == null) {
log.warning("Asked to unregister address from ZooKeeper IP Finder, but no match was found in local " +
- "instance map for: " + addrs);
+ "instance map for: " + addrs);
continue;
}
try {
discovery.unregisterService(si);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.warning("Error while unregistering an address from ZooKeeper IP Finder: " + addr, e);
}
}
@@ -272,7 +282,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
}
/**
- * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework} is not being set explicitly.
+ * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework} is not being set
+ * explicitly.
*/
public void setZkConnectionString(String zkConnectionString) {
this.zkConnectionString = zkConnectionString;
@@ -286,8 +297,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
}
/**
- * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being injected, or if
- * using a system property.
+ * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being injected, or if using a
+ * system property.
*/
public void setRetryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
@@ -315,9 +326,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
}
/**
- * @param serviceName Service name to use, as defined by Curator's {#link ServiceDiscovery} recipe. In physical
- * ZK terms, it represents the node under {@link #basePath}, under which services will be
- * registered.
+ * @param serviceName Service name to use, as defined by Curator's {#link ServiceDiscovery} recipe. In physical ZK
+ * terms, it represents the node under {@link #basePath}, under which services will be registered.
*/
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
@@ -331,20 +341,19 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
}
/**
- * @param allowDuplicateRegistrations Whether to register each node only once, or if duplicate registrations
- * are allowed. Nodes will attempt to register themselves, plus those they
- * know about. By default, duplicate registrations are not allowed, but you
- * might want to set this property to <tt>true</tt> if you have multiple
- * network interfaces or if you are facing troubles.
+ * @param allowDuplicateRegistrations Whether to register each node only once, or if duplicate registrations are
+ * allowed. Nodes will attempt to register themselves, plus those they know about. By default, duplicate
+ * registrations are not allowed, but you might want to set this property to <tt>true</tt> if you have multiple
+ * network interfaces or if you are facing troubles.
*/
public void setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) {
this.allowDuplicateRegistrations = allowDuplicateRegistrations;
}
/**
- * Empty DTO for storing service instances details. Currently acting as a placeholder because Curator requires
- * a payload type when registering and discovering nodes. May be enhanced in the future with further information
- * to assist discovery.
+ * Empty DTO for storing service instances details. Currently acting as a placeholder because Curator requires a
+ * payload type when registering and discovering nodes. May be enhanced in the future with further information to
+ * assist discovery.
*
* @author Raul Kripalani
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/7da0a02a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
index ec868ea..42f7a1d 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
@@ -32,8 +32,10 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
/**
@@ -42,7 +44,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
* @author Raul Kripalani
*/
public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
-
/** ZK Cluster size. */
private static final int ZK_CLUSTER_SIZE = 3;
@@ -121,9 +122,9 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
// first node => configure with zkUrl; second node => configure with CuratorFramework; third and subsequent
// shall be configured through system property
- if (gridName.equals(getTestGridName(0))) {
+ if (gridName.equals(getTestGridName(0)))
zkIpFinder.setZkConnectionString(zkCluster.getConnectString());
- }
+
else if (gridName.equals(getTestGridName(1))) {
zkIpFinder.setCurator(CuratorFrameworkFactory.newClient(zkCluster.getConnectString(),
new ExponentialBackoffRetry(100, 5)));
@@ -361,10 +362,17 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
// stop all grids
stopAllGrids();
- Thread.sleep(2000);
- // check that all nodes are gone in ZK
- assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ return 0 == zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size();
+ }
+ catch (Exception e) {
+ return false;
+ }
+ }
+ }, 20000));
}
/**