You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/03/07 16:15:57 UTC
[2/4] ignite git commit: IGNITE-4473 - Client should re-try
connection attempt in case of concurrent network failure.
IGNITE-4473 - Client should re-try connection attempt in case of concurrent network failure.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e5a5f9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e5a5f9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e5a5f9d
Branch: refs/heads/ignite-4473-1
Commit: 8e5a5f9d5cc4e4cd3d7d1e278d9a6a85dacfd706
Parents: 9df5e94
Author: dkarachentsev <dk...@gridgain.com>
Authored: Fri Mar 3 17:28:27 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Fri Mar 3 17:28:27 2017 +0300
----------------------------------------------------------------------
.../IgniteCouldReconnectCheckedException.java | 33 +++
.../ignite/internal/GridKernalGateway.java | 7 +
.../ignite/internal/GridKernalGatewayImpl.java | 14 +
.../apache/ignite/internal/IgniteKernal.java | 45 ++-
.../discovery/GridDiscoveryManager.java | 11 +
.../GridCachePartitionExchangeManager.java | 23 +-
.../dht/GridDhtAssignmentFetchFuture.java | 13 +-
.../GridDhtPartitionsExchangeFuture.java | 14 +-
.../service/GridServiceProcessor.java | 86 +++---
.../ignite/spi/discovery/DiscoverySpi.java | 10 +
.../ignite/spi/discovery/tcp/ClientImpl.java | 157 ++++++++++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 6 +
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 10 +
.../spi/discovery/tcp/TcpDiscoverySpi.java | 6 +
.../ignite/internal/IgniteClientRejoinTest.java | 281 +++++++++++++++++++
.../cluster/GridUpdateNotifierSelfTest.java | 2 +
.../IgniteClientReconnectTestSuite.java | 2 +
17 files changed, 657 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/IgniteCouldReconnectCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCouldReconnectCheckedException.java b/modules/core/src/main/java/org/apache/ignite/IgniteCouldReconnectCheckedException.java
new file mode 100644
index 0000000..62acaee
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCouldReconnectCheckedException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Indicates whether node could be rejoined to cluster.
+ */
+public class IgniteCouldReconnectCheckedException extends IgniteCheckedException {
+ /**
+ * @param msg Message.
+ * @param cause Cause.
+ */
+ public IgniteCouldReconnectCheckedException(String msg, @Nullable Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
index 1b9da2f..da52d09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
@@ -123,4 +123,11 @@ public interface GridKernalGateway {
* Reconnected callback.
*/
public void onReconnected();
+
+ /**
+ * Reconnect failed callback.
+ *
+ * @param t Cause.
+ */
+ public void onReconnectFailed(Throwable t);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
index fe8c580..84bb886 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
@@ -23,6 +23,7 @@ import java.io.StringWriter;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCouldReconnectCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -151,9 +152,16 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
@Override public GridFutureAdapter<?> onDisconnected() {
GridFutureAdapter<?> fut = new GridFutureAdapter<>();
+ IgniteFutureImpl reconnectFut0 = reconnectFut;
+
reconnectFut = new IgniteFutureImpl<>(fut);
if (!state.compareAndSet(GridKernalState.STARTED, GridKernalState.DISCONNECTED)) {
+ Throwable error = reconnectFut0.internalFuture().error();
+
+ if (error instanceof IgniteCouldReconnectCheckedException)
+ return fut;
+
((GridFutureAdapter<?>)reconnectFut.internalFuture()).onDone(new IgniteCheckedException("Node stopped."));
return null;
@@ -168,6 +176,12 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
((GridFutureAdapter<?>)reconnectFut.internalFuture()).onDone();
}
+ /** {@inheritDoc} */
+ @Override public void onReconnectFailed(Throwable t) {
+ if (state.get() == GridKernalState.DISCONNECTED)
+ ((GridFutureAdapter<?>)reconnectFut.internalFuture()).onDone(t);
+ }
+
/**
* Retrieves user stack trace.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 4972d1f..1fb5b57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -54,6 +54,7 @@ import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCouldReconnectCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteCountDownLatch;
@@ -780,6 +781,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
List<PluginProvider> plugins = U.allPluginProviders();
+ boolean recon = false;
+
// Spin out SPIs & managers.
try {
ctx = new GridKernalContextImpl(log,
@@ -946,8 +949,35 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
if (comp instanceof GridIoManager)
continue;
- if (!skipDaemon(comp))
- comp.onKernalStart();
+ if (!skipDaemon(comp)) {
+ try {
+ comp.onKernalStart();
+ }
+ catch (IgniteCouldReconnectCheckedException e) {
+ recon = true;
+ }
+ }
+ }
+
+ while (recon) {
+ try {
+ ctx.discovery().rejoin().get(); // TODO timeout?
+
+ IgniteFuture<?> reconFut = ctx.cluster().clientReconnectFuture();
+
+ reconFut.get();
+
+ recon = false;
+ }
+ catch (IgniteException e) {
+ if (X.hasCause(e, IgniteCouldReconnectCheckedException.class, IgniteClientDisconnectedException.class)) {
+ log.warning("Rejoin failed, retry. locNodeId=" + ctx.localNodeId() + ']');
+
+ continue;
+ }
+
+ throw e;
+ }
}
// Register MBeans.
@@ -3393,9 +3423,16 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ctx.gateway().onReconnected();
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to reconnect, will stop node", e);
+ if (!X.hasCause(e, IgniteCouldReconnectCheckedException.class, IgniteClientDisconnectedCheckedException.class)) {
+ U.error(log, "Failed to reconnect, will stop node 2", e);
- close();
+ close();
+ }
+ else {
+ U.error(log, "Failed to reconnect, retry", e);
+
+ ctx.gateway().onReconnectFailed(e);
+ }
}
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9aa4db1..9efc428 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1891,6 +1891,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * Leave cluster and try to join again.
+ *
+ * @return Future will be completed with success when node joined cluster
+ * or throw exception if failed.
+ * @throws IgniteSpiException If failed.
+ */
+ public IgniteInternalFuture<Object> rejoin() {
+ return getSpi().rejoin();
+ }
+
+ /**
* Updates topology version if current version is smaller than updated.
*
* @param updated Updated topology version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7cf75fe..0dd1a58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache;
+import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -43,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCouldReconnectCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
@@ -87,6 +89,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -448,6 +451,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
else
U.warn(log, "Still waiting for initial partition map exchange [fut=" + fut + ']');
}
+ catch (Exception e) {
+ if (cctx.localNode().isClient() && X.hasCause(e, IOException.class))
+ throw new IgniteCouldReconnectCheckedException("Reconnect", e);
+
+ throw e;
+ }
}
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -1697,6 +1706,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
dumpedObjects++;
}
}
+ catch (Exception e) {
+ if (cctx.localNode().isClient() && X.hasCause(e, IOException.class))
+ throw new IgniteCouldReconnectCheckedException("Reconnect", e);
+
+ throw e;
+ }
}
@@ -1836,8 +1851,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
catch (IgniteInterruptedCheckedException e) {
throw e;
}
- catch (IgniteClientDisconnectedCheckedException e) {
- return;
+ catch (IgniteClientDisconnectedCheckedException | IgniteCouldReconnectCheckedException e) {
+ if (!cctx.localNode().isClient()) {
+ U.error(log, "Ignore exception", e);
+
+ return;
+ }
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to wait for completion of partition map exchange " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index ab8e863..bc0adda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
-import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -202,8 +203,14 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
"continue to another node): " + node);
}
catch (IgniteCheckedException e) {
- U.error(log0, "Failed to request affinity assignment from remote node (will " +
- "continue to another node): " + node, e);
+ if (log0.isDebugEnabled() || !X.hasCause(e, IOException.class)) {
+ U.error(log0, "Failed to request affinity assignment from remote node (will " +
+ "continue to another node): " + node, e);
+ }
+ else {
+ U.warn(log0, "Failed to request affinity assignment from remote node (will " +
+ "continue to another node): " + node);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e945de9..96389d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCouldReconnectCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
@@ -39,6 +41,7 @@ import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -54,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -65,7 +67,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -509,7 +511,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
catch (Throwable e) {
U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e);
- onDone(e);
+ if (cctx.localNode().isClient() && X.hasCause(e,
+ IOException.class,
+ IgniteClientDisconnectedCheckedException.class))
+ onDone(new IgniteCouldReconnectCheckedException("Local node could be reconnected. [locNodeId="
+ + cctx.localNodeId() + ']', e));
+ else
+ onDone(e);
if (e instanceof Error)
throw (Error)e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index d26242d..43918fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1508,60 +1508,60 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
}
- /**
- * Deployment callback.
- *
- * @param dep Service deployment.
- * @param topVer Topology version.
- */
- private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
- // Retry forever.
- try {
- AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
+ /**
+ * Deployment callback.
+ *
+ * @param dep Service deployment.
+ * @param topVer Topology version.
+ */
+ private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
+ // Retry forever.
+ try {
+ AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
- // If topology version changed, reassignment will happen from topology event.
- if (newTopVer.equals(topVer))
- reassign(dep, topVer);
- }
- catch (IgniteCheckedException e) {
- if (!(e instanceof ClusterTopologyCheckedException))
- log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
+ // If topology version changed, reassignment will happen from topology event.
+ if (newTopVer.equals(topVer))
+ reassign(dep, topVer);
+ }
+ catch (IgniteCheckedException e) {
+ if (!(e instanceof ClusterTopologyCheckedException))
+ log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
- AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
+ AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
- if (!newTopVer.equals(topVer)) {
- assert newTopVer.compareTo(topVer) > 0;
+ if (!newTopVer.equals(topVer)) {
+ assert newTopVer.compareTo(topVer) > 0;
- // Reassignment will happen from topology event.
- return;
- }
+ // Reassignment will happen from topology event.
+ return;
+ }
- ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
- private IgniteUuid id = IgniteUuid.randomUuid();
+ ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
+ private IgniteUuid id = IgniteUuid.randomUuid();
- private long start = System.currentTimeMillis();
+ private long start = System.currentTimeMillis();
- @Override public IgniteUuid timeoutId() {
- return id;
- }
+ @Override public IgniteUuid timeoutId() {
+ return id;
+ }
- @Override public long endTime() {
- return start + RETRY_TIMEOUT;
- }
+ @Override public long endTime() {
+ return start + RETRY_TIMEOUT;
+ }
- @Override public void onTimeout() {
- if (!busyLock.enterBusy())
- return;
+ @Override public void onTimeout() {
+ if (!busyLock.enterBusy())
+ return;
- try {
- // Try again.
- onDeployment(dep, topVer);
- }
- finally {
- busyLock.leaveBusy();
- }
+ try {
+ // Try again.
+ onDeployment(dep, topVer);
}
- });
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 8c23d92..9ee3777 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.IgniteSpiException;
@@ -128,6 +129,15 @@ public interface DiscoverySpi extends IgniteSpi {
public void disconnect() throws IgniteSpiException;
/**
+ * Leave cluster and try to join again.
+ *
+ * @return Future will be completed with success when node joined cluster
+ * or throw exception if failed.
+ * @throws IgniteSpiException If failed.
+ */
+ public IgniteInternalFuture<Object> rejoin() throws IgniteSpiException;
+
+ /**
* Sets discovery SPI node authenticator. This method is called before SPI start() method.
*
* @param auth Discovery SPI authenticator.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 39c539c..bbbd4c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -53,6 +53,7 @@ import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -127,6 +128,9 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED";
+ /** */
+ private static final Object SPI_REJOIN = "SPI_REJOIN";
+
/** Remote nodes. */
private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
@@ -163,6 +167,12 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private final Timer timer = new Timer("TcpDiscoverySpi.timer");
+ /** Rejoin future. */
+ private GridFutureAdapter<Object> rejoinFut;
+
+ /** Rejoin mutex. */
+ private final Object rejoinMux = new Object();
+
/** */
protected MessageWorker msgWorker;
@@ -789,6 +799,24 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Object> rejoin() throws IgniteSpiException {
+ synchronized (rejoinMux) {
+ if (rejoinFut != null)
+ return rejoinFut;
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Rejoining to cluster, [localNodeId=" + getLocalNodeId() + ']');
+
+ rejoinFut = new GridFutureAdapter<>();
+
+ msgWorker.addMessage(SPI_REJOIN);
+
+ return rejoinFut;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void brakeConnection() {
SocketStream sockStream = msgWorker.currSock;
@@ -987,18 +1015,21 @@ class ClientImpl extends TcpDiscoveryImpl {
private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
/** */
- private final long socketTimeout;
+ private final long sockTimeout;
/** */
private TcpDiscoveryAbstractMessage unackedMsg;
+ /** */
+ private GridFutureAdapter<Object> forceFut;
+
/**
*
*/
protected SocketWriter() {
super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
- socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+ sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
spi.getSocketTimeout();
}
@@ -1014,6 +1045,26 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
+ *
+ */
+ private void forceLeave() {
+ synchronized (mux) {
+ forceFut = new GridFutureAdapter<>();
+
+ unackedMsg = null;
+
+ mux.notifyAll();
+ }
+
+ try {
+ forceFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSpiException(e);
+ }
+ }
+
+ /**
* @param sock Socket.
* @param clientAck {@code True} is server supports client message acknowlede.
*/
@@ -1069,12 +1120,19 @@ class ClientImpl extends TcpDiscoveryImpl {
continue;
}
- msg = queue.poll();
+ if (forceFut != null) {
+ msg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
- if (msg == null) {
- mux.wait();
+ msg.client(true);
+ }
+ else {
+ msg = queue.poll();
- continue;
+ if (msg == null) {
+ mux.wait();
+
+ continue;
+ }
}
}
@@ -1095,7 +1153,10 @@ class ClientImpl extends TcpDiscoveryImpl {
spi.writeToSocket(
sock,
msg,
- socketTimeout);
+ sockTimeout);
+
+ if (forceFut != null)
+ throw new IgniteCheckedException("Force fail local node.");
msg = null;
@@ -1137,7 +1198,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Failed to send message, node is stopping [msg=" + msg + ", err=" + e + ']');
}
- else
+ else if (forceFut == null)
U.error(log, "Failed to send message: " + msg, e);
U.closeQuiet(sock);
@@ -1145,6 +1206,15 @@ class ClientImpl extends TcpDiscoveryImpl {
synchronized (mux) {
if (sock == this.sock)
this.sock = null; // Connection has dead.
+
+ if (forceFut != null) {
+ queue.clear();
+ unackedMsg = null;
+
+ forceFut.onDone();
+
+ forceFut = null;
+ }
}
}
}
@@ -1393,6 +1463,59 @@ class ClientImpl extends TcpDiscoveryImpl {
else
leaveLatch.countDown();
}
+ else if (msg == SPI_REJOIN) {
+ if (reconnector != null) {
+ reconnector.cancel();
+ reconnector.join();
+
+ reconnector = null;
+ }
+
+ if (state == CONNECTED) {
+ state = DISCONNECTED;
+
+ nodeAdded = false;
+
+ IgniteClientDisconnectedCheckedException err =
+ new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
+ "client node disconnected.");
+
+ for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
+ GridFutureAdapter<Boolean> fut = e.getValue();
+
+ if (pingFuts.remove(e.getKey(), fut))
+ fut.onDone(err);
+ }
+
+ sockWriter.forceLeave();
+ sockReader.setSocket(null, null);
+ currSock = null;
+
+ notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+ }
+
+ if (state == DISCONNECTED) {
+ UUID newId = UUID.randomUUID();
+
+ locNode.onClientDisconnected(newId);
+
+ tryJoin();
+ }
+
+ synchronized (rejoinMux) {
+ ClientImpl.State state0 = state;
+
+ if (rejoinFut != null &&
+ (state0 == STARTING || state0 == STOPPED || state0 == SEGMENTED)) {
+ rejoinFut.onDone(new IgniteCheckedException(
+ "Cannot perform rejoin on incompatible state: " + state0));
+
+ rejoinFut = null;
+ }
+ else
+ assert rejoinFut != null : "Rejoin future cannot be null.";
+ }
+ }
else if (msg instanceof TcpDiscoveryNodeFailedMessage &&
((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) {
TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg;
@@ -1566,6 +1689,16 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
}
+ catch (Throwable t) {
+ synchronized (rejoinMux) {
+ if (rejoinFut != null)
+ rejoinFut.onDone(t);
+
+ rejoinFut = null;
+ }
+
+ throw t;
+ }
finally {
SocketStream currSock = this.currSock;
@@ -1793,6 +1926,14 @@ class ClientImpl extends TcpDiscoveryImpl {
joinErr.set(null);
joinLatch.countDown();
+
+ synchronized (rejoinMux) {
+ if (rejoinFut != null) {
+ rejoinFut.onDone();
+
+ rejoinFut = null;
+ }
+ }
}
else if (log.isDebugEnabled())
log.debug("Discarding node add finished message (this message has already been processed) " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 93978ac..a13a70f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -65,6 +65,7 @@ import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.IgnitionEx;
@@ -1591,6 +1592,11 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture rejoin() throws IgniteSpiException {
+ throw new UnsupportedOperationException("Rejoin is not supported for server.");
+ }
+
+ /** {@inheritDoc} */
@Override protected IgniteSpiThread workerThread() {
return msgWorker;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index f199c20..a4ab772 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -259,6 +260,15 @@ abstract class TcpDiscoveryImpl {
}
/**
+ * Leave cluster and try to join again.
+ *
+ * @return Future will be completed with success when node joined cluster
+ * or throw exception if failed.
+ * @throws IgniteSpiException If failed.
+ */
+ public abstract IgniteInternalFuture<Object> rejoin() throws IgniteSpiException;
+
+ /**
* <strong>FOR TEST ONLY!!!</strong>
* <p>
* Simulates this node failure by stopping service threads. So, node will become
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 45933e1..e063d40 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -52,6 +52,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -1921,6 +1922,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return ignite().configuration().getSslContextFactory() != null;
}
+ /** {@inheritDoc} */
+ public IgniteInternalFuture<Object> rejoin() throws IgniteSpiException {
+ return impl.rejoin();
+ }
+
/**
* <strong>FOR TEST ONLY!!!</strong>
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
new file mode 100644
index 0000000..d91bed6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests client to be able restore connection to cluster if coordination is not available.
+ */
+public class IgniteClientRejoinTest extends GridCommonAbstractTest {
+ /** Block. */
+ private volatile boolean block;
+
+ /** Coordinator. */
+ private volatile ClusterNode crd;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ System.setProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK", "true");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ System.clearProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (gridName.contains("client")) {
+ cfg.setCommunicationSpi(new TcpCommunicationSpi());
+
+ TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+ DiscoverySpi dspi = new DiscoverySpi();
+
+ dspi.setIpFinder(spi.getIpFinder());
+
+ cfg.setDiscoverySpi(dspi);
+
+ cfg.setClientMode(true);
+ }
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnect() throws Exception {
+ Ignite srv1 = startGrid("server1");
+
+ crd = ((IgniteKernal)srv1).localNode();
+
+ Ignite srv2 = startGrid("server2");
+
+ block = true;
+
+ IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ Random rnd = new Random();
+
+ U.sleep((rnd.nextInt(15) + 30) * 1000);
+
+ block = false;
+
+ System.out.println("ALLOW connection to coordinator.");
+
+ return true;
+ }
+ });
+
+ Ignite client = startGrid("client");
+
+
+ assert fut.get();
+
+ IgniteCache<Integer, Integer> cache = client.getOrCreateCache("some");
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, i);
+
+ for (int i = 0; i < 100; i++)
+ assert i == cache.get(i);
+
+ Collection<ClusterNode> clients = client.cluster().forClients().nodes();
+
+ assertEquals("Clients: " + clients, 1, clients.size());
+ assertEquals(1, srv1.cluster().forClients().nodes().size());
+ assertEquals(1, srv2.cluster().forClients().nodes().size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testManyClientsReconnect() throws Exception {
+ Ignite srv1 = startGrid("server1");
+
+ crd = ((IgniteKernal)srv1).localNode();
+
+ Ignite srv2 = startGrid("server2");
+
+ block = true;
+
+ List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final int CLIENTS_NUM = 5;
+
+ for (int i = 0; i < CLIENTS_NUM; i++) {
+ final int idx = i;
+
+ IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
+ @Override public Ignite call() throws Exception {
+ latch.await();
+
+ return startGrid("client" + idx);
+ }
+ });
+
+ futs.add(fut);
+ }
+
+ GridTestUtils.runAsync(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ latch.countDown();
+
+ Random rnd = new Random();
+
+ U.sleep((rnd.nextInt(15) + 15) * 1000);
+
+ block = false;
+
+ System.out.println(">>> ALLOW connection to coordinator.");
+
+ return true;
+ }
+ });
+
+ for (IgniteInternalFuture<Ignite> clientFut : futs) {
+ Ignite client = clientFut.get();
+
+ IgniteCache<Integer, Integer> cache = client.getOrCreateCache(client.name());
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, i);
+
+ for (int i = 0; i < 100; i++)
+ assert i == cache.get(i);
+ }
+
+ assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size());
+ assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 2 * 60_000;
+ }
+
+ /**
+ *
+ */
+ private class TcpCommunicationSpi extends org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ if (block && node.id().equals(crd.id()))
+ throw new IgniteSpiException(new SocketException("Test communication exception"));
+
+ super.sendMessage(node, msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
+ IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+ if (block && node.id().equals(crd.id()))
+ throw new IgniteSpiException(new SocketException("Test communication exception"));
+
+ super.sendMessage(node, msg, ackC);
+ }
+ }
+
+ /**
+ *
+ */
+ private class DiscoverySpi extends TcpDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data,
+ long timeout) throws IOException {
+ if (block && sock.getPort() == 47500)
+ throw new SocketException("Test discovery exception");
+
+ super.writeToSocket(sock, msg, data, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (block && sock.getPort() == 47500)
+ throw new SocketException("Test discovery exception");
+
+ super.writeToSocket(sock, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (block && sock.getPort() == 47500)
+ throw new SocketException("Test discovery exception");
+
+ super.writeToSocket(sock, out, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
+ long timeout) throws IOException {
+ if (block && sock.getPort() == 47500)
+ throw new SocketException("Test discovery exception");
+
+ super.writeToSocket(msg, sock, res, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr,
+ IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException {
+ if (block && sock.getPort() == 47500)
+ throw new SocketException("Test discovery exception");
+
+ return super.openSocket(sock, remAddr, timeoutHelper);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/GridUpdateNotifierSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/GridUpdateNotifierSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/GridUpdateNotifierSelfTest.java
index 21b91b6..4e3977c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/GridUpdateNotifierSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/GridUpdateNotifierSelfTest.java
@@ -133,6 +133,8 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
return null;
}
+ @Override public void onReconnectFailed(Throwable t) {}
+
@Override public void onReconnected() {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
index ea8e37b..67d88e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.IgniteClientReconnectFailoverTest;
import org.apache.ignite.internal.IgniteClientReconnectServicesTest;
import org.apache.ignite.internal.IgniteClientReconnectStopTest;
import org.apache.ignite.internal.IgniteClientReconnectStreamerTest;
+import org.apache.ignite.internal.IgniteClientRejoinTest;
/**
*
@@ -52,6 +53,7 @@ public class IgniteClientReconnectTestSuite extends TestSuite {
suite.addTestSuite(IgniteClientReconnectServicesTest.class);
suite.addTestSuite(IgniteClientReconnectStreamerTest.class);
suite.addTestSuite(IgniteClientReconnectFailoverTest.class);
+ suite.addTestSuite(IgniteClientRejoinTest.class);
return suite;
}