You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/13 09:33:19 UTC
[07/54] [abbrv] ignite git commit: IGNITE-7222 Added ZooKeeper discovery SPI
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index 54b3a78..e89a4c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -17,11 +17,21 @@
package org.apache.ignite.spi.communication.tcp;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.GridAbstractCommunicationSelfTest;
@@ -85,6 +95,67 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
}
}
+ /**
+ * Sequentially, for 100 iterations, asks every {@link TcpCommunicationSpi} in {@code spis} to check
+ * connections to a copy of {@code nodes} and asserts that the bit at each checked node's index is set
+ * in the {@link BitSet} produced by the returned future.
+ */
+ public void testCheckConnection1() {
+ for (int i = 0; i < 100; i++) {
+ for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) {
+ TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue();
+
+ List<ClusterNode> checkNodes = new ArrayList<>(nodes);
+
+ assert checkNodes.size() > 1; // Plain Java assert: only evaluated when assertions are enabled.
+
+ IgniteFuture<BitSet> fut = spi.checkConnection(checkNodes);
+
+ BitSet res = fut.get();
+
+ for (int n = 0; n < checkNodes.size(); n++)
+ assertTrue(res.get(n)); // Presumably bit n set means node n is reachable - TODO confirm.
+ }
+ }
+ }
+
+ /**
+ * Concurrent variant of the connection check: starts one asynchronous task per SPI in {@code spis},
+ * releases all tasks together through a {@link CyclicBarrier}, and has each task call
+ * {@code checkConnection} 100 times for a copy of {@code nodes}, asserting that the bit at every
+ * checked node's index is set in the result. The test then waits for all tasks to finish.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCheckConnection2() throws Exception {
+ final int THREADS = spis.size();
+
+ final CyclicBarrier b = new CyclicBarrier(THREADS);
+
+ // Parameterized instead of raw types so the compiler checks what is stored and awaited.
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) {
+ final TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue();
+
+ futs.add(GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ List<ClusterNode> checkNodes = new ArrayList<>(nodes);
+
+ assert checkNodes.size() > 1;
+
+ // Wait until every task is ready so that the checks actually overlap.
+ b.await();
+
+ for (int i = 0; i < 100; i++) {
+ IgniteFuture<BitSet> fut = spi.checkConnection(checkNodes);
+
+ BitSet res = fut.get();
+
+ for (int n = 0; n < checkNodes.size(); n++)
+ assertTrue(res.get(n));
+ }
+
+ return null;
+ }
+ }));
+ }
+
+ // Propagate any failure raised inside the asynchronous tasks.
+ for (IgniteInternalFuture<?> f : futs)
+ f.get();
+ }
+
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
index 54b48e5..9a45d2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
@@ -206,6 +206,11 @@ public class FilterDataForClientNodeDiscoveryTest extends GridCommonAbstractTest
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false; // Test stub: unconditionally reports false.
+ }
+
+ /** {@inheritDoc} */
@Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
DiscoCache discoCache) {
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index ca05288..51dcb23 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -613,6 +613,16 @@ public class GridSpiTestContext implements IgniteSpiContext {
return Collections.emptyMap();
}
+ /** {@inheritDoc} */
+ @Override public boolean communicationFailureResolveSupported() {
+ return false; // This test context always reports the feature as unsupported.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) {
+ throw new UnsupportedOperationException(); // Not implemented by this test context; always throws.
+ }
+
/**
* @param cacheName Cache name.
* @return Map representing cache.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java b/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java
index 4507572..e2594ca 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java
@@ -83,6 +83,15 @@ public final class GridTestProperties {
/** "True value" enables {@link BinaryBasicNameMapper} in {@link BinaryTypeConfiguration#getNameMapper()} */
public static final String BINARY_MARSHALLER_USE_SIMPLE_NAME_MAPPER = "binary.marshaller.use.simple.name.mapper";
+ /**
+ * Name of the system property whose value is the name of a class providing a static method
+ * {@code preprocessConfiguration(IgniteConfiguration cfg)} that is used to
+ * alter {@link org.apache.ignite.configuration.IgniteConfiguration} before a node is started.
+ * <p>
+ * Note: this pre-processor is invoked only if a test starts the node using one of GridAbstractTest's
+ * startGrid methods.
+ */
+ public static final String IGNITE_CFG_PREPROCESSOR_CLS = "ignite.cfg.preprocessor.class";
+
/** */
static {
// Initialize IGNITE_HOME system property.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index c3b262c..f5784eb 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -46,6 +46,7 @@ import junit.framework.TestCase;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
@@ -84,6 +85,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerContextTestImpl;
@@ -92,6 +94,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
@@ -124,6 +127,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.GridKernalState.DISCONNECTED;
import static org.apache.ignite.testframework.config.GridTestProperties.BINARY_MARSHALLER_USE_SIMPLE_NAME_MAPPER;
+import static org.apache.ignite.testframework.config.GridTestProperties.IGNITE_CFG_PREPROCESSOR_CLS;
/**
* Common abstract test for Ignite tests.
@@ -203,13 +207,15 @@ public abstract class GridAbstractTest extends TestCase {
if (BINARY_MARSHALLER)
GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName());
- Thread timer = new Thread(new GridTestClockTimer(), "ignite-clock-for-tests");
+ if (GridTestClockTimer.startTestTimer()) {
+ Thread timer = new Thread(new GridTestClockTimer(), "ignite-clock-for-tests");
- timer.setDaemon(true);
+ timer.setDaemon(true);
- timer.setPriority(10);
+ timer.setPriority(10);
- timer.start();
+ timer.start();
+ }
}
/** */
@@ -838,6 +844,7 @@ public abstract class GridAbstractTest extends TestCase {
protected Ignite startGrid(String igniteInstanceName, GridSpringResourceContext ctx) throws Exception {
return startGrid(igniteInstanceName, optimize(getConfiguration(igniteInstanceName)), ctx);
}
+
/**
* Starts new grid with given name.
*
@@ -852,12 +859,33 @@ public abstract class GridAbstractTest extends TestCase {
startingIgniteInstanceName.set(igniteInstanceName);
try {
+ String cfgProcClsName = System.getProperty(IGNITE_CFG_PREPROCESSOR_CLS);
+
+ if (cfgProcClsName != null) {
+ try {
+ Class<?> cfgProc = Class.forName(cfgProcClsName);
+
+ Method method = cfgProc.getMethod("preprocessConfiguration", IgniteConfiguration.class);
+
+ if (!Modifier.isStatic(method.getModifiers()))
+ throw new Exception("Non-static pre-processor method in pre-processor class: " + cfgProcClsName);
+
+ method.invoke(null, cfg);
+ }
+ catch (Exception e) {
+ log.error("Failed to pre-process IgniteConfiguration using pre-processor class: " + cfgProcClsName);
+
+ throw new IgniteException(e);
+ }
+ }
+
Ignite node = IgnitionEx.start(cfg, ctx);
IgniteConfiguration nodeCfg = node.configuration();
log.info("Node started with the following configuration [id=" + node.cluster().localNode().id()
+ ", marshaller=" + nodeCfg.getMarshaller()
+ + ", discovery=" + nodeCfg.getDiscoverySpi()
+ ", binaryCfg=" + nodeCfg.getBinaryConfiguration()
+ ", lateAff=" + nodeCfg.isLateAffinityAssignment() + "]");
@@ -967,6 +995,26 @@ public abstract class GridAbstractTest extends TestCase {
if (cfg == null)
cfg = optimize(getConfiguration(igniteInstanceName));
+ if (locNode != null) {
+ DiscoverySpi discoverySpi = locNode.configuration().getDiscoverySpi();
+
+ if (discoverySpi != null && !(discoverySpi instanceof TcpDiscoverySpi)) {
+ try {
+ // Clone added to support ZookeeperDiscoverySpi.
+ Method m = discoverySpi.getClass().getDeclaredMethod("cloneSpiConfiguration");
+
+ m.setAccessible(true);
+
+ cfg.setDiscoverySpi((DiscoverySpi) m.invoke(discoverySpi));
+
+ resetDiscovery = false;
+ }
+ catch (NoSuchMethodException e) {
+ // Ignore.
+ }
+ }
+ }
+
return new IgniteProcessProxy(cfg, log, locNode, resetDiscovery);
}
@@ -1075,7 +1123,9 @@ public abstract class GridAbstractTest extends TestCase {
for (Ignite g : srvs)
stopGrid(g.name(), cancel, false);
- assert G.allGrids().isEmpty();
+ List<Ignite> nodes = G.allGrids();
+
+ assert nodes.isEmpty() : nodes;
}
finally {
IgniteProcessProxy.killAll(); // In multi-JVM case.
@@ -1177,6 +1227,14 @@ public abstract class GridAbstractTest extends TestCase {
}
/**
+ * Returns the ID of the local cluster node of the instance obtained via {@code ignite(nodeIdx)}.
+ *
+ * @param nodeIdx Node index.
+ * @return Node ID.
+ */
+ protected final UUID nodeId(int nodeIdx) {
+ return ignite(nodeIdx).cluster().localNode().id();
+ }
+
+ /**
* Gets grid for given test.
*
* @return Grid for given test.
@@ -1217,7 +1275,11 @@ public abstract class GridAbstractTest extends TestCase {
* @throws Exception If failed.
*/
protected Ignite startGrid(String igniteInstanceName, String springCfgPath) throws Exception {
- return startGrid(igniteInstanceName, loadConfiguration(springCfgPath));
+ IgniteConfiguration cfg = loadConfiguration(springCfgPath);
+
+ cfg.setGridLogger(getTestResources().getLogger());
+
+ return startGrid(igniteInstanceName, cfg);
}
/**
@@ -2142,6 +2204,50 @@ public abstract class GridAbstractTest extends TestCase {
}
}
}
+ /**
+ * Polls, via {@code GridTestUtils.waitForCondition}, until all of the following hold: the number of
+ * instances returned by {@code G.allGrids()} equals {@code expSize}, no instance has an unfinished client
+ * reconnect future, and every instance sees exactly {@code expSize} nodes in its cluster. An instance
+ * that throws {@link IgniteClientDisconnectedException} is treated as not ready yet. The test fails if
+ * {@code waitForCondition} returns {@code false}.
+ *
+ * @param expSize Expected nodes number.
+ * @throws Exception If failed.
+ */
+ protected void waitForTopology(final int expSize) throws Exception {
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ List<Ignite> nodes = G.allGrids();
+
+ if (nodes.size() != expSize) {
+ info("Wait all nodes [size=" + nodes.size() + ", exp=" + expSize + ']');
+
+ return false;
+ }
+
+ for (Ignite node: nodes) {
+ try {
+ IgniteFuture<?> reconnectFut = node.cluster().clientReconnectFuture();
+
+ if (reconnectFut != null && !reconnectFut.isDone()) {
+ info("Wait for size on node, reconnect is in progress [node=" + node.name() + ']');
+
+ return false;
+ }
+
+ int sizeOnNode = node.cluster().nodes().size();
+
+ if (sizeOnNode != expSize) {
+ info("Wait for size on node [node=" + node.name() + ", size=" + sizeOnNode + ", exp=" + expSize + ']');
+
+ return false;
+ }
+ }
+ catch (IgniteClientDisconnectedException e) {
+ info("Wait for size on node, node disconnected [node=" + node.name() + ']');
+
+ return false;
+ }
+ }
+
+ return true;
+ }
+ }, 30_000)); // Presumably a timeout in milliseconds (30 seconds) - TODO confirm against waitForCondition.
+ }
/**
* @param millis Time to sleep.
@@ -2172,6 +2278,17 @@ public abstract class GridAbstractTest extends TestCase {
}
/**
+ * Inspects the discovery SPI configured on the first instance returned by {@code G.allGrids()};
+ * only that first instance is checked. Fails the test if no instances are started.
+ *
+ * @return {@code True} if nodes use {@link TcpDiscoverySpi}.
+ */
+ protected static boolean tcpDiscovery() {
+ List<Ignite> nodes = G.allGrids();
+
+ assertFalse("There are no nodes", nodes.isEmpty());
+
+ return nodes.get(0).configuration().getDiscoverySpi() instanceof TcpDiscoverySpi;
+ }
+
+ /**
*
*/
private static interface WriteReplaceOwner {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
index d7be576..2b3a19c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
@@ -162,6 +162,8 @@ public class IgniteNodeRunner {
cfg.setDiscoverySpi(disco);
}
+ X.println("Configured discovery: " + cfg.getDiscoverySpi().getClass().getName());
+
return cfg;
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 55fab8d..14eb296 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
import org.apache.ignite.internal.ClusterNodeMetricsSelfTest;
+import org.apache.ignite.internal.ClusterNodeMetricsUpdateTest;
import org.apache.ignite.internal.GridAffinityNoCacheSelfTest;
import org.apache.ignite.internal.GridAffinitySelfTest;
import org.apache.ignite.internal.GridAlwaysFailoverSpiFailSelfTest;
@@ -122,6 +123,7 @@ public class IgniteComputeGridTestSuite {
suite.addTestSuite(GridAlwaysFailoverSpiFailSelfTest.class);
suite.addTestSuite(GridTaskInstanceExecutionSelfTest.class);
suite.addTestSuite(ClusterNodeMetricsSelfTest.class);
+ suite.addTestSuite(ClusterNodeMetricsUpdateTest.class);
suite.addTestSuite(GridNonHistoryMetricsSelfTest.class);
suite.addTestSuite(GridCancelledJobsMetricsSelfTest.class);
suite.addTestSuite(GridCollisionJobsContextSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java
index b9ef1e4..e26b211 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java
@@ -117,7 +117,7 @@ public class IgniteCacheDistributedQueryCancelSelfTest extends GridCommonAbstrac
}
for (Ignite g : G.allGrids())
- if (!g.configuration().getDiscoverySpi().isClientMode())
+ if (!g.configuration().isClientMode())
stopGrid(g.name(), true);
}
}, 1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java
index 97720d5..bd3b093 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java
@@ -89,11 +89,14 @@ public abstract class DynamicIndexAbstractBasicSelfTest extends DynamicIndexAbst
* @param mode Mode.
* @param atomicityMode Atomicity mode.
* @param near Near flag.
+ * @throws Exception If failed.
*/
private void initialize(CacheMode mode, CacheAtomicityMode atomicityMode, boolean near)
- throws IgniteCheckedException {
+ throws Exception {
createSqlCache(node(), cacheConfiguration(mode, atomicityMode, near));
+ awaitPartitionMapExchange();
+
grid(IDX_CLI_NEAR_ONLY).getOrCreateNearCache(CACHE_NAME, new NearCacheConfiguration<>());
assertNoIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java
index a181068..5cad167 100644
--- a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java
+++ b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java
@@ -44,13 +44,7 @@ public class GridJtaTransactionManagerSelfTest extends GridCommonAbstractTest {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName).
setCacheConfiguration(defaultCacheConfiguration().setCacheMode(PARTITIONED));
- cfg.getTransactionConfiguration().setTxManagerFactory(new Factory<TransactionManager>() {
- private static final long serialVersionUID = 0L;
-
- @Override public TransactionManager create() {
- return jotm.getTransactionManager();
- }
- });
+ cfg.getTransactionConfiguration().setTxManagerFactory(new TestTxManagerFactory());
return cfg;
}
@@ -205,4 +199,17 @@ public class GridJtaTransactionManagerSelfTest extends GridCommonAbstractTest {
cache.removeAll();
}
}
+
+ /**
+ * Named factory supplying the transaction manager obtained from {@code jotm.getTransactionManager()};
+ * it replaces the anonymous {@code Factory} previously passed to {@code setTxManagerFactory}.
+ */
+ static class TestTxManagerFactory implements Factory<TransactionManager> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public TransactionManager create() {
+ return jotm.getTransactionManager();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/jta/GridPartitionedCacheJtaFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/jta/GridPartitionedCacheJtaFactorySelfTest.java b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/jta/GridPartitionedCacheJtaFactorySelfTest.java
index f6fd5c7..14b7fae 100644
--- a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/jta/GridPartitionedCacheJtaFactorySelfTest.java
+++ b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/jta/GridPartitionedCacheJtaFactorySelfTest.java
@@ -30,12 +30,19 @@ public class GridPartitionedCacheJtaFactorySelfTest extends AbstractCacheJtaSelf
@Override protected void configureJta(IgniteConfiguration cfg) {
TransactionConfiguration txCfg = cfg.getTransactionConfiguration();
- txCfg.setTxManagerFactory(new Factory<TransactionManager>() {
- private static final long serialVersionUID = 0L;
+ txCfg.setTxManagerFactory(new TestTxManagerFactory());
+ }
+
+ /**
+ * Named factory supplying the transaction manager obtained from {@code jotm.getTransactionManager()}.
+ */
+ static class TestTxManagerFactory implements Factory<TransactionManager> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
- @Override public TransactionManager create() {
- return jotm.getTransactionManager();
- }
- });
+ /** {@inheritDoc} */
+ @Override public TransactionManager create() {
+ return jotm.getTransactionManager();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index fce47a6..d87ea0a 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.query.QueryCursorEx
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata
import org.apache.ignite.lang.IgniteUuid
import org.apache.ignite.spark.impl._
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode
import org.apache.spark._
import org.apache.spark.rdd.RDD
@@ -91,8 +92,14 @@ class IgniteRDD[K, V] (
override protected[spark] def getPreferredLocations(split: Partition): Seq[String] = {
ensureCache()
- ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)
+ if (ic.ignite().configuration().getDiscoverySpi().isInstanceOf[TcpDiscoverySpi]) {
+ ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)
.map(_.asInstanceOf[TcpDiscoveryNode].socketAddresses()).flatten.map(_.getHostName).toList
+ }
+ else {
+ ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)
+ .flatten(_.hostNames).toSeq
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java b/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java
index 75128fc..b453858 100644
--- a/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java
@@ -998,7 +998,8 @@ public class GridFactorySelfTest extends GridCommonAbstractTest {
startGrid("1", c);
- assert ((TcpDiscoverySpi)c.getDiscoverySpi()).started();
+ if (tcpDiscovery())
+ assert ((TcpDiscoverySpi)c.getDiscoverySpi()).started();
try {
startGrid("2", c);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/spring/src/test/java/org/apache/ignite/p2p/GridP2PUserVersionChangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/p2p/GridP2PUserVersionChangeSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/p2p/GridP2PUserVersionChangeSelfTest.java
index b861e19..46da3cc 100644
--- a/modules/spring/src/test/java/org/apache/ignite/p2p/GridP2PUserVersionChangeSelfTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/p2p/GridP2PUserVersionChangeSelfTest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testsuites.IgniteIgnore;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_TASK_UNDEPLOYED;
@@ -255,12 +256,12 @@ public class GridP2PUserVersionChangeSelfTest extends GridCommonAbstractTest {
ignite2.events().localListen(new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
- if (evt.type() == EVT_NODE_LEFT)
+ if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED)
discoLatch.countDown();
return true;
}
- }, EVT_NODE_LEFT);
+ }, EVT_NODE_LEFT, EVT_NODE_FAILED);
Integer res1 = (Integer)ignite1.compute().execute(task1, ignite2.cluster().localNode().id());
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/yardstick/pom-standalone.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/pom-standalone.xml b/modules/yardstick/pom-standalone.xml
index 577a95e..6905d94 100644
--- a/modules/yardstick/pom-standalone.xml
+++ b/modules/yardstick/pom-standalone.xml
@@ -54,6 +54,12 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-zookeeper</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-log4j</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/yardstick/pom.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/pom.xml b/modules/yardstick/pom.xml
index 8cad24b..9923bb7 100644
--- a/modules/yardstick/pom.xml
+++ b/modules/yardstick/pom.xml
@@ -55,6 +55,12 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-zookeeper</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-log4j</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/modules/zookeeper/pom.xml b/modules/zookeeper/pom.xml
index c3c3679..2d47ece 100644
--- a/modules/zookeeper/pom.xml
+++ b/modules/zookeeper/pom.xml
@@ -49,6 +49,12 @@
<dependency>
<groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>${curator.version}</version>
</dependency>
@@ -109,6 +115,13 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-indexing</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-log4j</artifactId>
<version>${project.version}</version>
<scope>test</scope>
@@ -122,16 +135,43 @@
</dependency>
<dependency>
+ <groupId>com.thoughtworks.xstream</groupId>
+ <artifactId>xstream</artifactId>
+ <version>1.4.8</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-indexing</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
<!-- Generate the OSGi MANIFEST.MF for this bundle. -->
<plugin>
<groupId>org.apache.felix</groupId>
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
new file mode 100644
index 0000000..860c71c
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.curator.utils.PathUtils;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.IgniteSpiAdapter;
+import org.apache.ignite.spi.IgniteSpiConfiguration;
+import org.apache.ignite.spi.IgniteSpiContext;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
+import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
+import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport;
+import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
+import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClusterNode;
+import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
+
+/**
+ * Zookeeper Discovery Spi.
+ */
+@IgniteSpiMultipleInstancesSupport(true)
+@DiscoverySpiOrderSupport(true)
+@DiscoverySpiHistorySupport(true)
+@DiscoverySpiMutableCustomMessageSupport(false)
+public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, IgniteDiscoverySpi {
+ /** Default base path in ZooKeeper for znodes created by SPI. */
+ public static final String DFLT_ROOT_PATH = "/apacheIgnite";
+
+ /** Default cluster join timeout ({@code 0} means wait forever). */
+ public static final long DFLT_JOIN_TIMEOUT = 0;
+
+ /** Base path in ZooKeeper for znodes created by SPI. */
+ @GridToStringInclude
+ private String zkRootPath = DFLT_ROOT_PATH;
+
+ /** ZooKeeper connection string. */
+ @GridToStringInclude
+ private String zkConnectionString;
+
+ /** Cluster join timeout ({@code 0} means wait forever). */
+ private long joinTimeout = DFLT_JOIN_TIMEOUT;
+
+ /** ZooKeeper session timeout; if {@code 0} on start, failure detection timeout is used instead. */
+ @GridToStringInclude
+ private long sesTimeout;
+
+ /** If {@code true} client does not try to reconnect. */
+ private boolean clientReconnectDisabled;
+
+ /** Discovery listener, see {@link #setListener(DiscoverySpiListener)}. */
+ @GridToStringExclude
+ private DiscoverySpiListener lsnr;
+
+ /** Discovery data exchange, see {@link #setDataExchange(DiscoverySpiDataExchange)}. */
+ @GridToStringExclude
+ private DiscoverySpiDataExchange exchange;
+
+ /** Node authenticator, see {@link #setAuthenticator(DiscoverySpiNodeAuthenticator)}. */
+ @GridToStringExclude
+ private DiscoverySpiNodeAuthenticator nodeAuth;
+
+ /** Metrics provider, see {@link #setMetricsProvider(DiscoveryMetricsProvider)}. */
+ @GridToStringExclude
+ private DiscoveryMetricsProvider metricsProvider;
+
+ /** Discovery implementation; {@code null} until {@link #spiStart(String)} creates it. */
+ @GridToStringExclude
+ private ZookeeperDiscoveryImpl impl;
+
+ /** Local node attributes, see {@link #setNodeAttributes(Map, IgniteProductVersion)}. */
+ @GridToStringExclude
+ private Map<String, Object> locNodeAttrs;
+
+ /** Local node version, see {@link #setNodeAttributes(Map, IgniteProductVersion)}. */
+ @GridToStringExclude
+ private IgniteProductVersion locNodeVer;
+
+ /** Local node consistent ID, resolved on first call of {@link #consistentId()}. */
+ @GridToStringExclude
+ private Serializable consistentId;
+
+ /** Local node addresses, resolved on first call of {@link #initAddresses()}. */
+ private IgniteBiTuple<Collection<String>, Collection<String>> addrs;
+
+ /** Logger. */
+ @LoggerResource
+ @GridToStringExclude
+ private IgniteLogger log;
+
+ /** Internal listener kept here while {@link #impl} is not created yet, then passed to it on start. */
+ private IgniteDiscoverySpiInternalListener internalLsnr;
+
+ /**
+ * @return Base path in ZK for znodes created by SPI.
+ */
+ public String getZkRootPath() {
+ return zkRootPath;
+ }
+
+ /**
+ * @param zkRootPath Base path in ZooKeeper for znodes created by SPI.
+ * @return {@code this} for chaining.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public ZookeeperDiscoverySpi setZkRootPath(String zkRootPath) {
+ this.zkRootPath = zkRootPath;
+
+ return this;
+ }
+
+ /**
+ * @return ZooKeeper session timeout.
+ */
+ public long getSessionTimeout() {
+ return sesTimeout;
+ }
+
+ /**
+ * @param sesTimeout ZooKeeper session timeout.
+ * @return {@code this} for chaining.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public ZookeeperDiscoverySpi setSessionTimeout(long sesTimeout) {
+ this.sesTimeout = sesTimeout;
+
+ return this;
+ }
+
+ /**
+ * @return Cluster join timeout.
+ */
+ public long getJoinTimeout() {
+ return joinTimeout;
+ }
+
+ /**
+ * @param joinTimeout Cluster join timeout ({@code 0} means wait forever).
+ * @return {@code this} for chaining.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public ZookeeperDiscoverySpi setJoinTimeout(long joinTimeout) {
+ this.joinTimeout = joinTimeout;
+
+ return this;
+ }
+
+ /**
+ * @return ZooKeeper connection string
+ */
+ public String getZkConnectionString() {
+ return zkConnectionString;
+ }
+
+ /**
+ * @param zkConnectionString ZooKeeper connection string
+ * @return {@code this} for chaining.
+ */
+ @IgniteSpiConfiguration(optional = false)
+ public ZookeeperDiscoverySpi setZkConnectionString(String zkConnectionString) {
+ this.zkConnectionString = zkConnectionString;
+
+ return this;
+ }
+
+ /**
+ * If {@code true} client does not try to reconnect.
+ *
+ * @return Client reconnect disabled flag.
+ */
+ public boolean isClientReconnectDisabled() {
+ return clientReconnectDisabled;
+ }
+
+ /**
+ * Sets client reconnect disabled flag.
+ *
+ * @param clientReconnectDisabled Client reconnect disabled flag.
+ * @return {@code this} for chaining.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public ZookeeperDiscoverySpi setClientReconnectDisabled(boolean clientReconnectDisabled) {
+ this.clientReconnectDisabled = clientReconnectDisabled;
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean clientReconnectSupported() {
+ return !clientReconnectDisabled;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clientReconnect() {
+ impl.reconnect();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean knownNode(UUID nodeId) {
+ return impl.knownNode(nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean supportsCommunicationFailureResolve() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) {
+ impl.resolveCommunicationError(node, err);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Serializable consistentId() throws IgniteSpiException {
+ if (consistentId == null) { // Resolved once, then cached in the field.
+ consistentId = ignite.configuration().getConsistentId();
+
+ if (consistentId == null) { // Nothing configured explicitly: derive from local addresses.
+ initAddresses();
+
+ final List<String> sortedAddrs = new ArrayList<>(addrs.get1());
+
+ Collections.sort(sortedAddrs);
+
+ if (getBoolean(IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT))
+ consistentId = U.consistentId(sortedAddrs);
+ else {
+ Integer commPort = null;
+
+ if (locNodeAttrs != null) { // Attributes already set: take communication port from them.
+ commPort = (Integer)locNodeAttrs.get(
+ TcpCommunicationSpi.class.getSimpleName() + "." + TcpCommunicationSpi.ATTR_PORT);
+ }
+ else { // Attributes not set yet: ask configured TcpCommunicationSpi (if any) directly.
+ CommunicationSpi commSpi = ignite.configuration().getCommunicationSpi();
+
+ if (commSpi instanceof TcpCommunicationSpi) {
+ commPort = ((TcpCommunicationSpi)commSpi).boundPort();
+
+ if (commPort == -1) // -1 is handled as "port is not available".
+ commPort = null;
+ }
+ }
+
+ if (commPort == null) { // Port unknown: fall back to node ID.
+ U.warn(log, "Can not initialize default consistentId, TcpCommunicationSpi port is not initialized.");
+
+ consistentId = ignite.configuration().getNodeId();
+ }
+ else
+ consistentId = U.consistentId(sortedAddrs, commPort);
+ }
+ }
+ }
+
+ return consistentId;
+ }
+
+ /**
+ * Resolves local host and local addresses once and caches them in {@link #addrs}; fails with {@link IgniteSpiException}.
+ */
+ private void initAddresses() {
+ if (addrs == null) {
+ String locHost = ignite != null ? ignite.configuration().getLocalHost() : null;
+
+ InetAddress locAddr;
+
+ try {
+ locAddr = U.resolveLocalHost(locHost);
+ }
+ catch (IOException e) {
+ throw new IgniteSpiException("Unknown local address: " + locHost, e);
+ }
+
+ try {
+ addrs = U.resolveLocalAddresses(locAddr);
+ }
+ catch (Exception e) {
+ throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost,
+ e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> getRemoteNodes() {
+ return impl.remoteNodes();
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode getLocalNode() {
+ return impl != null ? impl.localNode() : null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+ return impl.node(nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean pingNode(UUID nodeId) {
+ return impl.pingNode(nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
+ assert locNodeAttrs == null;
+ assert locNodeVer == null;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Node attributes to set: " + attrs);
+ log.debug("Node version to set: " + ver);
+ }
+
+ locNodeAttrs = attrs;
+ locNodeVer = ver;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+ this.lsnr = lsnr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
+ this.exchange = exchange;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
+ this.metricsProvider = metricsProvider;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void disconnect() throws IgniteSpiException {
+ impl.stop();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
+ this.nodeAuth = auth;
+ }
+
+ /**
+ * @return Authenticator.
+ */
+ public DiscoverySpiNodeAuthenticator getAuthenticator() {
+ return nodeAuth;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getGridStartTime() {
+ return impl.gridStartTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
+ IgniteDiscoverySpiInternalListener internalLsnr = impl.internalLsnr; // Listener held by the implementation.
+
+ if (internalLsnr != null) {
+ if (!internalLsnr.beforeSendCustomEvent(this, log, msg))
+ return; // Listener returned false: message is not sent.
+ }
+
+ impl.sendCustomMessage(msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void failNode(UUID nodeId, @Nullable String warning) {
+ impl.failNode(nodeId, warning);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isClientMode() throws IllegalStateException {
+ return impl.localNode().isClient();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+ super.onContextInitialized0(spiCtx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
+ if (sesTimeout == 0) // Session timeout is not set explicitly: use failure detection timeout.
+ sesTimeout = ignite.configuration().getFailureDetectionTimeout().intValue();
+
+ assertParameter(sesTimeout > 0, "sessionTimeout > 0");
+
+ A.notNullOrEmpty(zkConnectionString, "zkConnectionString can not be empty");
+
+ A.notNullOrEmpty(zkRootPath, "zkRootPath can not be empty");
+
+ zkRootPath = zkRootPath.trim();
+
+ if (zkRootPath.endsWith("/")) // Drop one trailing slash before path validation.
+ zkRootPath = zkRootPath.substring(0, zkRootPath.length() - 1);
+
+ try {
+ PathUtils.validatePath(zkRootPath);
+ }
+ catch (IllegalArgumentException e) {
+ throw new IgniteSpiException("zkRootPath is invalid: " + zkRootPath, e);
+ }
+
+ ZookeeperClusterNode locNode = initLocalNode();
+
+ if (log.isInfoEnabled()) {
+ log.info("Start Zookeeper discovery [zkConnectionString=" + zkConnectionString +
+ ", sessionTimeout=" + sesTimeout +
+ ", zkRootPath=" + zkRootPath + ']');
+ }
+
+ impl = new ZookeeperDiscoveryImpl(
+ this,
+ igniteInstanceName,
+ log,
+ zkRootPath,
+ locNode,
+ lsnr,
+ exchange,
+ internalLsnr);
+
+ try {
+ impl.startJoinAndWait();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // Restore interrupt flag before rethrowing as SPI exception.
+
+ throw new IgniteSpiException("Failed to join cluster, thread was interrupted", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr) {
+ if (impl != null) // Implementation already created: update its listener directly.
+ impl.internalLsnr = lsnr;
+ else // Not started yet: keep listener until spiStart() passes it to the implementation.
+ internalLsnr = lsnr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void simulateNodeFailure() {
+ impl.simulateNodeFailure();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void spiStop() throws IgniteSpiException {
+ if (impl != null)
+ impl.stop();
+ }
+
+ /**
+ * @return Local node instance, marked as local and reported to the discovery listener (if one is set).
+ */
+ private ZookeeperClusterNode initLocalNode() {
+ assert ignite != null;
+
+ initAddresses();
+
+ ZookeeperClusterNode locNode = new ZookeeperClusterNode(
+ ignite.configuration().getNodeId(),
+ addrs.get1(),
+ addrs.get2(),
+ locNodeVer,
+ locNodeAttrs,
+ consistentId(),
+ sesTimeout,
+ ignite.configuration().isClientMode(),
+ metricsProvider);
+
+ locNode.local(true);
+
+ DiscoverySpiListener lsnr = this.lsnr; // Local copy of the field for the null check and call below.
+
+ if (lsnr != null)
+ lsnr.onLocalNodeInitialized(locNode);
+
+ if (log.isDebugEnabled())
+ log.debug("Local node initialized: " + locNode);
+
+ if (metricsProvider != null) { // Set initial node and cache metrics from the provider.
+ locNode.setMetrics(metricsProvider.metrics());
+ locNode.setCacheMetrics(metricsProvider.cacheMetrics());
+ }
+
+ return locNode;
+ }
+
+ /**
+ * Used in tests (called via reflection).
+ *
+ * @return New SPI with the same root path, connection string, timeouts and client reconnect flag.
+ */
+ private ZookeeperDiscoverySpi cloneSpiConfiguration() {
+ ZookeeperDiscoverySpi spi = new ZookeeperDiscoverySpi();
+
+ spi.setZkRootPath(zkRootPath);
+ spi.setZkConnectionString(zkConnectionString);
+ spi.setSessionTimeout(sesTimeout);
+ spi.setJoinTimeout(joinTimeout);
+ spi.setClientReconnectDisabled(clientReconnectDisabled);
+
+ return spi;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ZookeeperDiscoverySpi.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
new file mode 100644
index 0000000..b80a9dd
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+
+/**
+ * Base class for ZooKeeper callbacks: guards processing with the discovery busy lock and reports errors to it.
+ */
+abstract class ZkAbstractCallabck {
+ /** Runtime state; its {@code errForClose} is checked before processing starts. */
+ final ZkRuntimeState rtState;
+
+ /** Discovery implementation which is notified about processing errors. */
+ private final ZookeeperDiscoveryImpl impl;
+
+ /** Busy lock taken from {@code impl.busyLock}. */
+ private final GridSpinBusyLock busyLock;
+
+ /**
+ * @param rtState Runtime state.
+ * @param impl Discovery impl.
+ */
+ ZkAbstractCallabck(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
+ this.rtState = rtState;
+ this.impl = impl;
+
+ busyLock = impl.busyLock;
+ }
+
+ /**
+ * @return {@code True} if is able to start processing (no close error is set and busy lock is entered).
+ */
+ final boolean onProcessStart() {
+ boolean start = rtState.errForClose == null && busyLock.enterBusy();
+
+ if (!start) {
+ assert rtState.errForClose != null; // NOTE(review): also reached when enterBusy() returned false - confirm.
+
+ onStartFailed();
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Hook called from {@link #onProcessStart()} when processing can not be started; does nothing by default.
+ */
+ void onStartFailed() {
+ // No-op.
+ }
+
+ /**
+ * Leaves the busy lock entered in {@link #onProcessStart()}.
+ */
+ final void onProcessEnd() {
+ busyLock.leaveBusy();
+ }
+
+ /**
+ * @param e Error passed to {@code impl.onFatalError} together with the busy lock.
+ */
+ final void onProcessError(Throwable e) {
+ impl.onFatalError(busyLock, e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
new file mode 100644
index 0000000..2292e35
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.List;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Base {@link AsyncCallback.Children2Callback} which runs {@link #processResult0} between process start/end hooks.
+ */
+abstract class ZkAbstractChildrenCallback extends ZkAbstractCallabck implements AsyncCallback.Children2Callback {
+ /**
+ * @param rtState Runtime state.
+ * @param impl Discovery impl.
+ */
+ ZkAbstractChildrenCallback(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
+ super(rtState, impl);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+ if (!onProcessStart())
+ return;
+
+ try {
+ processResult0(rc, path, ctx, children, stat);
+
+ onProcessEnd(); // Reached only on success; on failure onProcessError() receives the error instead.
+ }
+ catch (Throwable e) {
+ onProcessError(e);
+ }
+ }
+
+ /**
+ * @param rc Result code, as passed to {@link #processResult}.
+ * @param path Path, as passed to {@link #processResult}.
+ * @param ctx Context object, as passed to {@link #processResult}.
+ * @param children Children names, as passed to {@link #processResult}.
+ * @param stat Stat, as passed to {@link #processResult}.
+ * @throws Exception If failed.
+ */
+ abstract void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat)
+ throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
new file mode 100644
index 0000000..9098d05
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Base {@link Watcher} which runs {@link #process0(WatchedEvent)} between process start/end hooks.
+ */
+abstract class ZkAbstractWatcher extends ZkAbstractCallabck implements Watcher {
+ /**
+ * @param rtState Runtime state.
+ * @param impl Discovery impl.
+ */
+ ZkAbstractWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
+ super(rtState, impl);
+ }
+
+ /** {@inheritDoc} */
+ @Override public final void process(WatchedEvent evt) {
+ if (!onProcessStart())
+ return;
+
+ try {
+ process0(evt);
+
+ onProcessEnd(); // Reached only on success; on failure onProcessError() receives the error instead.
+ }
+ catch (Throwable e) {
+ onProcessError(e);
+ }
+ }
+
+ /**
+ * @param evt Event.
+ * @throws Exception If failed.
+ */
+ protected abstract void process0(WatchedEvent evt) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
new file mode 100644
index 0000000..d824377
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Serializable data kept for an alive node (presumably stored in the node's znode - TODO confirm).
+ */
+public class ZkAliveNodeData implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Presumably ID of the last event processed by the node, {@code -1} initially - TODO confirm. */
+ long lastProcEvt = -1;
+
+ /** Transient (not serialized) flag; presumably marks that stored data should be rewritten - TODO confirm. */
+ transient boolean needUpdate;
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ZkAliveNodeData.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
new file mode 100644
index 0000000..a186aed
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.T2;
+
+/**
+ * Accumulates joined node event data together with discovery data of each node.
+ */
+class ZkBulkJoinContext {
+ /** Joined nodes paired with their discovery data; {@code null} until the first node is added. */
+ List<T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>>> nodes;
+
+ /**
+ * @param nodeEvtData Node event data.
+ * @param discoData Discovery data for node.
+ */
+ void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map<Integer, Serializable> discoData) {
+ if (nodes == null) // List is created lazily on first add.
+ nodes = new ArrayList<>();
+
+ nodes.add(new T2<>(nodeEvtData, discoData));
+ }
+
+ /**
+ * @return Number of joined nodes ({@code 0} if nothing was added).
+ */
+ int nodes() {
+ return nodes != null ? nodes.size() : 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
new file mode 100644
index 0000000..7e2ea7b
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ * Holds cluster nodes indexed by node order, internal ID and node ID.
+ */
+public class ZkClusterNodes {
+ /** Nodes by {@link ZookeeperClusterNode#order()}, sorted by order. */
+ final ConcurrentSkipListMap<Long, ZookeeperClusterNode> nodesByOrder = new ConcurrentSkipListMap<>();
+
+ /** Nodes by {@link ZookeeperClusterNode#internalId()}, sorted by internal ID. */
+ final ConcurrentSkipListMap<Long, ZookeeperClusterNode> nodesByInternalId = new ConcurrentSkipListMap<>();
+
+ /** Nodes by {@link ZookeeperClusterNode#id()}. */
+ final ConcurrentHashMap<UUID, ZookeeperClusterNode> nodesById = new ConcurrentHashMap<>();
+
+ /**
+ * @return Remote nodes (new list of all nodes which are not local).
+ */
+ public Collection<ClusterNode> remoteNodes() {
+ List<ClusterNode> nodes = new ArrayList<>();
+
+ for (ClusterNode node : nodesById.values()) {
+ if (!node.isLocal())
+ nodes.add(node);
+ }
+
+ return nodes;
+ }
+
+ /**
+ * @return Current nodes in topology (new list, in ascending node order).
+ */
+ @SuppressWarnings("unchecked")
+ List<ClusterNode> topologySnapshot() {
+ return new ArrayList<>((Collection)nodesByOrder.values());
+ }
+
+ /**
+ * @param node New node; must have ID and positive order and must not be registered in any of the maps yet.
+ */
+ void addNode(ZookeeperClusterNode node) {
+ assert node.id() != null : node;
+ assert node.order() > 0 : node;
+
+ ZookeeperClusterNode old = nodesById.put(node.id(), node);
+
+ assert old == null : old;
+
+ old = nodesByOrder.put(node.order(), node);
+
+ assert old == null : old;
+
+ old = nodesByInternalId.put(node.internalId(), node);
+
+ assert old == null : old;
+ }
+
+ /**
+ * @param internalId Node internal ID.
+ * @return Removed node (it is removed from all three maps).
+ */
+ ZookeeperClusterNode removeNode(long internalId) {
+ ZookeeperClusterNode node = nodesByInternalId.remove(internalId);
+
+ assert node != null : internalId;
+ assert node.order() > 0 : node;
+
+ Object rvmd = nodesByOrder.remove(node.order());
+
+ assert rvmd != null;
+
+ rvmd = nodesById.remove(node.id());
+
+ assert rvmd != null;
+
+ return node;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
new file mode 100644
index 0000000..9c21f13
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+/**
+ * Serializable holder of a node's communication state and/or the error which prevented getting it.
+ */
+class ZkCommunicationErrorNodeState implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Communication state; may be {@code null} only if {@link #err} is set. */
+ final BitSet commState;
+
+ /** Error; may be {@code null} only if {@link #commState} is set. */
+ final Exception err;
+
+ /**
+ * @param commState Communication state.
+ * @param err Error if failed to get communication state.
+ */
+ ZkCommunicationErrorNodeState(BitSet commState, Exception err) {
+ assert commState != null || err != null;
+
+ this.commState = commState;
+ this.err = err;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
new file mode 100644
index 0000000..accda6e
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Future is created on each node when either connection error occurs or resolve communication error request
+ * received.
+ */
+class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implements IgniteSpiTimeoutObject, Runnable {
+ /** Discovery implementation this future belongs to. */
+ private final ZookeeperDiscoveryImpl impl;
+
+ /** Logger obtained from the discovery implementation. */
+ private final IgniteLogger log;
+
+ /**
+  * Per-node status futures keyed by node order.
+  * <p>
+  * The JDK class is named explicitly: the simple name {@code ConcurrentHashMap} is bound by this file's
+  * import of {@code org.jboss.netty.util.internal.ConcurrentHashMap}, an internal class of a third-party
+  * library that this field should not depend on.
+  */
+ private final Map<Long, GridFutureAdapter<Boolean>> nodeFuts = new java.util.concurrent.ConcurrentHashMap<>();
+
+ /** Timeout object end time, {@code 0} when the future was not created in {@link State#WAIT_TIMEOUT} state. */
+ private final long endTime;
+
+ /** Timeout object ID, {@code null} when the future was not created in {@link State#WAIT_TIMEOUT} state. */
+ private final IgniteUuid id;
+
+ /** Current state, read and changed while holding the lock on {@code this}. */
+ private State state;
+
+ /** Topology version passed to {@code onStartResolveRequest}, {@code 0} until then. */
+ private long resolveTopVer;
+
+ /** Nodes failed as result of the resolve process, set by {@code onFinishResolve}. */
+ private Set<Long> resFailedNodes;
+
+ /** Error this future was completed with, {@code null} if it was not completed with an error. */
+ private Exception resErr;
+
+ /** Future collecting nodes' communication status, set by {@code nodeResultCollectFuture}. */
+ private ZkDistributedCollectDataFuture collectResFut;
+
+ /**
+  * Creates a future in {@link State#WAIT_TIMEOUT} state.
+  *
+  * @param impl Discovery impl.
+  * @param timeout Timeout to wait before initiating resolve process.
+  * @return Future.
+  */
+ static ZkCommunicationErrorProcessFuture createOnCommunicationError(ZookeeperDiscoveryImpl impl, long timeout) {
+     State initState = State.WAIT_TIMEOUT;
+
+     return new ZkCommunicationErrorProcessFuture(impl, initState, timeout);
+ }
+
+ /**
+  * Creates a future in {@link State#RESOLVE_STARTED} state.
+  *
+  * @param impl Discovery impl.
+  * @return Future.
+  */
+ static ZkCommunicationErrorProcessFuture createOnStartResolveRequest(ZookeeperDiscoveryImpl impl) {
+     // No timeout is used when the future starts in RESOLVE_STARTED state.
+     long noTimeout = 0;
+
+     return new ZkCommunicationErrorProcessFuture(impl, State.RESOLVE_STARTED, noTimeout);
+ }
+
+ /**
+  * @param impl Discovery implementation.
+  * @param state Initial state, must not be {@link State#DONE}.
+  * @param timeout Wait timeout before initiating communication errors resolve, must be positive for
+  *      {@link State#WAIT_TIMEOUT} and is not used for other states.
+  */
+ private ZkCommunicationErrorProcessFuture(ZookeeperDiscoveryImpl impl, State state, long timeout) {
+     assert state != State.DONE;
+
+     this.impl = impl;
+     this.log = impl.log();
+
+     // Timeout object attributes are filled only when the future starts by waiting for the timeout.
+     boolean waitsTimeout = state == State.WAIT_TIMEOUT;
+
+     assert !waitsTimeout || timeout > 0 : timeout;
+
+     id = waitsTimeout ? IgniteUuid.fromUuid(impl.localNode().id()) : null;
+     endTime = waitsTimeout ? System.currentTimeMillis() + timeout : 0;
+
+     this.state = state;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Returns the logger taken from the discovery implementation.
+  */
+ @Nullable @Override public IgniteLogger logger() {
+     return this.log;
+ }
+
+ /**
+  * Stores the future collecting nodes' communication status; asserts that none was stored before.
+  *
+  * @param collectResFut Collect nodes' communication status future.
+  */
+ void nodeResultCollectFuture(ZkDistributedCollectDataFuture collectResFut) {
+     ZkDistributedCollectDataFuture prev = this.collectResFut;
+
+     assert prev == null : collectResFut;
+
+     this.collectResFut = collectResFut;
+ }
+
+ /**
+  * Completes with {@code false} the status futures of nodes whose order is absent from the given topology
+  * and forwards the topology change to the result collect future, if one is set.
+  *
+  * @param top Topology.
+  * @throws Exception If failed.
+  */
+ void onTopologyChange(ZkClusterNodes top) throws Exception {
+     for (Map.Entry<Long, GridFutureAdapter<Boolean>> entry : nodeFuts.entrySet()) {
+         Long nodeOrder = entry.getKey();
+
+         if (top.nodesByOrder.containsKey(nodeOrder))
+             continue;
+
+         // Order is not present in the new topology.
+         entry.getValue().onDone(false);
+     }
+
+     if (collectResFut != null)
+         collectResFut.onTopologyChange(top);
+ }
+
+ /**
+ * Asks the communication SPI to check connections to the given nodes and, when the check completes,
+ * saves the local node's result (communication state or error) with
+ * {@code ZkDistributedCollectDataFuture.saveNodeResult}.
+ *
+ * @param rtState Runtime state.
+ * @param futPath Future path.
+ * @param nodes Nodes to ping.
+ */
+ void checkConnection(final ZkRuntimeState rtState, final String futPath, List<ClusterNode> nodes) {
+ // The configured communication SPI is cast unconditionally to TcpCommunicationSpi.
+ final TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi();
+
+ IgniteFuture<BitSet> fut = spi.checkConnection(nodes);
+
+ fut.listen(new IgniteInClosure<IgniteFuture<BitSet>>() {
+ @Override public void apply(final IgniteFuture<BitSet> fut) {
+ // Future completed either from NIO thread or timeout worker, save result from another thread.
+ impl.runInWorkerThread(new ZkRunnable(rtState, impl) {
+ @Override public void run0() throws Exception {
+ BitSet commState = null;
+ Exception err = null;
+
+ // A failure of the check is not rethrown: it is recorded and saved as this node's result.
+ try {
+ commState = fut.get();
+ }
+ catch (Exception e) {
+ err = e;
+ }
+
+ ZkCommunicationErrorNodeState state = new ZkCommunicationErrorNodeState(commState, err);
+
+ // Result is saved under the future path together with the local node order.
+ ZkDistributedCollectDataFuture.saveNodeResult(futPath,
+ rtState.zkClient,
+ impl.localNode().order(),
+ impl.marshalZip(state));
+ }
+
+ // NOTE(review): onError resolves to ZkRunnable's own method if it declares one, otherwise to the
+ // enclosing future's onError(Exception) - confirm against ZkRunnable.
+ @Override void onStartFailed() {
+ onError(rtState.errForClose);
+ }
+ });
+
+ }
+ });
+ }
+
+ /**
+  * Registers this future as a timeout object in the SPI context if it is still in
+  * {@link State#WAIT_TIMEOUT} state; does nothing otherwise.
+  */
+ void scheduleCheckOnTimeout() {
+     synchronized (this) {
+         if (state != State.WAIT_TIMEOUT)
+             return;
+
+         impl.spi.getSpiContext().addTimeoutObject(this);
+     }
+ }
+
+ /**
+ * Moves the future to {@code RESOLVE_STARTED} state and remembers the given topology version. If the
+ * future was waiting for the timeout, its timeout object is removed from the SPI context first.
+ *
+ * @param topVer Topology version.
+ * @return {@code False} if future was already completed and need create another future instance.
+ */
+ boolean onStartResolveRequest(long topVer) {
+ synchronized (this) {
+ if (state == State.DONE)
+ return false;
+
+ // Resolve is started by the request, the timeout no longer has to fire.
+ if (state == State.WAIT_TIMEOUT)
+ impl.spi.getSpiContext().removeTimeoutObject(this);
+
+ // The topology version is expected to be set only once per future.
+ assert resolveTopVer == 0 : resolveTopVer;
+
+ resolveTopVer = topVer;
+
+ state = State.RESOLVE_STARTED;
+ }
+
+ return true;
+ }
+
+ /**
+ * Completes this future and all per-node status futures with the given error. Does nothing if the
+ * future is already in {@code DONE} state.
+ *
+ * @param err Error.
+ */
+ void onError(Exception err) {
+ assert err != null;
+
+ Map<Long, GridFutureAdapter<Boolean>> futs;
+
+ synchronized (this) {
+ if (state == State.DONE) {
+ // NOTE(review): needProcessTimeout() and onFinishResolve() also switch to DONE without setting
+ // resErr; if onError() can be called after either of them this assertion trips - confirm.
+ assert resErr != null;
+
+ return;
+ }
+
+ state = State.DONE;
+
+ resErr = err;
+
+ futs = nodeFuts; // nodeFuts should not be modified after state changed to DONE.
+ }
+
+ // Futures are completed after the lock is released.
+ for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : futs.entrySet())
+ e.getValue().onDone(err);
+
+ onDone(err);
+ }
+
+ /**
+ * Finishes the resolve process: remembers the failed nodes, completes every per-node status future with
+ * {@code true} unless its node order is among the failed ones, then completes this future. Does nothing
+ * if the future is already in {@code DONE} state.
+ *
+ * @param failedNodes Node failed as result of resolve process.
+ */
+ void onFinishResolve(Set<Long> failedNodes) {
+ Map<Long, GridFutureAdapter<Boolean>> futs;
+
+ synchronized (this) {
+ if (state == State.DONE) {
+ // NOTE(review): needProcessTimeout() also switches to DONE without setting resErr; if this method
+ // can be called afterwards this assertion trips - confirm.
+ assert resErr != null;
+
+ return;
+ }
+
+ assert state == State.RESOLVE_STARTED : state;
+
+ state = State.DONE;
+
+ resFailedNodes = failedNodes;
+
+ futs = nodeFuts; // nodeFuts should not be modified after state changed to DONE.
+ }
+
+ // Futures are completed after the lock is released.
+ for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : futs.entrySet()) {
+ // Result is false only for node orders contained in the failed set.
+ Boolean res = !F.contains(resFailedNodes, e.getKey());
+
+ e.getValue().onDone(res);
+ }
+
+ onDone();
+ }
+
+ /**
+ * Returns the status future for the given node, creating and registering it if needed.
+ *
+ * @param node Node.
+ * @return Future finished when communication error resolve is done or {@code null} if another
+ * resolve process should be started.
+ */
+ @Nullable IgniteInternalFuture<Boolean> nodeStatusFuture(ClusterNode node) {
+ GridFutureAdapter<Boolean> fut;
+
+ synchronized (this) {
+ if (state == State.DONE) {
+ // Answer from the saved result only if a resolve topology version was recorded and the node order
+ // does not exceed it.
+ if (resolveTopVer != 0 && node.order() <= resolveTopVer) {
+ Boolean res = !F.contains(resFailedNodes, node.order());
+
+ return new GridFinishedFuture<>(res);
+ }
+ else
+ return null;
+ }
+
+ // Lookup and registration happen under the lock, where the state is known not to be DONE.
+ fut = nodeFuts.get(node.order());
+
+ if (fut == null)
+ nodeFuts.put(node.order(), fut = new GridFutureAdapter<>());
+ }
+
+ // Discovery impl has no node with this order: complete with false, the same value that
+ // onTopologyChange() uses for nodes absent from the topology.
+ if (impl.node(node.order()) == null)
+ fut.onDone(false);
+
+ return fut;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * If the timeout still has to be processed, sends {@code ZkCommunicationErrorResolveStartMessage} to
+ * initiate the cluster-wide resolve process. If that fails while the future is still in
+ * {@code WAIT_TIMEOUT} state, this future and all per-node status futures are completed with the error.
+ */
+ @Override public void run() {
+ // Run from zk discovery worker pool after timeout.
+ if (needProcessTimeout()) {
+ try {
+ UUID reqId = UUID.randomUUID();
+
+ if (log.isInfoEnabled()) {
+ log.info("Initiate cluster-wide communication error resolve process [reqId=" + reqId +
+ ", errNodes=" + nodeFuts.size() + ']');
+ }
+
+ impl.sendCustomMessage(new ZkCommunicationErrorResolveStartMessage(reqId));
+ }
+ catch (Exception e) {
+ Collection<GridFutureAdapter<Boolean>> futs;
+
+ synchronized (this) {
+ // State changed concurrently (resolve started or future done): the failure is not applied.
+ if (state != State.WAIT_TIMEOUT)
+ return;
+
+ state = State.DONE;
+ resErr = e;
+
+ futs = nodeFuts.values(); // nodeFuts should not be modified after state changed to DONE.
+ }
+
+ // Futures are completed after the lock is released.
+ for (GridFutureAdapter<Boolean> fut : futs)
+ fut.onDone(e);
+
+ onDone(e);
+ }
+ }
+ }
+
+ /**
+ * Checks whether the resolve process has to be initiated: the future must still be in
+ * {@code WAIT_TIMEOUT} state and at least one per-node status future must be not done. If the state is
+ * {@code WAIT_TIMEOUT} but no per-node future is pending, this future is switched to {@code DONE} and
+ * completed without a result or error.
+ *
+ * @return {@code True} if need initiate resolve process after timeout expired.
+ */
+ private boolean needProcessTimeout() {
+ synchronized (this) {
+ if (state != State.WAIT_TIMEOUT)
+ return false;
+
+ for (GridFutureAdapter<Boolean> fut : nodeFuts.values()) {
+ if (!fut.isDone())
+ return true;
+ }
+
+ // Nothing is pending, there is nothing to resolve.
+ state = State.DONE;
+ }
+
+ // Completed after the lock is released.
+ onDone(null, null);
+
+ return false;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The ID is assigned in the constructor and is {@code null} unless the future was created in
+  * {@link State#WAIT_TIMEOUT} state.
+  */
+ @Override public IgniteUuid id() {
+     return this.id;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The end time is computed in the constructor and is {@code 0} unless the future was created in
+  * {@link State#WAIT_TIMEOUT} state.
+  */
+ @Override public long endTime() {
+     return this.endTime;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Hands this future over to a discovery worker thread if the timeout still has to be processed.
+  */
+ @Override public void onTimeout() {
+     if (!needProcessTimeout())
+         return;
+
+     impl.runInWorkerThread(this);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * When this call completes the future, the future is also cleared in the discovery implementation.
+  */
+ @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
+     boolean completed = super.onDone(res, err);
+
+     // Only the call that actually completed the future clears it.
+     if (completed)
+         impl.clearCommunicationErrorProcessFuture(this);
+
+     return completed;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+     Class<ZkCommunicationErrorProcessFuture> cls = ZkCommunicationErrorProcessFuture.class;
+
+     return S.toString(cls, this);
+ }
+
+ /**
+ * States of the communication error process future.
+ */
+ enum State {
+ /**
+ * Future is finished. Set by {@code onError}, {@code onFinishResolve}, {@code needProcessTimeout} and by
+ * {@code run} when sending the resolve start message fails.
+ */
+ DONE,
+
+ /**
+ * Initial state of a future created by {@code createOnCommunicationError}: the future waits for the
+ * timeout before initiating the resolve process.
+ */
+ WAIT_TIMEOUT,
+
+ /**
+ * Resolve process is started. Initial state of a future created by {@code createOnStartResolveRequest},
+ * also set by {@code onStartResolveRequest}.
+ */
+ RESOLVE_STARTED
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
new file mode 100644
index 0000000..9b7476c
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Internal custom discovery message carrying the future ID and the topology version at which the
+ * communication error resolve process finished.
+ */
+class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ final UUID futId;
+
+ /** Topology version when resolve process finished. */
+ final long topVer;
+
+ /** Resolve result; declared {@code transient}, so it is not part of the default serialized form. */
+ transient ZkCommunicationErrorResolveResult res;
+
+ /**
+ * @param futId Future ID.
+ * @param topVer Topology version when resolve process finished.
+ */
+ ZkCommunicationErrorResolveFinishMessage(UUID futId, long topVer) {
+ this.futId = futId;
+ this.topVer = topVer;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return Always {@code null}.
+ */
+ @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return Always {@code false}.
+ */
+ @Override public boolean isMutable() {
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return Always {@code false}.
+ */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ZkCommunicationErrorResolveFinishMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
new file mode 100644
index 0000000..23495aa
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.GridLongList;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Serializable holder of the communication error resolve outcome: the killed nodes and/or an error.
+ */
+class ZkCommunicationErrorResolveResult implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Killed nodes, may be {@code null}. NOTE(review): presumably node orders - confirm against the producer. */
+ final GridLongList killedNodes;
+
+ /** Error. */
+ final Exception err;
+
+ /**
+ * @param killedNodes Killed nodes.
+ * @param err Error.
+ */
+ ZkCommunicationErrorResolveResult(@Nullable GridLongList killedNodes, Exception err) {
+ this.killedNodes = killedNodes;
+ this.err = err;
+ }
+}