You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/04/03 08:31:03 UTC
[03/50] [abbrv] ignite git commit: IGNITE-4473 - Client should re-try
connection attempt in case of concurrent network failure.
IGNITE-4473 - Client should re-try connection attempt in case of concurrent network failure.
(cherry picked from commit d124004)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/94641931
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/94641931
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/94641931
Branch: refs/heads/master
Commit: 946419314b567c604a15ae4f9658d89bc350127b
Parents: 59ea1c2
Author: dkarachentsev <dk...@gridgain.com>
Authored: Fri Mar 17 14:57:48 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Fri Mar 17 15:20:20 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/GridKernalGatewayImpl.java | 8 +-
.../apache/ignite/internal/IgniteKernal.java | 120 +++++-
.../internal/IgniteNeedReconnectException.java | 40 ++
.../discovery/GridDiscoveryManager.java | 24 ++
.../GridCachePartitionExchangeManager.java | 25 +-
.../dht/GridDhtAssignmentFetchFuture.java | 14 +-
.../GridDhtPartitionsExchangeFuture.java | 48 ++-
.../service/GridServiceProcessor.java | 86 ++---
.../ignite/spi/discovery/tcp/ClientImpl.java | 201 ++++++++--
.../ignite/spi/discovery/tcp/ServerImpl.java | 5 +
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 8 +
.../spi/discovery/tcp/TcpDiscoverySpi.java | 9 +
.../IgniteClientReconnectCacheTest.java | 7 +-
.../ignite/internal/IgniteClientRejoinTest.java | 378 +++++++++++++++++++
.../tcp/TcpClientDiscoverySpiSelfTest.java | 48 ++-
.../IgniteClientReconnectTestSuite.java | 2 +
16 files changed, 929 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
index fe8c580..036954a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
@@ -44,7 +44,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
/** */
@GridToStringExclude
- private IgniteFutureImpl<?> reconnectFut;
+ private volatile IgniteFutureImpl<?> reconnectFut;
/** */
private final AtomicReference<GridKernalState> state = new AtomicReference<>(GridKernalState.STOPPED);
@@ -149,6 +149,12 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
/** {@inheritDoc} */
@Override public GridFutureAdapter<?> onDisconnected() {
+ if (state.get() == GridKernalState.DISCONNECTED) {
+ assert reconnectFut != null;
+
+ return (GridFutureAdapter<?>)reconnectFut.internalFuture();
+ }
+
GridFutureAdapter<?> fut = new GridFutureAdapter<>();
reconnectFut = new IgniteFutureImpl<>(fut);
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 4972d1f..063fe25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -250,6 +250,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/** Periodic starvation check interval. */
private static final long PERIODIC_STARVATION_CHECK_FREQ = 1000 * 30;
+ /** Force complete reconnect future. */
+ private static final Object STOP_RECONNECT = new Object();
+
/** */
@GridToStringExclude
private GridKernalContextImpl ctx;
@@ -327,6 +330,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
@GridToStringExclude
private final AtomicBoolean stopGuard = new AtomicBoolean();
+ /** */
+ private final ReconnectState reconnectState = new ReconnectState();
+
/**
* No-arg constructor is required by externalization.
*/
@@ -936,6 +942,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
// Notify IO manager the second so further components can send and receive messages.
ctx.io().onKernalStart();
+ boolean recon = false;
+
// Callbacks.
for (GridComponent comp : ctx) {
// Skip discovery manager.
@@ -946,10 +954,24 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
if (comp instanceof GridIoManager)
continue;
- if (!skipDaemon(comp))
- comp.onKernalStart();
+ if (!skipDaemon(comp)) {
+ try {
+ comp.onKernalStart();
+ }
+ catch (IgniteNeedReconnectException e) {
+ assert ctx.discovery().reconnectSupported();
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to start node components on node start, will wait for reconnect: " + e);
+
+ recon = true;
+ }
+ }
}
+ if (recon)
+ reconnectState.waitFirstReconnect();
+
// Register MBeans.
registerKernalMBean();
registerLocalNodeMBean();
@@ -3309,6 +3331,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
public void onDisconnected() {
Throwable err = null;
+ reconnectState.waitPreviousReconnect();
+
GridFutureAdapter<?> reconnectFut = ctx.gateway().onDisconnected();
if (reconnectFut == null) {
@@ -3317,9 +3341,18 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
return;
}
- IgniteFuture<?> userFut = new IgniteFutureImpl<>(reconnectFut);
+ IgniteFutureImpl<?> curFut = (IgniteFutureImpl<?>)ctx.cluster().get().clientReconnectFuture();
+
+ IgniteFuture<?> userFut;
- ctx.cluster().get().clientReconnectFuture(userFut);
+ // In case of previous reconnect did not finish keep reconnect future.
+ if (curFut != null && curFut.internalFuture() == reconnectFut)
+ userFut = curFut;
+ else {
+ userFut = new IgniteFutureImpl<>(reconnectFut);
+
+ ctx.cluster().get().clientReconnectFuture(userFut);
+ }
ctx.disconnected(true);
@@ -3372,30 +3405,53 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
try {
ctx.disconnected(false);
- GridCompoundFuture<?, ?> reconnectFut = new GridCompoundFuture<>();
+ GridCompoundFuture curReconnectFut = reconnectState.curReconnectFut = new GridCompoundFuture<>();
+
+ reconnectState.reconnectDone = new GridFutureAdapter<>();
for (GridComponent comp : ctx.components()) {
IgniteInternalFuture<?> fut = comp.onReconnected(clusterRestarted);
if (fut != null)
- reconnectFut.add((IgniteInternalFuture)fut);
+ curReconnectFut.add(fut);
}
- reconnectFut.add((IgniteInternalFuture)ctx.cache().context().exchange().reconnectExchangeFuture());
+ curReconnectFut.add(ctx.cache().context().exchange().reconnectExchangeFuture());
+
+ curReconnectFut.markInitialized();
- reconnectFut.markInitialized();
+ final GridFutureAdapter reconnectDone = reconnectState.reconnectDone;
- reconnectFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ curReconnectFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
try {
- fut.get();
+ Object res = fut.get();
+
+ if (res == STOP_RECONNECT)
+ return;
ctx.gateway().onReconnected();
+
+ reconnectState.firstReconnectFut.onDone();
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to reconnect, will stop node", e);
+ if (!X.hasCause(e, IgniteNeedReconnectException.class,
+ IgniteClientDisconnectedCheckedException.class)) {
+ U.error(log, "Failed to reconnect, will stop node.", e);
+
+ reconnectState.firstReconnectFut.onDone(e);
- close();
+ close();
+ }
+ else {
+ assert ctx.discovery().reconnectSupported();
+
+ U.error(log, "Failed to finish reconnect, will retry [locNodeId=" + ctx.localNodeId() +
+ ", err=" + e.getMessage() + ']');
+ }
+ }
+ finally {
+ reconnectDone.onDone();
}
}
});
@@ -3574,6 +3630,46 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
return ctx.io().sendIoTest(nodes, payload, procFromNioThread);
}
+ /**
+ * Holds the futures that coordinate client reconnect attempts.
+ */
+ private class ReconnectState {
+ /** Completed when a reconnect attempt succeeds or fails with an error that stops the node. */
+ private final GridFutureAdapter firstReconnectFut = new GridFutureAdapter();
+
+ /** Compound future collecting the component reconnect futures of the current attempt. */
+ private GridCompoundFuture<?, Object> curReconnectFut;
+
+ /** Completed when the listener of the current reconnect attempt has finished. */
+ private GridFutureAdapter<?> reconnectDone;
+
+ /**
+ * @throws IgniteCheckedException If the awaited reconnect failed.
+ */
+ void waitFirstReconnect() throws IgniteCheckedException {
+ firstReconnectFut.get();
+ }
+
+ /**
+ * Stops an unfinished previous reconnect attempt with {@code STOP_RECONNECT} and waits for its listener.
+ */
+ void waitPreviousReconnect() {
+ if (curReconnectFut != null && !curReconnectFut.isDone()) {
+ assert reconnectDone != null;
+
+ curReconnectFut.onDone(STOP_RECONNECT);
+
+ try {
+ reconnectDone.get();
+ }
+ catch (IgniteCheckedException ignote) {
+ // No-op: only waiting for the previous attempt to finish.
+ }
+ }
+
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteKernal.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java
new file mode 100644
index 0000000..61ab576
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Indicates that the local client node should try to reconnect to the cluster.
+ */
+public class IgniteNeedReconnectException extends IgniteCheckedException {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param locNode Local node, asserted to be a client node.
+ * @param cause Cause, may be {@code null}.
+ */
+ public IgniteNeedReconnectException(ClusterNode locNode, @Nullable Throwable cause) {
+ super("Local node need try to reconnect [locNodeId=" + locNode.id() + ']', cause);
+
+ assert locNode.isClient();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9aa4db1..2ec1070 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -112,6 +112,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -1891,6 +1892,29 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * @return {@code True} if the local node is a client and the SPI is a {@link TcpDiscoverySpi} with client reconnect enabled.
+ */
+ public boolean reconnectSupported() {
+ DiscoverySpi spi = getSpi();
+
+ return ctx.clientNode() && (spi instanceof TcpDiscoverySpi) &&
+ !(((TcpDiscoverySpi) spi).isClientReconnectDisabled());
+ }
+
+ /**
+ * Leaves the cluster and tries to join again by delegating to {@link TcpDiscoverySpi#reconnect()}.
+ *
+ * @throws IgniteSpiException If failed.
+ */
+ public void reconnect() {
+ assert reconnectSupported();
+
+ DiscoverySpi discoverySpi = getSpi();
+
+ ((TcpDiscoverySpi)discoverySpi).reconnect();
+ }
+
+ /**
* Updates topology version if current version is smaller than updated.
*
* @param updated Updated topology version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7cf75fe..6f2c626 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
@@ -448,6 +449,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
else
U.warn(log, "Still waiting for initial partition map exchange [fut=" + fut + ']');
}
+ catch (IgniteNeedReconnectException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ if (fut.reconnectOnError(e))
+ throw new IgniteNeedReconnectException(cctx.localNode(), e);
+
+ throw e;
+ }
}
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -1697,6 +1707,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
dumpedObjects++;
}
}
+ catch (Exception e) {
+ if (exchFut.reconnectOnError(e))
+ throw new IgniteNeedReconnectException(cctx.localNode(), e);
+
+ throw e;
+ }
}
@@ -1836,7 +1852,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
catch (IgniteInterruptedCheckedException e) {
throw e;
}
- catch (IgniteClientDisconnectedCheckedException e) {
+ catch (IgniteClientDisconnectedCheckedException | IgniteNeedReconnectException e) {
+ assert cctx.discovery().reconnectSupported();
+
+ U.warn(log,"Local node failed to complete partition map exchange due to " +
+ "network issues, will try to reconnect to cluster", e);
+
+ cctx.discovery().reconnect();
+
return;
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index ab8e863..6425bc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -17,15 +17,16 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
-import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridNodeOrderComparator;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -202,8 +204,14 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
"continue to another node): " + node);
}
catch (IgniteCheckedException e) {
- U.error(log0, "Failed to request affinity assignment from remote node (will " +
- "continue to another node): " + node, e);
+ if (ctx.discovery().reconnectSupported() && X.hasCause(e, IOException.class)) {
+ onDone(new IgniteNeedReconnectException(ctx.localNode(), e));
+
+ return;
+ }
+
+ U.warn(log0, "Failed to request affinity assignment from remote node (will " +
+ "continue to another node): " + node);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e945de9..d4f95e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
@@ -39,6 +41,7 @@ import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -54,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -65,7 +67,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -506,10 +508,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
throw e;
}
+ catch (IgniteNeedReconnectException e) {
+ onDone(e);
+ }
catch (Throwable e) {
- U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e);
+ if (reconnectOnError(e))
+ onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+ else {
+ U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e);
- onDone(e);
+ onDone(e);
+ }
if (e instanceof Error)
throw (Error)e;
@@ -1297,7 +1306,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
}
catch (IgniteCheckedException e) {
- onDone(e);
+ if (reconnectOnError(e))
+ onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+ else
+ onDone(e);
}
}
@@ -1314,8 +1326,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
catch (IgniteCheckedException e) {
if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) {
- log.debug("Failed to send full partition map to node, node left grid " +
- "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']');
+ if (log.isDebugEnabled())
+ log.debug("Failed to send full partition map to node, node left grid " +
+ "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']');
+
+ return;
+ }
+
+ if (reconnectOnError(e)) {
+ onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
return;
}
@@ -1641,6 +1660,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
}
}
+ catch (Exception e) {
+ if (reconnectOnError(e))
+ onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+ else
+ throw e;
+ }
finally {
leaveBusy();
}
@@ -1652,6 +1677,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
}
+ /**
+ * @param e Exception to inspect.
+ * @return {@code True} if {@code e} has an I/O or client-disconnected cause and discovery supports reconnect.
+ */
+ public boolean reconnectOnError(Throwable e) {
+ return X.hasCause(e, IOException.class, IgniteClientDisconnectedCheckedException.class) &&
+ cctx.discovery().reconnectSupported();
+ }
+
/** {@inheritDoc} */
@Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
return exchId.compareTo(fut.exchId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index f7f370e..730c6a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1498,60 +1498,60 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
}
- /**
- * Deployment callback.
- *
- * @param dep Service deployment.
- * @param topVer Topology version.
- */
- private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
- // Retry forever.
- try {
- AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
+ /**
+ * Deployment callback.
+ *
+ * @param dep Service deployment.
+ * @param topVer Topology version.
+ */
+ private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
+ // Retry forever.
+ try {
+ AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
- // If topology version changed, reassignment will happen from topology event.
- if (newTopVer.equals(topVer))
- reassign(dep, topVer);
- }
- catch (IgniteCheckedException e) {
- if (!(e instanceof ClusterTopologyCheckedException))
- log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
+ // If topology version changed, reassignment will happen from topology event.
+ if (newTopVer.equals(topVer))
+ reassign(dep, topVer);
+ }
+ catch (IgniteCheckedException e) {
+ if (!(e instanceof ClusterTopologyCheckedException))
+ log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
- AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
+ AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
- if (!newTopVer.equals(topVer)) {
- assert newTopVer.compareTo(topVer) > 0;
+ if (!newTopVer.equals(topVer)) {
+ assert newTopVer.compareTo(topVer) > 0;
- // Reassignment will happen from topology event.
- return;
- }
+ // Reassignment will happen from topology event.
+ return;
+ }
- ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
- private IgniteUuid id = IgniteUuid.randomUuid();
+ ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
+ private IgniteUuid id = IgniteUuid.randomUuid();
- private long start = System.currentTimeMillis();
+ private long start = System.currentTimeMillis();
- @Override public IgniteUuid timeoutId() {
- return id;
- }
+ @Override public IgniteUuid timeoutId() {
+ return id;
+ }
- @Override public long endTime() {
- return start + RETRY_TIMEOUT;
- }
+ @Override public long endTime() {
+ return start + RETRY_TIMEOUT;
+ }
- @Override public void onTimeout() {
- if (!busyLock.enterBusy())
- return;
+ @Override public void onTimeout() {
+ if (!busyLock.enterBusy())
+ return;
- try {
- // Try again.
- onDeployment(dep, topVer);
- }
- finally {
- busyLock.leaveBusy();
- }
+ try {
+ // Try again.
+ onDeployment(dep, topVer);
}
- });
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 95e2cda..02ba56a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -129,6 +129,9 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED";
+ /** */
+ private static final Object SPI_RECONNECT = "SPI_RECONNECT";
+
/** Remote nodes. */
private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
@@ -809,6 +812,11 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
+ @Override public void reconnect() throws IgniteSpiException {
+ msgWorker.addMessage(SPI_RECONNECT);
+ }
+
+ /** {@inheritDoc} */
@Override public void brakeConnection() {
SocketStream sockStream = msgWorker.currSock;
@@ -879,9 +887,12 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private UUID rmtNodeId;
+ /** */
+ private CountDownLatch stopReadLatch;
+
/**
*/
- protected SocketReader() {
+ SocketReader() {
super(spi.ignite().name(), "tcp-client-disco-sock-reader", log);
}
@@ -889,7 +900,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param sockStream Socket.
* @param rmtNodeId Rmt node id.
*/
- public void setSocket(SocketStream sockStream, UUID rmtNodeId) {
+ void setSocket(SocketStream sockStream, UUID rmtNodeId) {
synchronized (mux) {
this.sockStream = sockStream;
@@ -899,6 +910,31 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
+ /**
+ * @throws InterruptedException If interrupted while waiting for the reader loop to acknowledge the stop.
+ */
+ private void forceStopRead() throws InterruptedException {
+ CountDownLatch stopReadLatch;
+
+ synchronized (mux) {
+ SocketStream stream = sockStream;
+
+ if (stream == null)
+ return;
+
+ this.stopReadLatch = stopReadLatch = new CountDownLatch(1);
+
+ U.closeQuiet(stream.socket());
+
+ this.sockStream = null;
+ this.rmtNodeId = null;
+
+ mux.notifyAll();
+ }
+
+ stopReadLatch.await(); // Counted down by body() at the start of its next loop iteration.
+ }
+
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
while (!isInterrupted()) {
@@ -906,6 +942,12 @@ class ClientImpl extends TcpDiscoveryImpl {
UUID rmtNodeId;
synchronized (mux) {
+ if (stopReadLatch != null) {
+ stopReadLatch.countDown();
+
+ stopReadLatch = null;
+ }
+
if (this.sockStream == null) {
mux.wait();
@@ -1007,18 +1049,21 @@ class ClientImpl extends TcpDiscoveryImpl {
private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
/** */
- private final long socketTimeout;
+ private final long sockTimeout;
/** */
private TcpDiscoveryAbstractMessage unackedMsg;
+ /** */
+ private CountDownLatch forceLeaveLatch;
+
/**
*
*/
- protected SocketWriter() {
+ SocketWriter() {
super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
- socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+ sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
spi.getSocketTimeout();
}
@@ -1034,6 +1079,29 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
+ * Asks the writer thread to send {@link TcpDiscoveryNodeLeftMessage} and close the socket, then waits for it.
+ *
+ * @throws InterruptedException If interrupted while waiting for the writer thread.
+ */
+ private void forceLeave() throws InterruptedException {
+ CountDownLatch forceLeaveLatch;
+
+ synchronized (mux) {
+ // Writer has no socket (already stopped), so there is nothing to send.
+ if (sock == null)
+ return;
+
+ this.forceLeaveLatch = forceLeaveLatch = new CountDownLatch(1);
+
+ unackedMsg = null;
+
+ mux.notifyAll();
+ }
+
+ forceLeaveLatch.await(); // Counted down by clear() once the writer thread has handled the request.
+ }
+
+ /**
* @param sock Socket.
* @param clientAck {@code True} is server supports client message acknowlede.
*/
@@ -1089,13 +1157,41 @@ class ClientImpl extends TcpDiscoveryImpl {
continue;
}
- msg = queue.poll();
+ if (forceLeaveLatch != null) {
+ msg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
- if (msg == null) {
- mux.wait();
+ msg.client(true);
+
+ try {
+ spi.writeToSocket(
+ sock,
+ msg,
+ sockTimeout);
+ }
+ catch (IOException | IgniteCheckedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to send TcpDiscoveryNodeLeftMessage on force leave [msg=" + msg +
+ ", err=" + e.getMessage() + ']');
+ }
+ }
+
+ U.closeQuiet(sock);
+
+ this.sock = null;
+
+ clear();
continue;
}
+ else {
+ msg = queue.poll();
+
+ if (msg == null) {
+ mux.wait();
+
+ continue;
+ }
+ }
}
for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs)
@@ -1115,7 +1211,7 @@ class ClientImpl extends TcpDiscoveryImpl {
spi.writeToSocket(
sock,
msg,
- socketTimeout);
+ sockTimeout);
msg = null;
@@ -1165,10 +1261,30 @@ class ClientImpl extends TcpDiscoveryImpl {
synchronized (mux) {
if (sock == this.sock)
this.sock = null; // Connection has dead.
+
+ clear();
}
}
}
}
+
+ /**
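+ * Drops all queued messages and the unacknowledged message and releases a pending force-leave latch, if any. Must be called with {@code mux} held.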
+ */
+ private void clear() {
+ assert Thread.holdsLock(mux);
+
+ queue.clear();
+ unackedMsg = null;
+
+ CountDownLatch forceLeaveLatch = this.forceLeaveLatch;
+
+ if (forceLeaveLatch != null) {
+ this.forceLeaveLatch = null;
+
+ forceLeaveLatch.countDown();
+ }
+ }
}
/**
@@ -1413,6 +1529,38 @@ class ClientImpl extends TcpDiscoveryImpl {
else
leaveLatch.countDown();
}
+ else if (msg == SPI_RECONNECT) {
+ if (state == CONNECTED) {
+ if (reconnector != null) {
+ reconnector.cancel();
+ reconnector.join();
+
+ reconnector = null;
+ }
+
+ sockWriter.forceLeave();
+ sockReader.forceStopRead();
+
+ currSock = null;
+
+ queue.clear();
+
+ onDisconnected();
+
+ notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+
+ UUID newId = UUID.randomUUID();
+
+ U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " +
+ "to network problems [newId=" + newId +
+ ", prevId=" + locNode.id() +
+ ", locNode=" + locNode+ ']');
+
+ locNode.onClientDisconnected(newId);
+
+ tryJoin();
+ }
+ }
else if (msg instanceof TcpDiscoveryNodeFailedMessage &&
((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) {
TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg;
@@ -1495,20 +1643,7 @@ class ClientImpl extends TcpDiscoveryImpl {
", failMsg=" + forceFailMsg + ']');
}
- state = DISCONNECTED;
-
- nodeAdded = false;
-
- IgniteClientDisconnectedCheckedException err =
- new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
- "client node disconnected.");
-
- for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
- GridFutureAdapter<Boolean> fut = e.getValue();
-
- if (pingFuts.remove(e.getKey(), fut))
- fut.onDone(err);
- }
+ onDisconnected();
notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
}
@@ -1604,6 +1739,26 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
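+ * Switches to the {@code DISCONNECTED} state, resets the {@code nodeAdded} flag and completes all pending ping futures with {@link IgniteClientDisconnectedCheckedException}.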
+ */
+ private void onDisconnected() {
+ state = DISCONNECTED;
+
+ nodeAdded = false;
+
+ IgniteClientDisconnectedCheckedException err =
+ new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
+ "client node disconnected.");
+
+ for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
+ GridFutureAdapter<Boolean> fut = e.getValue();
+
+ if (pingFuts.remove(e.getKey(), fut))
+ fut.onDone(err);
+ }
+ }
+
+ /**
* @throws InterruptedException If interrupted.
*/
private void tryJoin() throws InterruptedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 89a9efa..dfe614b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1610,6 +1610,11 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
+ @Override public void reconnect() throws IgniteSpiException {
+ throw new UnsupportedOperationException("Reconnect is not supported for server.");
+ }
+
+ /** {@inheritDoc} */
@Override protected IgniteSpiThread workerThread() {
return msgWorker;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index f199c20..84c2ff2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -259,6 +260,13 @@ abstract class TcpDiscoveryImpl {
}
/**
+ * Leave cluster and try to join again.
+ *
+ * @throws IgniteSpiException If failed.
+ */
+ public abstract void reconnect() throws IgniteSpiException;
+
+ /**
* <strong>FOR TEST ONLY!!!</strong>
* <p>
* Simulates this node failure by stopping service threads. So, node will become
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 00ae97d..a2a47fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1927,6 +1927,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
}
/**
+ * Forces a reconnect to the cluster by delegating to the discovery implementation ({@code impl}).
+ *
+ * @throws IgniteSpiException If failed.
+ */
+ public void reconnect() throws IgniteSpiException {
+ impl.reconnect();
+ }
+
+ /**
* <strong>FOR TEST ONLY!!!</strong>
*/
public int clientWorkerCount() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 0f0165b..6cdf465 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -700,9 +700,12 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
try {
Ignition.start(optimize(getConfiguration(getTestGridName(SRV_CNT))));
- fail();
+ // fail() is commented out due to IGNITE-4473:
+ // IgniteClientDisconnectedException is no longer
+ // thrown here, the client reconnects instead.
+// fail();
- return false;
+ return true;
}
catch (IgniteClientDisconnectedException e) {
log.info("Expected start error: " + e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
new file mode 100644
index 0000000..a5d42e9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests that a client is able to restore its connection to the cluster if the coordinator is not available.
+ */
+public class IgniteClientRejoinTest extends GridCommonAbstractTest {
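+ /** If {@code true}, test SPIs fail communication sends to the coordinator node and discovery operations on sockets with port 47500. */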
+ private volatile boolean block;
+
+ /** If {@code true}, test SPIs fail every intercepted send or socket operation regardless of destination. */
+ private volatile boolean blockAll;
+
+ /** Coordinator. */
+ private volatile ClusterNode crd;
+
+ /** Client reconnect disabled. */
+ private boolean clientReconnectDisabled;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ System.setProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK", "true");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ System.clearProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ clientReconnectDisabled = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (gridName.contains("client")) {
+ cfg.setCommunicationSpi(new TcpCommunicationSpi());
+
+ TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+ DiscoverySpi dspi = new DiscoverySpi();
+
+ dspi.setIpFinder(spi.getIpFinder());
+
+ cfg.setDiscoverySpi(dspi);
+
+ dspi.setJoinTimeout(60_000);
+ dspi.setClientReconnectDisabled(clientReconnectDisabled);
+
+ cfg.setClientMode(true);
+ }
+
+ // TODO: IGNITE-4833
+ cfg.setPeerClassLoadingEnabled(false);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientsReconnectAfterStart() throws Exception {
+ Ignite srv1 = startGrid("server1");
+
+ crd = ((IgniteKernal)srv1).localNode();
+
+ Ignite srv2 = startGrid("server2");
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ List<Ignite> clientNodes = new ArrayList<>();
+
+ final int CLIENTS_NUM = 5;
+
+ for (int i = 0; i < CLIENTS_NUM; i++)
+ clientNodes.add(startGrid("client" + i));
+
+ blockAll = true;
+
+ GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ U.sleep(5_000);
+
+ block = true;
+ blockAll = false;
+
+ System.out.println(">>> Allow with blocked coordinator.");
+
+ latch.countDown();
+
+ return null;
+ }
+ });
+
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ latch.await();
+
+ U.sleep((new Random().nextInt(15) + 30) * 1000);
+
+ block = false;
+
+ System.out.println(">>> Allow coordinator.");
+
+ return null;
+ }
+ });
+
+ fut.get();
+
+ for (Ignite client : clientNodes) {
+ while (true) {
+ try {
+ IgniteCache<Integer, Integer> cache = client.getOrCreateCache("some");
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, i);
+
+ for (int i = 0; i < 100; i++)
+ assertEquals((Integer)i, cache.get(i));
+
+ cache.clear();
+
+ break;
+ }
+ catch (IgniteClientDisconnectedException e) {
+ e.reconnectFuture().get();
+ }
+ }
+ }
+
+ assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size());
+ assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientsReconnect() throws Exception {
+ Ignite srv1 = startGrid("server1");
+
+ crd = ((IgniteKernal)srv1).localNode();
+
+ Ignite srv2 = startGrid("server2");
+
+ block = true;
+
+ List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final int CLIENTS_NUM = 5;
+
+ for (int i = 0; i < CLIENTS_NUM; i++) {
+ final int idx = i;
+
+ IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
+ @Override public Ignite call() throws Exception {
+ latch.await();
+
+ return startGrid("client" + idx);
+ }
+ });
+
+ futs.add(fut);
+ }
+
+ GridTestUtils.runAsync(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ latch.countDown();
+
+ Random rnd = new Random();
+
+ U.sleep((rnd.nextInt(15) + 15) * 1000);
+
+ block = false;
+
+ System.out.println(">>> ALLOW connection to coordinator.");
+
+ return true;
+ }
+ });
+
+ for (IgniteInternalFuture<Ignite> clientFut : futs) {
+ Ignite client = clientFut.get();
+
+ IgniteCache<Integer, Integer> cache = client.getOrCreateCache(client.name());
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, i);
+
+ for (int i = 0; i < 100; i++)
+ assert i == cache.get(i);
+ }
+
+ assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size());
+ assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientsReconnectDisabled() throws Exception {
+ clientReconnectDisabled = true;
+
+ Ignite srv1 = startGrid("server1");
+
+ crd = ((IgniteKernal)srv1).localNode();
+
+ Ignite srv2 = startGrid("server2");
+
+ block = true;
+
+ List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final int CLIENTS_NUM = 5;
+
+ for (int i = 0; i < CLIENTS_NUM; i++) {
+ final int idx = i;
+
+ IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
+ @Override public Ignite call() throws Exception {
+ latch.await();
+
+ return startGrid("client" + idx);
+ }
+ });
+
+ futs.add(fut);
+ }
+
+ latch.countDown();
+
+ for (final IgniteInternalFuture<Ignite> clientFut : futs) {
+ //noinspection ThrowableNotThrown
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ clientFut.get();
+
+ return null;
+ }
+ }, IgniteCheckedException.class, null);
+ }
+
+ assertEquals(0, srv1.cluster().forClients().nodes().size());
+ assertEquals(0, srv2.cluster().forClients().nodes().size());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 3 * 60_000;
+ }
+
+ /**
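+ * Communication SPI that throws {@link IgniteSpiException} instead of sending a message while blocking is enabled.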
+ */
+ private class TcpCommunicationSpi extends org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ if (blockAll || block && node.id().equals(crd.id()))
+ throw new IgniteSpiException(new SocketException("Test communication exception"));
+
+ super.sendMessage(node, msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
+ IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+ if (blockAll || block && node.id().equals(crd.id()))
+ throw new IgniteSpiException(new SocketException("Test communication exception"));
+
+ super.sendMessage(node, msg, ackC);
+ }
+ }
+
+ /**
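+ * Discovery SPI that throws {@link SocketException} from socket writes and socket opening while blocking is enabled.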
+ */
+ private class DiscoverySpi extends TcpDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data,
+ long timeout) throws IOException {
+ if (blockAll || block && sock.getPort() == 47500)
+ throw new SocketException("Test discovery exception");
+
+ super.writeToSocket(sock, msg, data, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (blockAll || block && sock.getPort() == 47500)
+ throw new SocketException("Test discovery exception");
+
+ super.writeToSocket(sock, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (blockAll || block && sock.getPort() == 47500)
+ throw new SocketException("Test discovery exception");
+
+ super.writeToSocket(sock, out, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
+ long timeout) throws IOException {
+ if (blockAll || block && sock.getPort() == 47500)
+ throw new SocketException("Test discovery exception");
+
+ super.writeToSocket(msg, sock, res, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr,
+ IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException {
+ if (blockAll || block && sock.getPort() == 47500)
+ throw new SocketException("Test discovery exception");
+
+ return super.openSocket(sock, remAddr, timeoutHelper);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 331b581..0483a1c 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.IgniteState;
@@ -43,6 +44,7 @@ import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -1788,8 +1790,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
clientNodeIds.add(client.cluster().localNode().id());
GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override
- public boolean apply() {
+ @Override public boolean apply() {
return srv.cluster().nodes().size() == 2;
}
}, awaitTime());
@@ -1800,6 +1801,49 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testForceClientReconnect() throws Exception {
+ startServerNodes(1);
+
+ startClientNodes(1);
+
+ Ignite srv = G.ignite("server-0");
+ IgniteKernal client = (IgniteKernal)G.ignite("client-0");
+
+ UUID clientId = F.first(clientNodeIds);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ srv.events().enableLocal(EVT_NODE_JOINED);
+
+ srv.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ latch.countDown();
+
+ return false;
+ }
+ }, EVT_NODE_JOINED);
+
+ client.context().discovery().reconnect();
+
+ assert latch.await(10, TimeUnit.SECONDS);
+
+ while (true) {
+ try {
+ UUID newId = client.localNode().id();
+
+ assert !clientId.equals(newId) : clientId;
+
+ break;
+ }
+ catch (IgniteClientDisconnectedException e) {
+ e.reconnectFuture().get(10_000);
+ }
+ }
+ }
+
+ /**
* @param ignite Ignite.
* @throws Exception If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/94641931/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
index ea8e37b..67d88e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.IgniteClientReconnectFailoverTest;
import org.apache.ignite.internal.IgniteClientReconnectServicesTest;
import org.apache.ignite.internal.IgniteClientReconnectStopTest;
import org.apache.ignite.internal.IgniteClientReconnectStreamerTest;
+import org.apache.ignite.internal.IgniteClientRejoinTest;
/**
*
@@ -52,6 +53,7 @@ public class IgniteClientReconnectTestSuite extends TestSuite {
suite.addTestSuite(IgniteClientReconnectServicesTest.class);
suite.addTestSuite(IgniteClientReconnectStreamerTest.class);
suite.addTestSuite(IgniteClientReconnectFailoverTest.class);
+ suite.addTestSuite(IgniteClientRejoinTest.class);
return suite;
}