You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/03/22 13:43:03 UTC

[1/3] ignite git commit: IGNITE-4761: Fix ServiceProcessor hanging on node stop. This closes #1602.

Repository: ignite
Updated Branches:
  refs/heads/apache-master [created] 16153bb8b


IGNITE-4761: Fix ServiceProcessor hanging on node stop. This closes #1602.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0ed4fdac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0ed4fdac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0ed4fdac

Branch: refs/heads/apache-master
Commit: 0ed4fdacc7cc8cf41b7726fc4a42db1a43241285
Parents: bcb1398
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Mar 14 15:50:03 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Mar 14 15:50:03 2017 +0300

----------------------------------------------------------------------
 .../service/GridServiceProcessor.java           | 15 ++--
 .../GridServiceProcessorStopSelfTest.java       | 75 ++++++++++++++++++++
 2 files changed, 83 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0ed4fdac/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 4eeafed..6bcfd65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -315,6 +315,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
         busyLock.block();
 
+        U.shutdownNow(GridServiceProcessor.class, depExe, log);
+
         if (!ctx.clientNode())
             ctx.event().removeLocalEventListener(topLsnr);
 
@@ -352,8 +354,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             }
         }
 
-        U.shutdownNow(GridServiceProcessor.class, depExe, log);
-
         Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
 
         cancelFutures(depFuts, err);
@@ -1399,7 +1399,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                 return;
 
             try {
-                depExe.submit(new BusyRunnable() {
+                depExe.submit(new DepRunnable() {
                     @Override public void run0() {
                         onSystemCacheUpdated(deps);
                     }
@@ -1586,7 +1586,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                 else
                     topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
 
-                depExe.submit(new BusyRunnable() {
+                depExe.submit(new DepRunnable() {
                     @Override public void run0() {
                         ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
 
@@ -1794,12 +1794,15 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /**
      *
      */
-    private abstract class BusyRunnable implements Runnable {
+    private abstract class DepRunnable implements Runnable {
         /** {@inheritDoc} */
         @Override public void run() {
             if (!busyLock.enterBusy())
                 return;
 
+            // Won't block ServiceProcessor stopping process.
+            busyLock.leaveBusy();
+
             svcName.set(null);
 
             try {
@@ -1812,8 +1815,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                     throw t;
             }
             finally {
-                busyLock.leaveBusy();
-
                 svcName.set(null);
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ed4fdac/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
index 92b18ab..ea0ba51 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
@@ -20,10 +20,16 @@ package org.apache.ignite.internal.processors.service;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteServices;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.services.Service;
@@ -91,6 +97,75 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testStopDuringHangedDeployment() throws Exception {
+        final CountDownLatch depLatch = new CountDownLatch(1);
+
+        final CountDownLatch finishLatch = new CountDownLatch(1);
+
+        final IgniteEx node0 = startGrid(0);
+        final IgniteEx node1 = startGrid(1);
+        final IgniteEx node2 = startGrid(2);
+
+        final IgniteCache<Object, Object> cache = node2.getOrCreateCache(new CacheConfiguration<Object, Object>("def")
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        node0.services().deployNodeSingleton("myService", new TestServiceImpl());
+
+        // Pick a key mapped to node2 so the lock owner is not one of the nodes stopped below (node0, node1).
+        final Integer lockKey = keyForNode(node2.affinity("def"), new AtomicInteger(1),
+            node2.cluster().localNode());
+
+        // Lock on lockKey; holding it (acquired below) is meant to keep the topology change from completing.
+        final Lock lock = cache.lock(lockKey);
+
+        // Once depLatch is released (service deployed, lock held), stop node1 to trigger a topology change.
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                depLatch.await();
+
+                node1.close();
+
+                return null;
+            }
+        }, "top-change-thread");
+
+        // About 1 s after depLatch is released, stop node0 while the topology change is expected to be pending.
+        GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                depLatch.await();
+
+                Thread.sleep(1000);
+
+                node0.close();
+
+                finishLatch.countDown();
+
+                return null;
+            }
+        }, "stopping-node-thread");
+
+        assertNotNull(node0.services().service("myService"));
+
+        // Freeze the topology change; the lock is never unlocked, Ignition.stopAll(true) below tears the nodes down.
+        lock.lock();
+
+        depLatch.countDown();
+
+        boolean wait = finishLatch.await(15, TimeUnit.SECONDS);
+
+        if (!wait)
+            U.dumpThreads(log);
+
+        assertTrue("Deploy future isn't completed", wait);
+
+        fut.get();
+
+        Ignition.stopAll(true);
+    }
+
+    /**
      * Simple map service.
      */
     public interface TestService {


[2/3] ignite git commit: IGNITE-4473 - Client should re-try connection attempt in case of concurrent network failure.

Posted by nt...@apache.org.
IGNITE-4473 - Client should re-try connection attempt in case of concurrent network failure.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d124004d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d124004d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d124004d

Branch: refs/heads/apache-master
Commit: d124004d8b0396a44c26f4c35c263a15880f508c
Parents: 0ed4fda
Author: dkarachentsev <dk...@gridgain.com>
Authored: Fri Mar 17 14:57:48 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Fri Mar 17 14:57:48 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridKernalGatewayImpl.java  |   8 +-
 .../apache/ignite/internal/IgniteKernal.java    | 120 +++++-
 .../internal/IgniteNeedReconnectException.java  |  40 ++
 .../discovery/GridDiscoveryManager.java         |  24 ++
 .../GridCachePartitionExchangeManager.java      |  25 +-
 .../dht/GridDhtAssignmentFetchFuture.java       |  14 +-
 .../GridDhtPartitionsExchangeFuture.java        |  48 ++-
 .../service/GridServiceProcessor.java           |  86 ++---
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 201 ++++++++--
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   5 +
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   8 +
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   9 +
 .../IgniteClientReconnectCacheTest.java         |   7 +-
 .../ignite/internal/IgniteClientRejoinTest.java | 378 +++++++++++++++++++
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  48 ++-
 .../IgniteClientReconnectTestSuite.java         |   2 +
 16 files changed, 929 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
index fe8c580..036954a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
@@ -44,7 +44,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
 
     /** */
     @GridToStringExclude
-    private IgniteFutureImpl<?> reconnectFut;
+    private volatile IgniteFutureImpl<?> reconnectFut;
 
     /** */
     private final AtomicReference<GridKernalState> state = new AtomicReference<>(GridKernalState.STOPPED);
@@ -149,6 +149,12 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
 
     /** {@inheritDoc} */
     @Override public GridFutureAdapter<?> onDisconnected() {
+        if (state.get() == GridKernalState.DISCONNECTED) {
+            assert reconnectFut != null;
+
+            return (GridFutureAdapter<?>)reconnectFut.internalFuture();
+        }
+
         GridFutureAdapter<?> fut = new GridFutureAdapter<>();
 
         reconnectFut = new IgniteFutureImpl<>(fut);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8fda72f..25f7884 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -250,6 +250,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     /** Periodic starvation check interval. */
     private static final long PERIODIC_STARVATION_CHECK_FREQ = 1000 * 30;
 
+    /** Force complete reconnect future. */
+    private static final Object STOP_RECONNECT = new Object();
+
     /** */
     @GridToStringExclude
     private GridKernalContextImpl ctx;
@@ -327,6 +330,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     @GridToStringExclude
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
+    /** */
+    private final ReconnectState reconnectState = new ReconnectState();
+
     /**
      * No-arg constructor is required by externalization.
      */
@@ -930,6 +936,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             // Notify IO manager the second so further components can send and receive messages.
             ctx.io().onKernalStart();
 
+            boolean recon = false;
+
             // Callbacks.
             for (GridComponent comp : ctx) {
                 // Skip discovery manager.
@@ -940,10 +948,24 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 if (comp instanceof GridIoManager)
                     continue;
 
-                if (!skipDaemon(comp))
-                    comp.onKernalStart();
+                if (!skipDaemon(comp)) {
+                    try {
+                        comp.onKernalStart();
+                    }
+                    catch (IgniteNeedReconnectException e) {
+                        assert ctx.discovery().reconnectSupported();
+
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to start node components on node start, will wait for reconnect: " + e);
+
+                        recon = true;
+                    }
+                }
             }
 
+            if (recon)
+                reconnectState.waitFirstReconnect();
+
             // Register MBeans.
             registerKernalMBean();
             registerLocalNodeMBean();
@@ -3274,6 +3296,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     public void onDisconnected() {
         Throwable err = null;
 
+        reconnectState.waitPreviousReconnect();
+
         GridFutureAdapter<?> reconnectFut = ctx.gateway().onDisconnected();
 
         if (reconnectFut == null) {
@@ -3282,9 +3306,18 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             return;
         }
 
-        IgniteFuture<?> userFut = new IgniteFutureImpl<>(reconnectFut);
+        IgniteFutureImpl<?> curFut = (IgniteFutureImpl<?>)ctx.cluster().get().clientReconnectFuture();
+
+        IgniteFuture<?> userFut;
 
-        ctx.cluster().get().clientReconnectFuture(userFut);
+        // In case of previous reconnect did not finish keep reconnect future.
+        if (curFut != null && curFut.internalFuture() == reconnectFut)
+            userFut = curFut;
+        else {
+            userFut = new IgniteFutureImpl<>(reconnectFut);
+
+            ctx.cluster().get().clientReconnectFuture(userFut);
+        }
 
         ctx.disconnected(true);
 
@@ -3337,30 +3370,53 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         try {
             ctx.disconnected(false);
 
-            GridCompoundFuture<?, ?> reconnectFut = new GridCompoundFuture<>();
+            GridCompoundFuture curReconnectFut = reconnectState.curReconnectFut = new GridCompoundFuture<>();
+
+            reconnectState.reconnectDone = new GridFutureAdapter<>();
 
             for (GridComponent comp : ctx.components()) {
                 IgniteInternalFuture<?> fut = comp.onReconnected(clusterRestarted);
 
                 if (fut != null)
-                    reconnectFut.add((IgniteInternalFuture)fut);
+                    curReconnectFut.add(fut);
             }
 
-            reconnectFut.add((IgniteInternalFuture)ctx.cache().context().exchange().reconnectExchangeFuture());
+            curReconnectFut.add(ctx.cache().context().exchange().reconnectExchangeFuture());
+
+            curReconnectFut.markInitialized();
 
-            reconnectFut.markInitialized();
+            final GridFutureAdapter reconnectDone = reconnectState.reconnectDone;
 
-            reconnectFut.listen(new CI1<IgniteInternalFuture<?>>() {
+            curReconnectFut.listen(new CI1<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> fut) {
                     try {
-                        fut.get();
+                        Object res = fut.get();
+
+                        if (res == STOP_RECONNECT)
+                            return;
 
                         ctx.gateway().onReconnected();
+
+                        reconnectState.firstReconnectFut.onDone();
                     }
                     catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to reconnect, will stop node", e);
+                        if (!X.hasCause(e, IgniteNeedReconnectException.class,
+                            IgniteClientDisconnectedCheckedException.class)) {
+                            U.error(log, "Failed to reconnect, will stop node.", e);
+
+                            reconnectState.firstReconnectFut.onDone(e);
 
-                        close();
+                            close();
+                        }
+                        else {
+                            assert ctx.discovery().reconnectSupported();
+
+                            U.error(log, "Failed to finish reconnect, will retry [locNodeId=" + ctx.localNodeId() +
+                                ", err=" + e.getMessage() + ']');
+                        }
+                    }
+                    finally {
+                        reconnectDone.onDone();
                     }
                 }
             });
@@ -3519,6 +3575,46 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         }
     }
 
+    /**
+     * Client reconnect bookkeeping: lets a new disconnect cancel an unfinished reconnect and lets node start wait for the first one.
+     */
+    private class ReconnectState {
+        /** Completed by the first successful reconnect, or with the error if reconnect fails and the node is closed. */
+        private final GridFutureAdapter firstReconnectFut = new GridFutureAdapter();
+
+        /** Reconnect in progress, assigned in onReconnected. NOTE(review): not volatile - presumably only touched by the discovery thread, confirm. */
+        private GridCompoundFuture<?, Object> curReconnectFut;
+
+        /** Completed (without error) in the finally block of the listener attached to {@link #curReconnectFut}. */
+        private GridFutureAdapter<?> reconnectDone;
+
+        /** Blocks until the first reconnect finishes.
+         * @throws IgniteCheckedException If the first reconnect failed.
+         */
+        void waitFirstReconnect() throws IgniteCheckedException {
+            firstReconnectFut.get();
+        }
+
+        /**
+         * Force-completes an unfinished reconnect with {@code STOP_RECONNECT} and waits until its listener has run.
+         */
+        void waitPreviousReconnect() {
+            if (curReconnectFut != null && !curReconnectFut.isDone()) {
+                assert reconnectDone != null;
+
+                curReconnectFut.onDone(STOP_RECONNECT);
+
+                try {
+                    reconnectDone.get();
+                }
+                catch (IgniteCheckedException ignote) {
+                    // Ignored: reconnectDone is only completed without error; only its completion matters here.
+                }
+            }
+
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteKernal.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java
new file mode 100644
index 0000000..61ab576
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Indicates that the local client node should try to reconnect to the cluster.
+ */
+public class IgniteNeedReconnectException extends IgniteCheckedException {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param locNode Local node; must be a client node (checked by assertion). Its ID is included in the message.
+     * @param cause Cause, may be {@code null}.
+     */
+    public IgniteNeedReconnectException(ClusterNode locNode, @Nullable Throwable cause) {
+        super("Local node need try to reconnect [locNodeId=" + locNode.id() + ']', cause);
+
+        assert locNode.isClient();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9aa4db1..2ec1070 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -112,6 +112,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
 import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
 import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -1891,6 +1892,29 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @return {@code True} if the local node is a client and the discovery SPI is a {@link TcpDiscoverySpi} with client reconnect enabled.
+     */
+    public boolean reconnectSupported() {
+        DiscoverySpi spi = getSpi();
+
+        return ctx.clientNode() && (spi instanceof TcpDiscoverySpi) &&
+            !(((TcpDiscoverySpi) spi).isClientReconnectDisabled());
+    }
+
+    /**
+     * Asks the {@link TcpDiscoverySpi} to leave the cluster and join again; callers must check {@link #reconnectSupported()} first.
+     *
+     * @throws IgniteSpiException If failed (presumably propagated from the SPI; confirm).
+     */
+    public void reconnect() {
+        assert reconnectSupported();
+
+        DiscoverySpi discoverySpi = getSpi();
+
+        ((TcpDiscoverySpi)discoverySpi).reconnect();
+    }
+
+    /**
      * Updates topology version if current version is smaller than updated.
      *
      * @param updated Updated topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7f11dc4..92142c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
@@ -447,6 +448,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     else
                         U.warn(log, "Still waiting for initial partition map exchange [fut=" + fut + ']');
                 }
+                catch (IgniteNeedReconnectException e) {
+                    throw e;
+                }
+                catch (Exception e) {
+                    if (fut.reconnectOnError(e))
+                        throw new IgniteNeedReconnectException(cctx.localNode(), e);
+
+                    throw e;
+                }
             }
 
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -1690,6 +1700,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                         dumpedObjects++;
                                     }
                                 }
+                                catch (Exception e) {
+                                    if (exchFut.reconnectOnError(e))
+                                        throw new IgniteNeedReconnectException(cctx.localNode(), e);
+
+                                    throw e;
+                                }
                             }
 
 
@@ -1829,7 +1845,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 catch (IgniteInterruptedCheckedException e) {
                     throw e;
                 }
-                catch (IgniteClientDisconnectedCheckedException e) {
+                catch (IgniteClientDisconnectedCheckedException | IgniteNeedReconnectException e) {
+                    assert cctx.discovery().reconnectSupported();
+
+                    U.warn(log,"Local node failed to complete partition map exchange due to " +
+                        "network issues, will try to reconnect to cluster", e);
+
+                    cctx.discovery().reconnect();
+
                     return;
                 }
                 catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index ab8e863..6425bc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -17,15 +17,16 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.UUID;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridNodeOrderComparator;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -202,8 +204,14 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
                         "continue to another node): " + node);
                 }
                 catch (IgniteCheckedException e) {
-                    U.error(log0, "Failed to request affinity assignment from remote node (will " +
-                        "continue to another node): " + node, e);
+                    if (ctx.discovery().reconnectSupported() && X.hasCause(e, IOException.class)) {
+                        onDone(new IgniteNeedReconnectException(ctx.localNode(), e));
+
+                        return;
+                    }
+
+                    U.warn(log0, "Failed to request affinity assignment from remote node (will " +
+                        "continue to another node): " + node);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e945de9..d4f95e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
@@ -39,6 +41,7 @@ import org.apache.ignite.events.CacheEvent;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -54,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -65,7 +67,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -506,10 +508,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             throw e;
         }
+        catch (IgniteNeedReconnectException e) {
+            onDone(e);
+        }
         catch (Throwable e) {
-            U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e);
+            if (reconnectOnError(e))
+                onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+            else {
+                U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e);
 
-            onDone(e);
+                onDone(e);
+            }
 
             if (e instanceof Error)
                 throw (Error)e;
@@ -1297,7 +1306,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             }
         }
         catch (IgniteCheckedException e) {
-            onDone(e);
+            if (reconnectOnError(e))
+                onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+            else
+                onDone(e);
         }
     }
 
@@ -1314,8 +1326,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         }
         catch (IgniteCheckedException e) {
             if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) {
-                log.debug("Failed to send full partition map to node, node left grid " +
-                    "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']');
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send full partition map to node, node left grid " +
+                        "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']');
+
+                return;
+            }
+
+            if (reconnectOnError(e)) {
+                onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
 
                 return;
             }
@@ -1641,6 +1660,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                             }
                         }
                     }
+                    catch (Exception e) {
+                        if (reconnectOnError(e))
+                            onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+                        else
+                            throw e;
+                    }
                     finally {
                         leaveBusy();
                     }
@@ -1652,6 +1677,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         }
     }
 
+    /**
+     * @param e Exception.
+     * @return {@code True} if the local node should try to reconnect because of this error.
+     */
+    public boolean reconnectOnError(Throwable e) {
+        return X.hasCause(e, IOException.class, IgniteClientDisconnectedCheckedException.class) &&
+            cctx.discovery().reconnectSupported();
+    }
+
     /** {@inheritDoc} */
     @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
         return exchId.compareTo(fut.exchId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 6bcfd65..bd81518 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1498,60 +1498,60 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         }
     }
 
-        /**
-         * Deployment callback.
-         *
-         * @param dep Service deployment.
-         * @param topVer Topology version.
-         */
-        private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
-            // Retry forever.
-            try {
-                AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
+    /**
+     * Deployment callback.
+     *
+     * @param dep Service deployment.
+     * @param topVer Topology version.
+     */
+    private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
+        // Retry forever.
+        try {
+            AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
 
-                // If topology version changed, reassignment will happen from topology event.
-                if (newTopVer.equals(topVer))
-                    reassign(dep, topVer);
-            }
-            catch (IgniteCheckedException e) {
-                if (!(e instanceof ClusterTopologyCheckedException))
-                    log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
+            // If topology version changed, reassignment will happen from topology event.
+            if (newTopVer.equals(topVer))
+                reassign(dep, topVer);
+        }
+        catch (IgniteCheckedException e) {
+            if (!(e instanceof ClusterTopologyCheckedException))
+                log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
 
-                AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
+            AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
 
-                if (!newTopVer.equals(topVer)) {
-                    assert newTopVer.compareTo(topVer) > 0;
+            if (!newTopVer.equals(topVer)) {
+                assert newTopVer.compareTo(topVer) > 0;
 
-                    // Reassignment will happen from topology event.
-                    return;
-                }
+                // Reassignment will happen from topology event.
+                return;
+            }
 
-                ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
-                    private IgniteUuid id = IgniteUuid.randomUuid();
+            ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
+                private IgniteUuid id = IgniteUuid.randomUuid();
 
-                    private long start = System.currentTimeMillis();
+                private long start = System.currentTimeMillis();
 
-                    @Override public IgniteUuid timeoutId() {
-                        return id;
-                    }
+                @Override public IgniteUuid timeoutId() {
+                    return id;
+                }
 
-                    @Override public long endTime() {
-                        return start + RETRY_TIMEOUT;
-                    }
+                @Override public long endTime() {
+                    return start + RETRY_TIMEOUT;
+                }
 
-                    @Override public void onTimeout() {
-                        if (!busyLock.enterBusy())
-                            return;
+                @Override public void onTimeout() {
+                    if (!busyLock.enterBusy())
+                        return;
 
-                        try {
-                            // Try again.
-                            onDeployment(dep, topVer);
-                        }
-                        finally {
-                            busyLock.leaveBusy();
-                        }
+                    try {
+                        // Try again.
+                        onDeployment(dep, topVer);
                     }
-                });
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            });
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 95e2cda..02ba56a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -129,6 +129,9 @@ class ClientImpl extends TcpDiscoveryImpl {
     /** */
     private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED";
 
+    /** Queue marker that makes the message worker force this client to leave and rejoin the cluster. */
+    private static final Object SPI_RECONNECT = "SPI_RECONNECT";
+
     /** Remote nodes. */
     private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
 
@@ -809,6 +812,11 @@ class ClientImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
+    @Override public void reconnect() throws IgniteSpiException {
+        msgWorker.addMessage(SPI_RECONNECT);
+    }
+
+    /** {@inheritDoc} */
     @Override public void brakeConnection() {
         SocketStream sockStream = msgWorker.currSock;
 
@@ -879,9 +887,12 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** */
         private UUID rmtNodeId;
 
+        /** Released by the reader loop to acknowledge a {@link #forceStopRead()} request. */
+        private CountDownLatch stopReadLatch;
+
         /**
          */
-        protected SocketReader() {
+        SocketReader() {
             super(spi.ignite().name(), "tcp-client-disco-sock-reader", log);
         }
 
@@ -889,7 +900,7 @@ class ClientImpl extends TcpDiscoveryImpl {
          * @param sockStream Socket.
          * @param rmtNodeId Rmt node id.
          */
-        public void setSocket(SocketStream sockStream, UUID rmtNodeId) {
+        void setSocket(SocketStream sockStream, UUID rmtNodeId) {
             synchronized (mux) {
                 this.sockStream = sockStream;
 
@@ -899,6 +910,31 @@ class ClientImpl extends TcpDiscoveryImpl {
             }
         }
 
+        /**
+         * @throws InterruptedException If interrupted.
+         */
+        private void forceStopRead() throws InterruptedException {
+            CountDownLatch stopReadLatch;
+
+            synchronized (mux) {
+                SocketStream stream = sockStream;
+
+                if (stream == null)
+                    return;
+
+                this.stopReadLatch = stopReadLatch = new CountDownLatch(1);
+
+                U.closeQuiet(stream.socket());
+
+                this.sockStream = null;
+                this.rmtNodeId = null;
+
+                mux.notifyAll();
+            }
+
+            stopReadLatch.await();
+        }
+
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
             while (!isInterrupted()) {
@@ -906,6 +942,12 @@ class ClientImpl extends TcpDiscoveryImpl {
                 UUID rmtNodeId;
 
                 synchronized (mux) {
+                    if (stopReadLatch != null) {
+                        stopReadLatch.countDown();
+
+                        stopReadLatch = null;
+                    }
+
                     if (this.sockStream == null) {
                         mux.wait();
 
@@ -1007,18 +1049,21 @@ class ClientImpl extends TcpDiscoveryImpl {
         private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
 
         /** */
-        private final long socketTimeout;
+        private final long sockTimeout;
 
         /** */
         private TcpDiscoveryAbstractMessage unackedMsg;
 
+        /** Released by the writer loop once a {@link #forceLeave()} request has been handled. */
+        private CountDownLatch forceLeaveLatch;
+
         /**
          *
          */
-        protected SocketWriter() {
+        SocketWriter() {
             super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
 
-            socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+            sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
                 spi.getSocketTimeout();
         }
 
@@ -1034,6 +1079,29 @@ class ClientImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * Makes the writer thread send {@link TcpDiscoveryNodeLeftMessage} and close its socket, then waits for it.
+         *
+         * @throws InterruptedException If interrupted.
+         */
+        private void forceLeave() throws InterruptedException {
+            CountDownLatch forceLeaveLatch;
+
+            synchronized (mux) {
+                // Nothing to do if the writer has no socket (e.g. it was stopped).
+                if (sock == null)
+                    return;
+
+                this.forceLeaveLatch = forceLeaveLatch = new CountDownLatch(1);
+
+                unackedMsg = null;
+
+                mux.notifyAll();
+            }
+
+            forceLeaveLatch.await();
+        }
+
+        /**
          * @param sock Socket.
          * @param clientAck {@code True} is server supports client message acknowlede.
          */
@@ -1089,13 +1157,41 @@ class ClientImpl extends TcpDiscoveryImpl {
                         continue;
                     }
 
-                    msg = queue.poll();
+                    if (forceLeaveLatch != null) {
+                        msg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
 
-                    if (msg == null) {
-                        mux.wait();
+                        msg.client(true);
+
+                        try {
+                            spi.writeToSocket(
+                                sock,
+                                msg,
+                                sockTimeout);
+                        }
+                        catch (IOException | IgniteCheckedException e) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Failed to send TcpDiscoveryNodeLeftMessage on force leave [msg=" + msg +
+                                    ", err=" + e.getMessage() + ']');
+                            }
+                        }
+
+                        U.closeQuiet(sock);
+
+                        this.sock = null;
+
+                        clear();
 
                         continue;
                     }
+                    else {
+                        msg = queue.poll();
+
+                        if (msg == null) {
+                            mux.wait();
+
+                            continue;
+                        }
+                    }
                 }
 
                 for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs)
@@ -1115,7 +1211,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     spi.writeToSocket(
                         sock,
                         msg,
-                        socketTimeout);
+                        sockTimeout);
 
                     msg = null;
 
@@ -1165,10 +1261,30 @@ class ClientImpl extends TcpDiscoveryImpl {
                     synchronized (mux) {
                         if (sock == this.sock)
                             this.sock = null; // Connection has dead.
+
+                        clear();
                     }
                 }
             }
         }
+
+        /**
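+         * Discards queued/unacked messages and releases any pending force-leave waiter; caller must hold {@code mux}.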
+         */
+        private void clear() {
+            assert Thread.holdsLock(mux);
+
+            queue.clear();
+            unackedMsg = null;
+
+            CountDownLatch forceLeaveLatch = this.forceLeaveLatch;
+
+            if (forceLeaveLatch != null) {
+                this.forceLeaveLatch = null;
+
+                forceLeaveLatch.countDown();
+            }
+        }
     }
 
     /**
@@ -1413,6 +1529,38 @@ class ClientImpl extends TcpDiscoveryImpl {
                         else
                             leaveLatch.countDown();
                     }
+                    else if (msg == SPI_RECONNECT) {
+                        if (state == CONNECTED) {
+                            if (reconnector != null) {
+                                reconnector.cancel();
+                                reconnector.join();
+
+                                reconnector = null;
+                            }
+
+                            sockWriter.forceLeave();
+                            sockReader.forceStopRead();
+
+                            currSock = null;
+
+                            queue.clear();
+
+                            onDisconnected();
+
+                            notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+
+                            UUID newId = UUID.randomUUID();
+
+                            U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " +
+                                "to network problems [newId=" + newId +
+                                ", prevId=" + locNode.id() +
+                                ", locNode=" + locNode+ ']');
+
+                            locNode.onClientDisconnected(newId);
+
+                            tryJoin();
+                        }
+                    }
                     else if (msg instanceof TcpDiscoveryNodeFailedMessage &&
                         ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) {
                         TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg;
@@ -1495,20 +1643,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                                         ", failMsg=" + forceFailMsg + ']');
                                 }
 
-                                state = DISCONNECTED;
-
-                                nodeAdded = false;
-
-                                IgniteClientDisconnectedCheckedException err =
-                                    new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
-                                    "client node disconnected.");
-
-                                for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
-                                    GridFutureAdapter<Boolean> fut = e.getValue();
-
-                                    if (pingFuts.remove(e.getKey(), fut))
-                                        fut.onDone(err);
-                                }
+                                onDisconnected();
 
                                 notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
                             }
@@ -1604,6 +1739,26 @@ class ClientImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * Marks the client as disconnected and fails all pending ping futures.
+         */
+        private void onDisconnected() {
+            state = DISCONNECTED;
+
+            nodeAdded = false;
+
+            IgniteClientDisconnectedCheckedException err =
+                new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
+                    "client node disconnected.");
+
+            for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
+                GridFutureAdapter<Boolean> fut = e.getValue();
+
+                if (pingFuts.remove(e.getKey(), fut))
+                    fut.onDone(err);
+            }
+        }
+
+        /**
          * @throws InterruptedException If interrupted.
          */
         private void tryJoin() throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 4600be0..afd1c2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1610,6 +1610,11 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
+    @Override public void reconnect() throws IgniteSpiException {
+        throw new UnsupportedOperationException("Reconnect is not supported for server.");
+    }
+
+    /** {@inheritDoc} */
     @Override protected IgniteSpiThread workerThread() {
         return msgWorker;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index f199c20..84c2ff2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -259,6 +260,13 @@ abstract class TcpDiscoveryImpl {
     }
 
     /**
+     * Leaves the cluster and tries to join it again.
+     *
+     * @throws IgniteSpiException If failed.
+     */
+    public abstract void reconnect() throws IgniteSpiException;
+
+    /**
      * <strong>FOR TEST ONLY!!!</strong>
      * <p>
      * Simulates this node failure by stopping service threads. So, node will become

http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 00ae97d..a2a47fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1927,6 +1927,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     }
 
     /**
+     * Forces this node to leave and rejoin the cluster; only the client discovery implementation supports it.
+     *
+     * @throws IgniteSpiException If failed.
+     */
+    public void reconnect() throws IgniteSpiException {
+        impl.reconnect();
+    }
+
+    /**
      * <strong>FOR TEST ONLY!!!</strong>
      */
     public int clientWorkerCount() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 0f0165b..6cdf465 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -700,9 +700,12 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
                 try {
                     Ignition.start(optimize(getConfiguration(getTestGridName(SRV_CNT))));
 
-                    fail();
+                    // Commented due to IGNITE-4473, because
+                    // IgniteClientDisconnectedException won't
+                    // be thrown, but client will reconnect.
+//                    fail();
 
-                    return false;
+                    return true;
                 }
                 catch (IgniteClientDisconnectedException e) {
                     log.info("Expected start error: " + e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
new file mode 100644
index 0000000..a5d42e9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests that client nodes are able to restore their connection to the cluster when the coordinator is not available.
+ */
+public class IgniteClientRejoinTest extends GridCommonAbstractTest {
+    /** Block. */
+    private volatile boolean block;
+
+    /** Block all. */
+    private volatile boolean blockAll;
+
+    /** Coordinator. */
+    private volatile ClusterNode crd;
+
+    /** Client reconnect disabled. */
+    private boolean clientReconnectDisabled;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        System.setProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK", "true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        System.clearProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        clientReconnectDisabled = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (gridName.contains("client")) {
+            cfg.setCommunicationSpi(new TcpCommunicationSpi());
+
+            TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+            DiscoverySpi dspi = new DiscoverySpi();
+
+            dspi.setIpFinder(spi.getIpFinder());
+
+            cfg.setDiscoverySpi(dspi);
+
+            dspi.setJoinTimeout(60_000);
+            dspi.setClientReconnectDisabled(clientReconnectDisabled);
+
+            cfg.setClientMode(true);
+        }
+
+        // TODO: IGNITE-4833
+        cfg.setPeerClassLoadingEnabled(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientsReconnectAfterStart() throws Exception {
+        Ignite srv1 = startGrid("server1");
+
+        crd = ((IgniteKernal)srv1).localNode();
+
+        Ignite srv2 = startGrid("server2");
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        List<Ignite> clientNodes = new ArrayList<>();
+
+        final int CLIENTS_NUM = 5;
+
+        for (int i = 0; i < CLIENTS_NUM; i++)
+            clientNodes.add(startGrid("client" + i));
+
+        blockAll = true;
+
+        GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                U.sleep(5_000);
+
+                block = true;
+                blockAll = false;
+
+                System.out.println(">>> Allow with blocked coordinator.");
+
+                latch.countDown();
+
+                return null;
+            }
+        });
+
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                latch.await();
+
+                U.sleep((new Random().nextInt(15) + 30) * 1000);
+
+                block = false;
+
+                System.out.println(">>> Allow coordinator.");
+
+                return null;
+            }
+        });
+
+        fut.get();
+
+        for (Ignite client : clientNodes) {
+            while (true) {
+                try {
+                    IgniteCache<Integer, Integer> cache = client.getOrCreateCache("some");
+
+                    for (int i = 0; i < 100; i++)
+                        cache.put(i, i);
+
+                    for (int i = 0; i < 100; i++)
+                        assertEquals((Integer)i, cache.get(i));
+
+                    cache.clear();
+
+                    break;
+                }
+                catch (IgniteClientDisconnectedException e) {
+                    e.reconnectFuture().get();
+                }
+            }
+        }
+
+        assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size());
+        assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientsReconnect() throws Exception {
+        Ignite srv1 = startGrid("server1");
+
+        crd = ((IgniteKernal)srv1).localNode(); // Sends to this node fail while 'block' is set.
+
+        Ignite srv2 = startGrid("server2");
+
+        block = true;
+
+        List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        final int CLIENTS_NUM = 5;
+
+        for (int i = 0; i < CLIENTS_NUM; i++) {
+            final int idx = i;
+
+            IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
+                @Override public Ignite call() throws Exception {
+                    latch.await(); // All clients start at the same moment.
+
+                    return startGrid("client" + idx);
+                }
+            });
+
+            futs.add(fut);
+        }
+
+        GridTestUtils.runAsync(new Callable<Boolean>() { // Start clients, unblock crd later.
+            @Override public Boolean call() throws Exception {
+                latch.countDown();
+
+                Random rnd = new Random();
+
+                U.sleep((rnd.nextInt(15) + 15) * 1000); // Keep crd blocked for 15..29 s.
+
+                block = false;
+
+                System.out.println(">>> ALLOW connection to coordinator.");
+
+                return true;
+            }
+        });
+
+        for (IgniteInternalFuture<Ignite> clientFut : futs) {
+            Ignite client = clientFut.get();
+
+            IgniteCache<Integer, Integer> cache = client.getOrCreateCache(client.name());
+
+            for (int i = 0; i < 100; i++)
+                cache.put(i, i);
+
+            for (int i = 0; i < 100; i++)
+                assertEquals((Integer)i, cache.get(i)); // Checked even without -ea; null fails, not NPEs.
+        }
+
+        assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size());
+        assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientsReconnectDisabled() throws Exception {
+        clientReconnectDisabled = true; // NOTE(review): presumably read by the config (not shown) — verify.
+
+        Ignite srv1 = startGrid("server1");
+
+        crd = ((IgniteKernal)srv1).localNode(); // Sends to this node fail while 'block' is set.
+
+        Ignite srv2 = startGrid("server2");
+
+        block = true; // Never reset in this test.
+
+        List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        final int CLIENTS_NUM = 5;
+
+        for (int i = 0; i < CLIENTS_NUM; i++) {
+            final int idx = i;
+
+            IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
+                @Override public Ignite call() throws Exception {
+                    latch.await(); // All clients start at the same moment.
+
+                    return startGrid("client" + idx);
+                }
+            });
+
+            futs.add(fut);
+        }
+
+        latch.countDown();
+
+        for (final IgniteInternalFuture<Ignite> clientFut : futs) {
+            //noinspection ThrowableNotThrown
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    clientFut.get();
+
+                    return null;
+                }
+            }, IgniteCheckedException.class, null); // Every client start is expected to fail.
+        }
+
+        assertEquals(0, srv1.cluster().forClients().nodes().size()); // No client joined either server.
+        assertEquals(0, srv2.cluster().forClients().nodes().size());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 3 * 60_000; // 3 min: tests here sleep up to ~44 s while the coordinator is blocked.
+    }
+
+    /**
+     * Communication SPI failing all sends if {@code blockAll}, or sends to {@code crd} while {@code block}.
+     */
+    private class TcpCommunicationSpi extends org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+            if (blockAll || block && node.id().equals(crd.id())) // blockAll || (block && target is crd).
+                throw new IgniteSpiException(new SocketException("Test communication exception"));
+
+            super.sendMessage(node, msg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg,
+            IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            if (blockAll || block && node.id().equals(crd.id())) // blockAll || (block && target is crd).
+                throw new IgniteSpiException(new SocketException("Test communication exception"));
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+
+    /**
+     * Discovery SPI failing all socket I/O if {@code blockAll}, or I/O to port 47500 while {@code block}.
+     */
+    private class DiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data,
+            long timeout) throws IOException {
+            if (blockAll || block && sock.getPort() == 47500)
+                throw new SocketException("Test discovery exception");
+
+            super.writeToSocket(sock, msg, data, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (blockAll || block && sock.getPort() == 47500)
+                throw new SocketException("Test discovery exception");
+
+            super.writeToSocket(sock, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (blockAll || block && sock.getPort() == 47500)
+                throw new SocketException("Test discovery exception");
+
+            super.writeToSocket(sock, out, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
+            long timeout) throws IOException {
+            if (blockAll || block && sock.getPort() == 47500)
+                throw new SocketException("Test discovery exception");
+
+            super.writeToSocket(msg, sock, res, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr,
+            IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException {
+            if (blockAll || block && remAddr.getPort() == 47500) // sock is unconnected here: getPort() is 0.
+                throw new SocketException("Test discovery exception");
+
+            return super.openSocket(sock, remAddr, timeoutHelper);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 331b581..0483a1c 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.IgniteState;
@@ -43,6 +44,7 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -1788,8 +1790,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         clientNodeIds.add(client.cluster().localNode().id());
 
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override
-            public boolean apply() {
+            @Override public boolean apply() {
                 return srv.cluster().nodes().size() == 2;
             }
         }, awaitTime());
@@ -1800,6 +1801,49 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testForceClientReconnect() throws Exception {
+        startServerNodes(1);
+
+        startClientNodes(1);
+
+        Ignite srv = G.ignite("server-0");
+        IgniteKernal client = (IgniteKernal)G.ignite("client-0");
+
+        UUID clientId = F.first(clientNodeIds);
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        srv.events().enableLocal(EVT_NODE_JOINED);
+
+        srv.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                latch.countDown();
+
+                return false; // Stop listening after the first join event.
+            }
+        }, EVT_NODE_JOINED);
+
+        client.context().discovery().reconnect(); // Force the client to leave and join again.
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS)); // Not a bare assert: the wait must run without -ea too.
+
+        while (true) {
+            try {
+                UUID newId = client.localNode().id();
+
+                assertFalse("Client node ID did not change: " + clientId, clientId.equals(newId));
+
+                break;
+            }
+            catch (IgniteClientDisconnectedException e) {
+                e.reconnectFuture().get(10_000); // Still reconnecting: wait up to 10 s, then retry.
+            }
+        }
+    }
+
+    /**
      * @param ignite Ignite.
      * @throws Exception If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
index ea8e37b..67d88e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.IgniteClientReconnectFailoverTest;
 import org.apache.ignite.internal.IgniteClientReconnectServicesTest;
 import org.apache.ignite.internal.IgniteClientReconnectStopTest;
 import org.apache.ignite.internal.IgniteClientReconnectStreamerTest;
+import org.apache.ignite.internal.IgniteClientRejoinTest;
 
 /**
  *
@@ -52,6 +53,7 @@ public class IgniteClientReconnectTestSuite extends TestSuite {
         suite.addTestSuite(IgniteClientReconnectServicesTest.class);
         suite.addTestSuite(IgniteClientReconnectStreamerTest.class);
         suite.addTestSuite(IgniteClientReconnectFailoverTest.class);
+        suite.addTestSuite(IgniteClientRejoinTest.class);
 
         return suite;
     }


[3/3] ignite git commit: Merge branch 'ignite-1.7.9-p1' into apache-master

Posted by nt...@apache.org.
Merge branch 'ignite-1.7.9-p1' into apache-master

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
#	modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
#	modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16153bb8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16153bb8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16153bb8

Branch: refs/heads/apache-master
Commit: 16153bb8bae9153ae979c13a93e60daab240d5ee
Parents: 117e18e d124004
Author: dkarachentsev <dk...@gridgain.com>
Authored: Wed Mar 22 16:36:27 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Wed Mar 22 16:36:27 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridKernalGatewayImpl.java  |   8 +-
 .../apache/ignite/internal/IgniteKernal.java    | 120 +++++-
 .../internal/IgniteNeedReconnectException.java  |  40 ++
 .../discovery/GridDiscoveryManager.java         |  24 ++
 .../GridCachePartitionExchangeManager.java      |  25 +-
 .../dht/GridDhtAssignmentFetchFuture.java       |  13 +-
 .../GridDhtPartitionsExchangeFuture.java        |  46 ++-
 .../service/GridServiceProcessor.java           | 101 ++---
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 203 ++++++++--
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   5 +
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   8 +
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   9 +
 .../IgniteClientReconnectCacheTest.java         |   7 +-
 .../ignite/internal/IgniteClientRejoinTest.java | 378 +++++++++++++++++++
 .../GridServiceProcessorStopSelfTest.java       |  75 ++++
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  48 ++-
 .../IgniteClientReconnectTestSuite.java         |   2 +
 17 files changed, 1013 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 2a6706e,25f7884..0ea6ea4
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@@ -3620,25 -3576,45 +3676,65 @@@ public class IgniteKernal implements Ig
      }
  
      /**
 +     * @param node Node.
 +     * @param payload Message payload.
 +     * @param procFromNioThread If {@code true} message is processed from NIO thread.
 +     * @return Response future.
 +     */
 +    public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) {
 +        return ctx.io().sendIoTest(node, payload, procFromNioThread); // Thin delegate to ctx.io().
 +    }
 +
 +    /**
 +     * @param nodes Nodes.
 +     * @param payload Message payload.
 +     * @param procFromNioThread If {@code true} message is processed from NIO thread.
 +     * @return Response future.
 +     */
 +    public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) {
 +        return ctx.io().sendIoTest(nodes, payload, procFromNioThread); // Thin delegate to ctx.io().
 +    }
 +
++    /**
+      * Client reconnect bookkeeping: the first-reconnect future and the reconnect currently in progress.
+      */
+     private class ReconnectState {
+         /** Future awaited by {@link #waitFirstReconnect()}. */
+         private final GridFutureAdapter firstReconnectFut = new GridFutureAdapter();
+ 
+         /** Reconnect currently in progress; may be {@code null}. */
+         private GridCompoundFuture<?, Object> curReconnectFut;
+ 
+         /** Awaited after stopping an unfinished {@code curReconnectFut}; asserted non-null in that case. */
+         private GridFutureAdapter<?> reconnectDone;
+ 
+         /**
+          * @throws IgniteCheckedException If failed.
+          */
+         void waitFirstReconnect() throws IgniteCheckedException {
+             firstReconnectFut.get();
+         }
+ 
+         /**
+          * Completes an unfinished current reconnect with {@code STOP_RECONNECT} and waits until it is done.
+          */
+         void waitPreviousReconnect() {
+             if (curReconnectFut != null && !curReconnectFut.isDone()) {
+                 assert reconnectDone != null;
+ 
+                 curReconnectFut.onDone(STOP_RECONNECT);
+ 
+                 try {
+                     reconnectDone.get();
+                 }
+                 catch (IgniteCheckedException ignored) {
+                     // No-op: only completion matters here, not the outcome of the stopped reconnect.
+                 }
+             }
+ 
+         }
+     }
+ 
      /** {@inheritDoc} */
      @Override public String toString() {
          return S.toString(IgniteKernal.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index d637de4,2ec1070..b2c4ced
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@@ -108,8 -112,8 +108,9 @@@ import org.apache.ignite.spi.discovery.
  import org.apache.ignite.spi.discovery.DiscoverySpiListener;
  import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
  import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
  import org.apache.ignite.thread.IgniteThread;
 +import org.jetbrains.annotations.NotNull;
  import org.jetbrains.annotations.Nullable;
  import org.jsr166.ConcurrentHashMap8;
  
@@@ -1903,114 -1892,29 +1904,137 @@@ public class GridDiscoveryManager exten
      }
  
      /**
+      * @return {@code True} if local node is a client using TcpDiscoverySpi with client reconnect enabled.
+      */
+     public boolean reconnectSupported() {
+         DiscoverySpi spi = getSpi();
+ 
+         return ctx.clientNode() && (spi instanceof TcpDiscoverySpi) &&
+             !(((TcpDiscoverySpi) spi).isClientReconnectDisabled());
+     }
+ 
+     /**
+      * Makes the local client node leave the cluster and try to join again.
+      *
+      * @throws IgniteSpiException If failed.
+      */
+     public void reconnect() {
+         assert reconnectSupported(); // Only guard for the cast below, and only with -ea.
+ 
+         DiscoverySpi discoverySpi = getSpi();
+ 
+         ((TcpDiscoverySpi)discoverySpi).reconnect();
+     }
+ 
+     /**
 +     * @param loc Local node.
 +     * @param topSnapshot Topology snapshot.
 +     * @return Newly created discovery cache.
 +     */
 +    @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode> topSnapshot) {
 +        HashSet<UUID> alives = U.newHashSet(topSnapshot.size()); // IDs for which alive() held at build time.
 +        HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size());
 +
 +        ArrayList<ClusterNode> daemonNodes = new ArrayList<>(topSnapshot.size());
 +        ArrayList<ClusterNode> srvNodes = new ArrayList<>(topSnapshot.size());
 +        ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size());
 +        ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size());
 +
 +        for (ClusterNode node : topSnapshot) {
 +            if (alive(node))
 +                alives.add(node.id());
 +
 +            if (node.isDaemon())
 +                daemonNodes.add(node); // Daemons are kept out of all/remote/server lists.
 +            else {
 +                allNodes.add(node);
 +
 +                if (!node.isLocal())
 +                    rmtNodes.add(node);
 +
 +                if (!CU.clientNode(node))
 +                    srvNodes.add(node);
 +            }
 +
 +            nodeMap.put(node.id(), node); // Every node, daemon or not, is indexed by ID.
 +        }
 +
 +        assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" +
 +            " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']';
 +
 +        Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
 +        Map<Integer, List<ClusterNode>> affCacheNodes = U.newHashMap(allNodes.size());
 +
 +        Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
 +        Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
 +        Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
 +
 +        Set<Integer> nearEnabledCaches = new HashSet<>();
 +
 +        for (ClusterNode node : allNodes) {
 +            assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
 +            assert !node.isDaemon();
 +
 +            for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
 +                String cacheName = entry.getKey();
 +                CachePredicate filter = entry.getValue();
 +
 +                if (filter.cacheNode(node)) {
 +                    allNodesWithCaches.add(node);
 +
 +                    if(!CU.clientNode(node))
 +                        srvNodesWithCaches.add(node);
 +
 +                    if (!node.isLocal())
 +                        rmtNodesWithCaches.add(node);
 +
 +                    addToMap(allCacheNodes, cacheName, node);
 +
 +                    if (filter.dataNode(node))
 +                        addToMap(affCacheNodes, cacheName, node);
 +
 +                    if (filter.nearNode(node))
 +                        nearEnabledCaches.add(CU.cacheId(cacheName)); // Marked if nearNode() holds for any node.
 +                }
 +            }
 +        }
 +
 +        return new DiscoCache(
 +            loc,
 +            Collections.unmodifiableList(rmtNodes),
 +            Collections.unmodifiableList(allNodes),
 +            Collections.unmodifiableList(srvNodes),
 +            Collections.unmodifiableList(daemonNodes),
 +            U.sealList(srvNodesWithCaches),
 +            U.sealList(allNodesWithCaches),
 +            U.sealList(rmtNodesWithCaches),
 +            Collections.unmodifiableMap(allCacheNodes),
 +            Collections.unmodifiableMap(affCacheNodes),
 +            Collections.unmodifiableMap(nodeMap),
 +            Collections.unmodifiableSet(nearEnabledCaches),
 +            alives); // Left mutable, presumably for DiscoCache.updateAlives() — verify.
 +    }
 +
 +    /**
 +     * Appends a node to the list keyed by the cache ID of {@code cacheName}, creating the list if absent.
 +     *
 +     * @param cacheMap Map to add to.
 +     * @param cacheName Cache name.
 +     * @param rich Node to add.
 +     */
 +    private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
 +        List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
 +
 +        if (cacheNodes == null) {
 +            cacheNodes = new ArrayList<>();
 +
 +            cacheMap.put(CU.cacheId(cacheName), cacheNodes);
 +        }
 +
 +        cacheNodes.add(rich);
 +    }
 +
 +    /**
       * Updates topology version if current version is smaller than updated.
       *
       * @param updated Updated topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 50937a8,d4f95e5..5eacc36
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -1677,12 -1677,16 +1704,21 @@@ public class GridDhtPartitionsExchangeF
          }
      }
  
+     /**
+      * @param e Exception.
+      * @return {@code True} if {@code e} has an I/O or client-disconnect cause and reconnect is supported.
+      */
+     public boolean reconnectOnError(Throwable e) {
+         return X.hasCause(e, IOException.class, IgniteClientDisconnectedCheckedException.class) &&
+             cctx.discovery().reconnectSupported();
+     }
+ 
      /** {@inheritDoc} */
 +    @Override public boolean isExchange() {
 +        return true; // Unconditionally true for this future type.
 +    }
 +
 +    /** {@inheritDoc} */
      @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
          return exchId.compareTo(fut.exchId); // Ordered by exchange ID.
      }

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 84fb8e3,bd81518..e0a5c7c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@@ -317,8 -315,10 +317,10 @@@ public class GridServiceProcessor exten
  
          busyLock.block();
  
+         U.shutdownNow(GridServiceProcessor.class, depExe, log);
+ 
          if (!ctx.clientNode())
 -            ctx.event().removeLocalEventListener(topLsnr);
 +            ctx.event().removeDiscoveryEventListener(topLsnr);
  
          Collection<ServiceContextImpl> ctxs = new ArrayList<>();
  
@@@ -1401,7 -1399,7 +1401,7 @@@
                  return;
  
              try {
-                 depExe.execute(new BusyRunnable() {
 -                depExe.submit(new DepRunnable() {
++                depExe.execute(new DepRunnable() {
                      @Override public void run0() {
                          onSystemCacheUpdated(deps);
                      }
@@@ -1582,18 -1582,13 +1582,18 @@@
                      }
                      else
                          return;
 +
 +                    topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion();
                  }
                  else
 -                    topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
 +                    topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
  
-                 depExe.execute(new BusyRunnable() {
 -                depExe.submit(new DepRunnable() {
++                depExe.execute(new DepRunnable() {
                      @Override public void run0() {
 -                        ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
 +                        // In case the cache instance isn't tracked by DiscoveryManager anymore.
 +                        discoCache.updateAlives(ctx.discovery());
 +
 +                        ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
  
                          if (oldest != null && oldest.isLocal()) {
                              final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 04b076d,02ba56a..feb3e48
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@@ -1100,6 -1183,15 +1186,16 @@@ class ClientImpl extends TcpDiscoveryIm
  
                          continue;
                      }
+                     else {
 -                        msg = queue.poll();
++                        if (msg == null)
++                            msg = queue.poll();
+ 
+                         if (msg == null) {
+                             mux.wait();
+ 
+                             continue;
+                         }
+                     }
                  }
  
                  for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs)

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 682d2d7,6cdf465..01aa256
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@@ -698,11 -698,14 +698,14 @@@ public class IgniteClientReconnectCache
          IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>() {
              @Override public Boolean call() throws Exception {
                  try {
 -                    Ignition.start(optimize(getConfiguration(getTestGridName(SRV_CNT))));
 +                    Ignition.start(optimize(getConfiguration(getTestIgniteInstanceName(SRV_CNT))));
  
-                     fail();
+                     // Commented due to IGNITE-4473, because
+                     // IgniteClientDisconnectedException won't
+                     // be thrown, but client will reconnect.
+ //                    fail();
  
-                     return false;
+                     return true;
                  }
                  catch (IgniteClientDisconnectedException e) {
                      log.info("Expected start error: " + e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------