You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/08 17:19:12 UTC
[2/2] incubator-ignite git commit: # ignite-901
# ignite-901
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/86d963f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/86d963f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/86d963f9
Branch: refs/heads/ignite-901
Commit: 86d963f98f3d3db33effdc482654e86d5b02bc52
Parents: a3318e3
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 8 10:08:02 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 8 18:19:00 2015 +0300
----------------------------------------------------------------------
.../IgniteClientDisconnectedException.java | 10 +-
.../ignite/internal/GridJobSiblingImpl.java | 2 +-
.../ignite/internal/GridKernalGatewayImpl.java | 4 +-
.../apache/ignite/internal/IgniteKernal.java | 3 +
.../internal/cluster/IgniteClusterImpl.java | 3 +
.../internal/managers/GridManagerAdapter.java | 7 +-
.../deployment/GridDeploymentCommunication.java | 2 +-
.../deployment/GridDeploymentManager.java | 11 +-
.../discovery/GridDiscoveryManager.java | 64 ++-
.../processors/cache/GridCacheGateway.java | 3 +-
.../processors/cache/GridCacheUtils.java | 5 +-
.../processors/cache/IgniteCacheFutureImpl.java | 5 +
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../clock/GridClockSyncProcessor.java | 2 +-
.../datastreamer/DataStreamerImpl.java | 17 +-
.../GridCacheCountDownLatchImpl.java | 2 +-
.../processors/job/GridJobProcessor.java | 2 +-
.../internal/processors/job/GridJobWorker.java | 2 +-
.../processors/task/GridTaskProcessor.java | 41 +-
.../processors/task/GridTaskWorker.java | 59 ++-
.../ignite/internal/util/IgniteUtils.java | 19 +
.../communication/tcp/TcpCommunicationSpi.java | 16 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 90 ++--
.../spi/discovery/tcp/TcpDiscoverySpi.java | 12 +
.../IgniteClientReconnectAbstractTest.java | 3 +
...niteClientReconnectFailoverAbstractTest.java | 228 ++++++++++
.../IgniteClientReconnectFailoverTest.java | 167 +-------
.../IgniteSlowClientDetectionSelfTest.java | 1 +
.../cache/IgniteCacheDynamicStopSelfTest.java | 6 +-
.../IgniteTxExceptionAbstractSelfTest.java | 1 +
.../tcp/TcpClientDiscoverySpiSelfTest.java | 21 +
.../h2/twostep/GridReduceQueryExecutor.java | 16 +-
.../IgniteClientReconnectQueriesTest.java | 428 -------------------
...ClientReconnectCacheQueriesFailoverTest.java | 149 +++++++
.../cache/IgniteClientReconnectQueriesTest.java | 428 +++++++++++++++++++
...dCacheAbstractReduceFieldsQuerySelfTest.java | 4 +
.../IgniteCacheQuerySelfTestSuite.java | 3 -
.../IgniteCacheWithIndexingTestSuite.java | 1 +
38 files changed, 1147 insertions(+), 692 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
index 726091f..c40dd9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
@@ -33,6 +33,14 @@ public class IgniteClientDisconnectedException extends IgniteException {
/**
* @param reconnectFut Reconnect future.
* @param msg Error message.
+ */
+ public IgniteClientDisconnectedException(IgniteFuture<?> reconnectFut, String msg) {
+ this(reconnectFut, msg, null);
+ }
+
+ /**
+ * @param reconnectFut Reconnect future.
+ * @param msg Error message.
* @param cause Optional nested exception (can be {@code null}).
*/
public IgniteClientDisconnectedException(
@@ -41,8 +49,6 @@ public class IgniteClientDisconnectedException extends IgniteException {
@Nullable Throwable cause) {
super(msg, cause);
- assert reconnectFut != null;
-
this.reconnectFut = reconnectFut;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
index 62adf52..b4e0f01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
@@ -167,7 +167,7 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable {
}
catch (IgniteCheckedException e) {
// Avoid stack trace for left nodes.
- if (ctx.discovery().node(node.id()) != null && ctx.discovery().pingNode(node.id()))
+ if (ctx.discovery().node(node.id()) != null && ctx.discovery().pingNodeNoError(node.id()))
U.error(ctx.log(GridJobSiblingImpl.class), "Failed to send cancel request to node " +
"[nodeId=" + node.id() + ", ses=" + ses + ']', e);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
index b1f4df8..fa395e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
@@ -78,7 +78,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
rwLock.readUnlock();
if (state == GridKernalState.DISCONNECTED)
- throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.", null);
+ throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName);
throw illegalState();
}
@@ -92,7 +92,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
rwLock.readLock();
if (state == GridKernalState.DISCONNECTED)
- throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.", null);
+ throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 0dd3c29..0a9d093 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2186,6 +2186,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
return false;
}
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
finally {
unguard();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
index c4de2da..246eab5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
@@ -123,6 +123,9 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
try {
return ctx.discovery().pingNode(nodeId);
}
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
finally {
unguard();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 1cbe68d..9faa056 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -328,7 +328,12 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
@Override public boolean pingNode(UUID nodeId) {
A.notNull(nodeId, "nodeId");
- return ctx.discovery().pingNode(nodeId);
+ try {
+ return ctx.discovery().pingNode(nodeId);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
}
@Override public void send(ClusterNode node, Serializable msg, String topic)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
index 443b221..3b886a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
@@ -293,7 +293,7 @@ class GridDeploymentCommunication {
log.debug("Sent peer class loading response [node=" + node.id() + ", res=" + res + ']');
}
catch (IgniteCheckedException e) {
- if (ctx.discovery().pingNode(nodeId))
+ if (ctx.discovery().pingNodeNoError(nodeId))
U.error(log, "Failed to send peer class loading response to node: " + nodeId, e);
else if (log.isDebugEnabled())
log.debug("Failed to send peer class loading response to node " +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
index 9e418a5..75fb41e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
@@ -94,13 +94,7 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
comm.start();
- locStore = new GridDeploymentLocalStore(getSpi(), ctx, comm);
- ldrStore = new GridDeploymentPerLoaderStore(getSpi(), ctx, comm);
- verStore = new GridDeploymentPerVersionStore(getSpi(), ctx, comm);
-
- locStore.start();
- ldrStore.start();
- verStore.start();
+ startStores();
if (log.isDebugEnabled()) {
log.debug("Local deployment: " + locDep);
@@ -129,6 +123,9 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
storesStop();
+ if (comm != null)
+ comm.stop();
+
getSpi().setListener(null);
stopSpi();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 4a064d1..096f0e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -477,6 +477,27 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
return;
}
+ else if (type == EVT_CLIENT_NODE_DISCONNECTED) {
+ /*
+ * Notify all components from discovery thread to avoid concurrent
+ * reconnect while disconnect handling is in progress.
+ */
+
+ assert locNode.isClient() : locNode;
+ assert node.isClient() : node;
+
+ ((IgniteKernal)ctx.grid()).onDisconnected();
+
+ DiscoveryEvent evt = new DiscoveryEvent();
+
+ evt.node(ctx.discovery().localNode());
+ evt.eventNode(node);
+ evt.type(type);
+
+ ctx.event().record(evt);
+
+ return;
+ }
discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg);
}
@@ -1106,8 +1127,36 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* @param nodeId ID of the node.
* @return {@code True} if ping succeeded.
+ * @throws IgniteClientDisconnectedCheckedException If ping failed.
*/
- public boolean pingNode(UUID nodeId) {
+ public boolean pingNode(UUID nodeId) throws IgniteClientDisconnectedCheckedException {
+ assert nodeId != null;
+
+ if (!busyLock.enterBusy())
+ return false;
+
+ try {
+ return getSpi().pingNode(nodeId);
+ }
+ catch (IgniteException e) {
+ if (e.hasCause(IgniteClientDisconnectedCheckedException.class)) {
+ IgniteFuture<?> reconnectFut = ctx.cluster().clientReconnectFuture();
+
+ throw new IgniteClientDisconnectedCheckedException(reconnectFut, e.getMessage());
+ }
+
+ throw e;
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * @param nodeId ID of the node.
+ * @return {@code True} if ping succeeded.
+ */
+ public boolean pingNodeNoError(UUID nodeId) {
assert nodeId != null;
if (!busyLock.enterBusy())
@@ -1897,20 +1946,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
break;
}
- case EVT_CLIENT_NODE_DISCONNECTED: {
- assert localNode().isClient() : evt;
-
- ((IgniteKernal)ctx.grid()).onDisconnected();
-
- break;
- }
-
case EVT_CLIENT_NODE_RECONNECTED: {
assert localNode().isClient() : evt;
// TODO IGNITE-901.
((IgniteKernal)ctx.grid()).reconnected(false);
+ if (log.isInfoEnabled())
+ log.info("Client node reconnected to cluster: " + node);
+
+ ackTopology(topVer.topologyVersion(), true);
+
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index a9a73eb..da409a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -68,6 +68,7 @@ public class GridCacheGateway<K, V> {
/**
* @param lock {@code True} if lock is held.
* @param stopErr {@code True} to throw an exception if the cache is stopped.
+ * @return {@code True} if cache is in started state.
*/
private boolean checkState(boolean lock, boolean stopErr) {
State state = this.state;
@@ -86,7 +87,7 @@ public class GridCacheGateway<K, V> {
assert reconnectFut != null;
throw new CacheException(
- new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.", null));
+ new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + ctx.gridName()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 6faf6e4..bd2623d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1565,8 +1565,11 @@ public class GridCacheUtils {
(IgniteClientDisconnectedCheckedException)e
: e.getCause(IgniteClientDisconnectedCheckedException.class);
- if (disconnectedErr != null)
+ if (disconnectedErr != null) {
+ assert disconnectedErr.reconnectFuture() != null : disconnectedErr;
+
e = disconnectedErr;
+ }
if (e.hasCause(CacheWriterException.class))
return new CacheWriterException(U.convertExceptionNoWrap(e));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
index 06c28e6..13af004 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
@@ -37,6 +37,11 @@ public class IgniteCacheFutureImpl<V> extends IgniteFutureImpl<V> {
/** {@inheritDoc} */
@Override protected RuntimeException convertException(IgniteCheckedException e) {
+ if (e instanceof IgniteFutureCancelledCheckedException ||
+ e instanceof IgniteInterruptedCheckedException ||
+ e instanceof IgniteFutureTimeoutCheckedException)
+ return U.convertException(e);
+
return CU.convertToCacheException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 969d7a2..f33f791 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -223,7 +223,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
final CacheConfiguration cfg = cctx.config();
- if (cfg.getRebalanceDelay() >= 0) {
+ if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
demandPool.syncFuture().listen(new CI1<Object>() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index 2920176..478426f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -295,7 +295,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
ctx.io().send(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
- if (ctx.discovery().pingNode(n.id()))
+ if (ctx.discovery().pingNodeNoError(n.id()))
U.error(log, "Failed to send time sync snapshot to remote node (did not leave grid?) " +
"[nodeId=" + n.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']');
else if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 55915f3..605f478 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1275,11 +1275,18 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
}
catch (IgniteCheckedException e) {
- if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
- ((GridFutureAdapter<Object>)fut).onDone(e);
- else
- ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
- "request (node has left): " + node.id()));
+ GridFutureAdapter<Object> fut0 = ((GridFutureAdapter<Object>)fut);
+
+ try {
+ if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
+ fut0.onDone(e);
+ else
+ fut0.onDone(new ClusterTopologyCheckedException("Failed to send request (node has left): "
+ + node.id()));
+ }
+ catch (IgniteClientDisconnectedCheckedException e0) {
+ fut0.onDone(e0);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 2d3cf13..cfc051c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -339,7 +339,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
GridCacheCountDownLatchValue latchVal = latchView.get(key);
if (latchVal == null)
- throw new IgniteCheckedException("Failed to find count down latch with given name: " + name);
+ return 0;
val = latchVal.get();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 48e9686..350068a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -1413,7 +1413,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
* @return {@code true} if node is dead, {@code false} if node is alive.
*/
private boolean isDeadNode(UUID uid) {
- return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid);
+ return ctx.discovery().node(uid) == null || !ctx.discovery().pingNodeNoError(uid);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index d1ee5ad..3a309f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -863,7 +863,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
* @return {@code true} if node is dead, {@code false} if node is alive.
*/
private boolean isDeadNode(UUID uid) {
- return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid);
+ return ctx.discovery().node(uid) == null || !ctx.discovery().pingNodeNoError(uid);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 65ce557..d3caf5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -121,11 +121,8 @@ public class GridTaskProcessor extends GridProcessorAdapter {
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
IgniteClientDisconnectedCheckedException err = disconnectedError(reconnectFut);
- for (GridTaskWorker<?, ?> worker : tasks.values()) {
+ for (GridTaskWorker<?, ?> worker : tasks.values())
worker.finishTask(null, err);
-
- worker.cancel();
- }
}
/**
@@ -617,31 +614,29 @@ public class GridTaskProcessor extends GridProcessorAdapter {
assert taskWorker0 == null : "Session ID is not unique: " + sesId;
- if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) {
- try {
- // Start task execution in another thread.
- if (sys)
- ctx.getSystemExecutorService().execute(taskWorker);
- else
- ctx.getExecutorService().execute(taskWorker);
- }
- catch (RejectedExecutionException e) {
- tasks.remove(sesId);
+ if (!ctx.clientDisconnected()) {
+ if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) {
+ try {
+ // Start task execution in another thread.
+ if (sys)
+ ctx.getSystemExecutorService().execute(taskWorker);
+ else
+ ctx.getExecutorService().execute(taskWorker);
+ }
+ catch (RejectedExecutionException e) {
+ tasks.remove(sesId);
- release(dep);
+ release(dep);
- handleException(new ComputeExecutionRejectedException("Failed to execute task " +
- "due to thread pool execution rejection: " + taskName, e), fut);
+ handleException(new ComputeExecutionRejectedException("Failed to execute task " +
+ "due to thread pool execution rejection: " + taskName, e), fut);
+ }
}
+ else
+ taskWorker.run();
}
else
- taskWorker.run();
-
- if (ctx.clientDisconnected()) {
taskWorker.finishTask(null, disconnectedError(null));
-
- taskWorker.cancel();
- }
}
}
else {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index eb5fa77..133a31f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -1070,10 +1070,17 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
- if (!isDeadNode(nodeId))
- U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" +
- nodeId + ", taskName=" + ses.getTaskName() +
- ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']', e);
+ try {
+ if (!isDeadNode(nodeId))
+ U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" +
+ nodeId + ", taskName=" + ses.getTaskName() +
+ ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']', e);
+ }
+ catch (IgniteClientDisconnectedCheckedException e0) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send cancel request to node, client disconnected [nodeId=" +
+ nodeId + ", taskName=" + ses.getTaskName() + ']');
+ }
}
}
}
@@ -1169,24 +1176,39 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
}
catch (IgniteCheckedException e) {
- boolean deadNode = isDeadNode(res.getNode().id());
+ IgniteException fakeErr = null;
- // Avoid stack trace if node has left grid.
- if (deadNode)
- U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " +
- "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +
- ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');
- else
- U.error(log, "Failed to send job request: " + req, e);
+ try {
+ boolean deadNode = isDeadNode(res.getNode().id());
+
+ // Avoid stack trace if node has left grid.
+ if (deadNode) {
+ U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " +
+ "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +
+ ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');
+
+ fakeErr = new ClusterTopologyException("Failed to send job due to node failure: " + node, e);
+ }
+ else
+ U.error(log, "Failed to send job request: " + req, e);
+
+ }
+ catch (IgniteClientDisconnectedCheckedException e0) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send job request, client disconnected [node=" + node +
+ ", taskName=" + ses.getTaskName() + ", taskSesId=" + ses.getId() + ", jobSesId=" +
+ res.getJobContext().getJobId() + ']');
+
+ fakeErr = U.convertException(e0);
+ }
GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(),
res.getJobContext().getJobId(), null, null, null, null, null, null, false);
- if (deadNode)
- fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " +
- node, e));
- else
- fakeRes.setFakeException(U.convertException(e));
+ if (fakeErr == null)
+ fakeErr = U.convertException(e);
+
+ fakeRes.setFakeException(fakeErr);
onResponse(fakeRes);
}
@@ -1345,8 +1367,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
*
* @param uid UID of node to check.
* @return {@code true} if node is dead, {@code false} if node is alive.
+ * @throws IgniteClientDisconnectedCheckedException if ping failed when client disconnected.
*/
- private boolean isDeadNode(UUID uid) {
+ private boolean isDeadNode(UUID uid) throws IgniteClientDisconnectedCheckedException {
return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 91d8172..149222e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -682,6 +682,25 @@ public abstract class IgniteUtils {
* @return Ignite runtime exception.
*/
public static IgniteException convertException(IgniteCheckedException e) {
+ IgniteClientDisconnectedException e0 = e.getCause(IgniteClientDisconnectedException.class);
+
+ if (e0 != null) {
+ assert e0.reconnectFuture() != null : e0;
+
+ throw e0;
+ }
+
+ IgniteClientDisconnectedCheckedException disconnectedErr =
+ e instanceof IgniteClientDisconnectedCheckedException ?
+ (IgniteClientDisconnectedCheckedException)e
+ : e.getCause(IgniteClientDisconnectedCheckedException.class);
+
+ if (disconnectedErr != null) {
+ assert disconnectedErr.reconnectFuture() != null : disconnectedErr;
+
+ e = disconnectedErr;
+ }
+
C1<IgniteCheckedException, IgniteException> converter = exceptionConverters.get(e.getClass());
if (converter != null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 7691e3f..8ea2b82 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2707,10 +2707,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) {
ClusterNode node = recoveryDesc.node();
- if (clients.containsKey(node.id()) ||
- !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) ||
- !getSpiContext().pingNode(node.id()))
+ try {
+ if (clients.containsKey(node.id()) ||
+ !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) ||
+ !getSpiContext().pingNode(node.id()))
+ return;
+ }
+ catch (IgniteClientDisconnectedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to ping node, client disconnected.");
+
return;
+ }
try {
if (log.isDebugEnabled())
@@ -3100,6 +3108,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param nodeId Node ID.
*/
private NodeIdMessage(UUID nodeId) {
+ assert nodeId != null;
+
nodeIdBytes = U.uuidToBytes(nodeId);
nodeIdBytesWithType = new byte[nodeIdBytes.length + 1];
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 38ba8fd..b3793b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -70,6 +70,9 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private SocketReader sockReader;
+ /** */
+ private volatile State state;
+
/** Last message ID. */
private volatile IgniteUuid lastMsgId;
@@ -255,23 +258,36 @@ class ClientImpl extends TcpDiscoveryImpl {
if (oldFut != null)
fut = oldFut;
else {
- if (spi.getSpiContext().isStopping()) {
+ State state = this.state;
+
+ if (spi.getSpiContext().isStopping() || state == State.STOPPED || state == State.SEGMENTED) {
if (pingFuts.remove(nodeId, fut))
fut.onDone(false);
return false;
}
+ else if (state == State.DISCONNECTED) {
+ if (pingFuts.remove(nodeId, fut))
+ fut.onDone(new IgniteClientDisconnectedCheckedException(null,
+ "Failed to ping node, client node disconnected."));
+ }
+ else {
+ final GridFutureAdapter<Boolean> finalFut = fut;
+
+ timer.schedule(new TimerTask() {
+ @Override public void run() {
+ if (pingFuts.remove(nodeId, finalFut)) {
+ if (ClientImpl.this.state == State.DISCONNECTED)
+ finalFut.onDone(new IgniteClientDisconnectedCheckedException(null,
+ "Failed to ping node, client node disconnected."));
+ else
+ finalFut.onDone(false);
+ }
+ }
+ }, spi.netTimeout);
- final GridFutureAdapter<Boolean> finalFut = fut;
-
- timer.schedule(new TimerTask() {
- @Override public void run() {
- if (pingFuts.remove(nodeId, finalFut))
- finalFut.onDone(false);
- }
- }, spi.netTimeout);
-
- sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
+ sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
+ }
}
}
@@ -282,7 +298,7 @@ class ClientImpl extends TcpDiscoveryImpl {
return false;
}
catch (IgniteCheckedException e) {
- throw new IgniteSpiException(e); // Should newer occur.
+ throw new IgniteSpiException(e);
}
}
@@ -953,8 +969,7 @@ class ClientImpl extends TcpDiscoveryImpl {
@Override protected void body() throws InterruptedException {
assert state == ClientImpl.State.DISCONNECTED
|| state == ClientImpl.State.CONNECTED
- || state == ClientImpl.State.STARTING :
- state;
+ || state == ClientImpl.State.STARTING : state;
boolean success = false;
@@ -976,7 +991,7 @@ class ClientImpl extends TcpDiscoveryImpl {
}
else
U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
- "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
+ "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
return;
}
@@ -1171,21 +1186,36 @@ class ClientImpl extends TcpDiscoveryImpl {
reconnector = null;
- state = ClientImpl.State.DISCONNECTED;
+ if (spi.isClientReconnectDisabled()) {
+ state = ClientImpl.State.SEGMENTED;
+
+ notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+ }
+ else {
+ state = ClientImpl.State.DISCONNECTED;
+
+ nodeAdded = false;
- nodeAdded = false;
+ IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
+ null, "Failed to ping node, client node disconnected.");
- notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+ for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
+ GridFutureAdapter<Boolean> fut = e.getValue();
+
+ if (pingFuts.remove(e.getKey(), fut))
+ fut.onDone(err);
+ }
- UUID newId = UUID.randomUUID();
+ notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
- log.info("Change node id: " + newId);
+ UUID newId = UUID.randomUUID();
- rmtNodes.clear();
+ log.info("Change node id: " + newId + " " + locNode.attribute(IgniteNodeAttributes.ATTR_GRID_NAME));
- locNode.onClientDisconnected(newId);
+ locNode.onClientDisconnected(newId);
- tryJoin();
+ tryJoin();
+ }
}
else {
TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
@@ -1298,11 +1328,13 @@ class ClientImpl extends TcpDiscoveryImpl {
* @return {@code True} if client in process of join.
*/
private boolean joining() {
+ ClientImpl.State state = ClientImpl.this.state;
+
return state == ClientImpl.State.STARTING || state == ClientImpl.State.DISCONNECTED;
}
/**
- * @return {@code True} if disconnected.
+ * @return {@code True} if client disconnected.
*/
private boolean disconnected() {
return state == ClientImpl.State.DISCONNECTED;
@@ -1795,17 +1827,23 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
- private volatile State state;
-
+ /**
+ * Connection state of this client discovery implementation.
+ */
private enum State {
+ /** Client is starting; treated as "in process of join" by {@code joining()}. */
STARTING,
+ /** Presumably the client has joined the topology - TODO confirm where this state is set. */
CONNECTED,
+ /** Set on connection loss when client reconnect is enabled; pending pings are failed and a rejoin is attempted. */
DISCONNECTED,
+ /** Set on connection loss when client reconnect is disabled; {@code EVT_NODE_SEGMENTED} is fired. */
SEGMENTED,
+ /** Presumably the SPI has been stopped - TODO confirm; pings return {@code false} in this state. */
STOPPED
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 9446d2d..3995207 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -327,6 +327,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** */
private boolean forceSrvMode;
+ /** */
+ private boolean clientReconnectDisabled;
+
/** {@inheritDoc} */
@Override public String getSpiState() {
return impl.getSpiState();
@@ -416,6 +419,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return this;
}
+ public boolean isClientReconnectDisabled() {
+ return clientReconnectDisabled;
+ }
+
+ @IgniteSpiConfiguration(optional = true)
+ public void setClientReconnectDisabled(boolean clientReconnectDisabled) {
+ this.clientReconnectDisabled = clientReconnectDisabled;
+ }
+
/**
* Inject resources
*
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index ec043f8..8fca97c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -218,6 +218,9 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
protected IgniteFuture<?> check(CacheException e) {
log.info("Expected exception: " + e);
+ if (!(e.getCause() instanceof IgniteClientDisconnectedException))
+ log.error("Unexpected cause: " + e.getCause(), e);
+
assertTrue("Unexpected cause: " + e.getCause(), e.getCause() instanceof IgniteClientDisconnectedException);
IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
new file mode 100644
index 0000000..551cb1a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+
+import javax.cache.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public abstract class IgniteClientReconnectFailoverAbstractTest extends IgniteClientReconnectAbstractTest {
+ /** */
+ private static final Integer THREADS = 1;
+
+ /** */
+ private volatile CyclicBarrier barrier;
+
+ /** */
+ protected static final long TEST_TIME = 90_000;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int clientCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return TEST_TIME + 60_000; // Failover loop runs for TEST_TIME ms; allow one extra minute of margin.
+ }
+
+ /**
+ * @param c Test closure.
+ * @throws Exception If failed.
+ */
+ protected final void reconnectFailover(final Callable<Void> c) throws Exception {
+ final Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final AtomicBoolean stop = new AtomicBoolean(false);
+
+ final IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ int iter = 0;
+
+ while (!stop.get()) {
+ try {
+ c.call();
+ }
+ catch (CacheException e) {
+ checkAndWait(e);
+ }
+ catch (IgniteClientDisconnectedException e) {
+ checkAndWait(e);
+ }
+
+ if (++iter % 100 == 0)
+ log.info("Iteration: " + iter);
+
+ if (barrier != null)
+ barrier.await();
+ }
+
+ return null;
+ } catch (Throwable e) {
+ log.error("Unexpected error in operation thread: " + e, e);
+
+ stop.set(true);
+
+ throw e;
+ }
+ }
+ }, THREADS, "test-operation-thread");
+
+ final AtomicReference<CountDownLatch> disconnected = new AtomicReference<>();
+ final AtomicReference<CountDownLatch> reconnected = new AtomicReference<>();
+
+ IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ CountDownLatch latch = reconnected.get();
+
+ assertNotNull(latch);
+ assertEquals(1, latch.getCount());
+
+ latch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ CountDownLatch latch = disconnected.get();
+
+ assertNotNull(latch);
+ assertEquals(1, latch.getCount());
+
+ latch.countDown();
+ }
+
+ return true;
+ }
+ };
+
+ client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ try {
+ long stopTime = System.currentTimeMillis() + TEST_TIME;
+
+ String err = null;
+
+ while (System.currentTimeMillis() < stopTime && !fut.isDone()) {
+ U.sleep(100);
+
+ CountDownLatch disconnectLatch = new CountDownLatch(1);
+ CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ disconnected.set(disconnectLatch);
+ reconnected.set(reconnectLatch);
+
+ UUID nodeId = client.cluster().localNode().id();
+
+ log.info("Fail client: " + nodeId);
+
+ srvSpi.failNode(nodeId, null);
+
+ if (!disconnectLatch.await(5000, MILLISECONDS)) {
+ err = "Failed to wait for disconnect";
+
+ break;
+ }
+
+ if (!reconnectLatch.await(5000, MILLISECONDS)) {
+ err = "Failed to wait for reconnect";
+
+ break;
+ }
+
+ barrier = new CyclicBarrier(THREADS + 1, new Runnable() {
+ @Override public void run() {
+ barrier = null;
+ }
+ });
+
+ try {
+ barrier.await(10, SECONDS);
+ }
+ catch (TimeoutException e) {
+ err = "Operations hang or fail with unexpected error.";
+
+ break;
+ }
+ }
+
+ if (err != null) {
+ log.error(err);
+
+ U.dumpThreads(log);
+
+ CyclicBarrier barrier0 = barrier;
+
+ if (barrier0 != null)
+ barrier0.reset();
+
+ stop.set(true);
+
+ fut.get();
+
+ fail(err);
+ }
+
+ stop.set(true);
+
+ fut.get();
+ }
+ finally {
+ client.events().stopLocalListen(p);
+
+ stop.set(true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
index 35f86f5..7cfc329 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
@@ -19,37 +19,24 @@ package org.apache.ignite.internal;
import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.testframework.*;
import org.apache.ignite.transactions.*;
-import javax.cache.*;
import java.util.*;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.transactions.TransactionIsolation.*;
/**
*
*/
-public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectAbstractTest {
+public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectFailoverAbstractTest {
/** */
- public final Integer THREADS = 8;
+ protected static final String ATOMIC_CACHE = "ATOMIC_CACHE";
/** */
- private volatile CyclicBarrier barrier;
-
- /** */
- private static final String ATOMIC_CACHE = "ATOMIC_CACHE";
-
- /** */
- private static final String TX_CACHE = "TX_CACHE";
+ protected static final String TX_CACHE = "TX_CACHE";
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -72,21 +59,6 @@ public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectAbst
return cfg;
}
- /** {@inheritDoc} */
- @Override protected int serverCount() {
- return 3;
- }
-
- /** {@inheritDoc} */
- @Override protected int clientCount() {
- return 1;
- }
-
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return 2 * 60_000;
- }
-
/**
* @throws Exception If failed.
*/
@@ -199,138 +171,33 @@ public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectAbst
}
/**
- * @param c Test closure.
* @throws Exception If failed.
*/
- public void reconnectFailover(final Callable<Void> c) throws Exception {
+ public void testReconnectStreamerApi() throws Exception {
final Ignite client = grid(serverCount());
- assertTrue(client.cluster().localNode().isClient());
-
- Ignite srv = clientRouter(client);
-
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
- final AtomicBoolean stop = new AtomicBoolean(false);
-
- final IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- try {
- int iter = 0;
-
- while (!stop.get()) {
- try {
- c.call();
- }
- catch (CacheException e) {
- checkAndWait(e);
- }
- catch (IgniteClientDisconnectedException e) {
- checkAndWait(e);
- }
-
- if (++iter % 100 == 0)
- log.info("Iteration: " + iter);
-
- if (barrier != null)
- barrier.await();
- }
-
- return null;
- }
- catch (Throwable e) {
- stop.set(true);
-
- log.error("Unexpected error: " + e, e);
-
- throw e;
- }
- }
- }, THREADS, "test-operation-thread");
-
- final AtomicReference<CountDownLatch> reconnected = new AtomicReference<>();
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- CountDownLatch latch = reconnected.get();
-
- assertNotNull(latch);
- assertEquals(1, latch.getCount());
+ reconnectFailover(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ stream(ATOMIC_CACHE);
- latch.countDown();
- }
- else if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED)
- info("Disconnected: " + evt);
+ stream(TX_CACHE);
- return true;
+ return null;
}
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- try {
- long stopTime = System.currentTimeMillis() + 60_000;
-
- String err = null;
- while (System.currentTimeMillis() < stopTime && !fut.isDone()) {
- U.sleep(100);
-
- CountDownLatch latch = new CountDownLatch(1);
-
- reconnected.set(latch);
-
- UUID nodeId = client.cluster().localNode().id();
-
- log.info("Fail client: " + nodeId);
-
- srvSpi.failNode(nodeId, null);
-
- if (!latch.await(5000, MILLISECONDS)) {
- err = "Failed to wait for reconnect";
-
- break;
- }
+ private void stream(String cacheName) {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
- barrier = new CyclicBarrier(THREADS + 1, new Runnable() {
- @Override public void run() {
- barrier = null;
- }
- });
+ try (IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(cacheName)) {
+ streamer.allowOverwrite(true);
- try {
- barrier.await(10, SECONDS);
- }
- catch (TimeoutException e) {
- err = "Operation hangs.";
+ streamer.perNodeBufferSize(10);
- break;
+ for (int i = 0; i < 100; i++)
+ streamer.addData(rnd.nextInt(100_000), 0);
}
}
-
- if (err != null) {
- log.error(err);
-
- U.dumpThreads(log);
-
- CyclicBarrier barrier0 = barrier;
-
- if (barrier0 != null)
- barrier0.reset();
-
- stop.set(true);
-
- fail(err);
- }
-
- stop.set(true);
-
- fut.get();
- }
- finally {
- stop.set(true);
- }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
index 27c2a61..a392245 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -62,6 +62,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
IgniteConfiguration cfg = super.getConfiguration(gridName);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setClientReconnectDisabled(true);
if (getTestGridName(nodeCount() - 1).equals(gridName) || getTestGridName(nodeCount() - 2).equals(gridName))
cfg.setClientMode(true);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
index 071341e..8703d32 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -27,7 +26,7 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.testframework.*;
import org.apache.ignite.testframework.junits.common.*;
-import javax.cache.Cache;
+import javax.cache.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -89,7 +88,8 @@ public class IgniteCacheDynamicStopSelfTest extends GridCommonAbstractTest {
@Override public void apply(IgniteFuture<?> f) {
try {
f.get();
- } catch (IgniteException ignore) {
+ }
+ catch (CacheException ignore) {
// This may be debugged.
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
index af3ea9d..30bf5dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.transactions.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.indexing.*;
import org.apache.ignite.testframework.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 55fae9b..ba38dfc 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -1332,6 +1332,27 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectDisabled() throws Exception {
+ // TODO IGNITE-901.
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDisconnectAfterNetworkTimeout() throws Exception {
+ // TODO IGNITE-901.
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectSegmentedAfterJoinTimeout() throws Exception {
+ // TODO IGNITE-901.
+ }
+
+ /**
* @param clientIdx Client index.
* @param srvIdx Server index.
* @throws Exception In case of error.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 64e16bf..b531c35 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -492,7 +492,7 @@ public class GridReduceQueryExecutor {
if (ctx.clientDisconnected()) {
throw new CacheException("Query was cancelled, client node disconnected.",
new IgniteClientDisconnectedException(ctx.cluster().clientReconnectFuture(),
- "Client node disconnected.", null));
+ "Client node disconnected."));
}
Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
@@ -573,7 +573,17 @@ public class GridReduceQueryExecutor {
if (e instanceof CacheException)
throw (CacheException)e;
- throw new CacheException("Failed to run reduce query locally.", e);
+ Throwable cause = e;
+
+ if (e instanceof IgniteCheckedException) {
+ Throwable disconnectedErr =
+ ((IgniteCheckedException)e).getCause(IgniteClientDisconnectedException.class);
+
+ if (disconnectedErr != null)
+ cause = disconnectedErr;
+ }
+
+ throw new CacheException("Failed to run reduce query locally.", cause);
}
finally {
if (!runs.remove(qryReqId, r))
@@ -1109,7 +1119,7 @@ public class GridReduceQueryExecutor {
*/
public void onDisconnected(IgniteFuture<?> reconnectFut) {
CacheException err = new CacheException("Query was cancelled, client node disconnected.",
- new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.", null));
+ new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."));
for (Map.Entry<Long, QueryRun> e : runs.entrySet())
e.getValue().state(err, null);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java
deleted file mode 100644
index b0dc965..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.cache.query.annotations.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.query.*;
-import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.testframework.*;
-
-import javax.cache.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-
-/**
- *
- */
-public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstractTest {
- /** */
- public static final String QUERY_CACHE = "query";
-
- /** {@inheritDoc} */
- @Override protected int serverCount() {
- return 3;
- }
-
- /** {@inheritDoc} */
- @Override protected int clientCount() {
- return 1;
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- CacheConfiguration<Integer, Person> ccfg = new CacheConfiguration<Integer, Person>(QUERY_CACHE)
- .setCacheMode(PARTITIONED)
- .setAtomicityMode(ATOMIC)
- .setBackups(1)
- .setIndexedTypes(Integer.class, Person.class);
-
- cfg.setCacheConfiguration(ccfg);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- super.afterTest();
-
- grid(0).getOrCreateCache(QUERY_CACHE).removeAll();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testQueryReconnect() throws Exception {
- Ignite cln = grid(serverCount());
-
- assertTrue(cln.cluster().localNode().isClient());
-
- final Ignite srv = clientRouter(cln);
-
- final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
-
- final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE);
-
- clnCache.put(1, new Person(1, "name1", "surname1"));
- clnCache.put(2, new Person(2, "name2", "surname2"));
- clnCache.put(3, new Person(3, "name3", "surname3"));
-
- final SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key <> 0");
-
- qry.setPageSize(1);
-
- QueryCursor<Cache.Entry<Integer, Person>> cur = clnCache.query(qry);
-
- reconnectClientNode(cln, srv, new Runnable() {
- @Override public void run() {
- srvCache.put(4, new Person(4, "name4", "surname4"));
-
- try {
- clnCache.query(qry);
-
- fail();
- } catch (CacheException e) {
- check(e);
- }
- }
- });
-
- List<Cache.Entry<Integer, Person>> res = cur.getAll();
-
- assertNotNull(res);
- assertEquals(4, res.size());
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testReconnectQueryInProgress() throws Exception {
- Ignite cln = grid(serverCount());
-
- assertTrue(cln.cluster().localNode().isClient());
-
- final Ignite srv = clientRouter(cln);
-
- final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
-
- clnCache.put(1, new Person(1, "name1", "surname1"));
- clnCache.put(2, new Person(2, "name2", "surname2"));
- clnCache.put(3, new Person(3, "name3", "surname3"));
-
- blockMessage(GridQueryNextPageResponse.class);
-
- final SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key <> 0");
-
- qry.setPageSize(1);
-
- final QueryCursor<Cache.Entry<Integer, Person>> cur1 = clnCache.query(qry);
-
- final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- try {
- cur1.getAll();
- }
- catch (CacheException e) {
- checkAndWait(e);
-
- return true;
- }
-
- return false;
- }
- });
-
- // Check that client waiting operation.
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- return fut.get(200);
- }
- }, IgniteFutureTimeoutCheckedException.class, null);
-
- assertNotDone(fut);
-
- unblockMessage();
-
- reconnectClientNode(cln, srv, null);
-
- assertTrue((Boolean) fut.get(2, SECONDS));
-
- QueryCursor<Cache.Entry<Integer, Person>> cur2 = clnCache.query(qry);
-
- assertEquals(3, cur2.getAll().size());
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testScanQueryReconnect() throws Exception {
- Ignite cln = grid(serverCount());
-
- assertTrue(cln.cluster().localNode().isClient());
-
- final Ignite srv = clientRouter(cln);
-
- final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
-
- final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE);
-
- for (int i = 0; i < 100; i++)
- clnCache.put(i, new Person(i, "name-" + i, "surname-" + i));
-
- final ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
-
- scanQry.setPageSize(1);
-
- scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() {
- @Override public boolean apply(Integer integer, Person person) {
- return true;
- }
- });
-
- QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(scanQry);
-
- reconnectClientNode(cln, srv, new Runnable() {
- @Override public void run() {
- srvCache.put(1000, new Person(1000, "name", "surname"));
-
- try {
- clnCache.query(scanQry);
-
- fail();
- }
- catch (CacheException e) {
- check(e);
- }
- }
- });
-
- try {
- qryCursor.getAll();
-
- fail();
- }
- catch (CacheException e) {
- checkAndWait(e);
- }
-
- qryCursor = clnCache.query(scanQry);
-
- assertEquals(101, qryCursor.getAll().size());
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testScanQueryReconnectInProgress1() throws Exception {
- scanQueryReconnectInProgress(false);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testScanQueryReconnectInProgress2() throws Exception {
- scanQueryReconnectInProgress(true);
- }
-
- /**
- * @param setPart If {@code true} sets partition for scan query.
- * @throws Exception If failed.
- */
- private void scanQueryReconnectInProgress(boolean setPart) throws Exception {
- Ignite cln = grid(serverCount());
-
- assertTrue(cln.cluster().localNode().isClient());
-
- final Ignite srv = clientRouter(cln);
-
- final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
-
- clnCache.put(1, new Person(1, "name1", "surname1"));
- clnCache.put(2, new Person(2, "name2", "surname2"));
- clnCache.put(3, new Person(3, "name3", "surname3"));
-
- final ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
-
- scanQry.setPageSize(1);
-
- scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() {
- @Override public boolean apply(Integer integer, Person person) {
- return true;
- }
- });
-
- if (setPart)
- scanQry.setPartition(1);
-
- blockMessage(GridCacheQueryResponse.class);
-
- final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- try {
- QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(scanQry);
-
- qryCursor.getAll();
- }
- catch (CacheException e) {
- checkAndWait(e);
-
- return true;
- }
-
- return false;
- }
- });
-
- // Check that client waiting operation.
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- return fut.get(200);
- }
- }, IgniteFutureTimeoutCheckedException.class, null);
-
- assertNotDone(fut);
-
- unblockMessage();
-
- reconnectClientNode(cln, srv, null);
-
- assertTrue((Boolean)fut.get(2, SECONDS));
-
- QueryCursor<Cache.Entry<Integer, Person>> qryCursor2 = clnCache.query(scanQry);
-
- assertEquals(setPart ? 1 : 3, qryCursor2.getAll().size());
- }
-
- /**
- * @param clazz Message class.
- */
- private void blockMessage(Class<?> clazz) {
- for (int i = 0; i < serverCount(); i++) {
- BlockTpcCommunicationSpi commSpi = commSpi(grid(i));
-
- commSpi.blockMessage(clazz);
- }
- }
-
- /**
- *
- */
- private void unblockMessage() {
- for (int i = 0; i < serverCount(); i++) {
- BlockTpcCommunicationSpi commSpi = commSpi(grid(i));
-
- commSpi.unblockMessage();
- }
- }
-
- /**
- *
- */
- public static class Person {
- /** */
- @QuerySqlField
- public int id;
-
- /** */
- @QuerySqlField
- public String name;
-
- /** */
- @QuerySqlField
- public String surname;
-
- /**
- * @param id Id.
- * @param name Name.
- * @param surname Surname.
- */
- public Person(int id, String name, String surname) {
- this.id = id;
- this.name = name;
- this.surname = surname;
- }
-
- /**
- * @return Id.
- */
- public int getId() {
- return id;
- }
-
- /**
- * @param id Set id.
- */
- public void setId(int id) {
- this.id = id;
- }
-
- /**
- * @return Name.
- */
- public String getName() {
- return name;
- }
-
- /**
- * @param name Name.
- */
- public void setName(String name) {
- this.name = name;
- }
-
- /**
- * @return Surname.
- */
- public String getSurname() {
- return surname;
- }
-
- /**
- * @param surname Surname.
- */
- public void setSurname(String surname) {
- this.surname = surname;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- return this == o || !(o == null || getClass() != o.getClass()) && id == ((Person)o).id;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return id;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(Person.class, this);
- }
- }
-}