You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/07 16:47:40 UTC
[3/3] incubator-ignite git commit: # ignite-901
# ignite-901
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/89fb3951
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/89fb3951
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/89fb3951
Branch: refs/heads/ignite-901
Commit: 89fb3951a3b2d96488f45d352d3867a4e2b0ed43
Parents: 55bd754
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jul 7 10:49:43 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jul 7 17:45:13 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 2 +-
.../discovery/GridDiscoveryManager.java | 2 +-
.../cache/CacheOsConflictResolutionManager.java | 6 +
.../processors/cache/GridCacheAdapter.java | 4 +-
.../cache/GridCacheConcurrentMap.java | 15 +-
.../processors/cache/GridCacheGateway.java | 3 +-
.../processors/cache/GridCacheManager.java | 6 +
.../cache/GridCacheManagerAdapter.java | 6 +
.../GridCachePartitionExchangeManager.java | 5 +-
.../processors/cache/GridCacheProcessor.java | 19 +-
.../distributed/near/GridNearCacheAdapter.java | 7 +-
.../cache/dr/GridOsCacheDrManager.java | 7 +-
.../query/GridCacheDistributedQueryManager.java | 22 +
.../cache/query/GridCacheQueryAdapter.java | 11 +-
.../query/GridCacheQueryFutureAdapter.java | 2 +-
.../continuous/CacheContinuousQueryHandler.java | 5 +
.../continuous/GridContinuousProcessor.java | 95 ++-
.../datastreamer/DataStreamProcessor.java | 17 +-
.../datastreamer/DataStreamerImpl.java | 68 +-
.../processors/query/GridQueryIndexing.java | 7 +
.../processors/query/GridQueryProcessor.java | 6 +
.../service/GridServiceProcessor.java | 28 +
.../ignite/internal/util/lang/GridFunc.java | 2 +
.../ignite/spi/discovery/tcp/ClientImpl.java | 7 +-
.../IgniteClientReconnectAbstractTest.java | 28 +-
.../IgniteClientReconnectApiBlockTest.java | 845 -------------------
.../IgniteClientReconnectApiExceptionTest.java | 845 +++++++++++++++++++
.../IgniteClientReconnectAtomicsTest.java | 16 +-
.../IgniteClientReconnectCacheTest.java | 11 +
.../IgniteClientReconnectCollectionsTest.java | 12 +-
.../IgniteClientReconnectComputeTest.java | 14 +-
...eClientReconnectContinuousProcessorTest.java | 60 +-
.../IgniteClientReconnectFailoverTest.java | 371 ++++----
.../IgniteClientReconnectServicesTest.java | 14 +-
.../IgniteClientReconnectStreamerTest.java | 96 ++-
.../IgniteClientReconnectTestSuite.java | 2 +-
.../processors/query/h2/IgniteH2Indexing.java | 5 +
.../query/h2/twostep/GridMergeIndex.java | 48 +-
.../h2/twostep/GridReduceQueryExecutor.java | 44 +-
.../IgniteClientReconnectQueriesTest.java | 164 +++-
40 files changed, 1719 insertions(+), 1208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 4af69f7..0dd3c29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2803,7 +2803,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/**
*
*/
- public void disconnected() {
+ public void onDisconnected() {
Throwable err = null;
GridFutureAdapter<?> reconnectFut = ctx.gateway().onDisconnected();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index f95788a..4a064d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1900,7 +1900,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
case EVT_CLIENT_NODE_DISCONNECTED: {
assert localNode().isClient() : evt;
- ((IgniteKernal)ctx.grid()).disconnected();
+ ((IgniteKernal)ctx.grid()).onDisconnected();
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOsConflictResolutionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOsConflictResolutionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOsConflictResolutionManager.java
index 29e50b6..9e765d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOsConflictResolutionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOsConflictResolutionManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.lang.*;
/**
* OS conflict resolver manager.
@@ -55,4 +56,9 @@ public class CacheOsConflictResolutionManager<K ,V> implements CacheConflictReso
@Override public void printMemoryStats() {
// No-op.
}
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ // No-op.
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 5ee88a9..8a8e096 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -212,7 +212,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
@SuppressWarnings("OverriddenMethodCallDuringObjectConstruction")
protected GridCacheAdapter(GridCacheContext<K, V> ctx, int startSize) {
- this(ctx, new GridCacheConcurrentMap(ctx, startSize, 0.75F));
+ this(ctx, new GridCacheConcurrentMap(ctx, startSize, 0.75F, null));
}
/**
@@ -4508,7 +4508,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
*
*/
- public void disconnected() {
+ public void onDisconnected() {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
index db5eed1..966dcc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
@@ -279,11 +279,17 @@ public class GridCacheConcurrentMap {
* @param loadFactor the load factor threshold, used to control resizing.
* Resizing may be performed when the average number of elements per
* bin exceeds this threshold.
+ * @param factory Entries factory.
* @throws IllegalArgumentException if the initial capacity of
* elements is negative or the load factor is non-positive.
*/
- public GridCacheConcurrentMap(GridCacheContext ctx, int initCap, float loadFactor) {
+ public GridCacheConcurrentMap(GridCacheContext ctx,
+ int initCap,
+ float loadFactor,
+ @Nullable GridCacheMapEntryFactory factory) {
this(ctx, initCap, loadFactor, DFLT_CONCUR_LEVEL);
+
+ this.factory = factory;
}
/**
@@ -312,6 +318,13 @@ public class GridCacheConcurrentMap {
}
/**
+ * @return Entries factory.
+ */
+ public GridCacheMapEntryFactory getEntryFactory() {
+ return factory;
+ }
+
+ /**
* @return Non-internal predicate.
*/
private static CacheEntryPredicate[] nonInternal() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index 9e0706e..a9a73eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -85,7 +85,8 @@ public class GridCacheGateway<K, V> {
else {
assert reconnectFut != null;
- throw new CacheException(new IgniteClientDisconnectedException(reconnectFut, "Client disconnected,", null));
+ throw new CacheException(
+ new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.", null));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManager.java
index 775daf5..ae7e9d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManager.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
/**
* Interface for cache managers.
@@ -49,6 +50,11 @@ public interface GridCacheManager<K, V> {
public void onKernalStop(boolean cancel);
/**
+ * @param reconnectFut Reconnect future.
+ */
+ public void onDisconnected(IgniteFuture<?> reconnectFut);
+
+ /**
* Prints memory statistics (sizes of internal data structures, etc.).
*
* NOTE: this method is for testing and profiling purposes only.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
index 52fade8..54b1915 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import java.util.concurrent.atomic.*;
@@ -127,6 +128,11 @@ public class GridCacheManagerAdapter<K, V> implements GridCacheManager<K, V> {
}
/** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void printMemoryStats() {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e091c67..47bb279 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -331,8 +331,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class);
cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class);
- IgniteCheckedException err = cctx.kernalContext().gateway().getState() == DISCONNECTED ?
- new IgniteClientDisconnectedCheckedException(null, "Node disconnected: " + cctx.gridName()) :
+ IgniteCheckedException err = cctx.kernalContext().clientDisconnected() ?
+ new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(),
+ "Client node disconnected: " + cctx.gridName()) :
new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName());
// Finish all exchange futures.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index b505e51..79b9e00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -923,13 +923,24 @@ public class GridCacheProcessor extends GridProcessorAdapter {
registeredTemplates.clear();
- for (GridCacheAdapter cache : caches.values())
- cache.context().gate().onDisconnected(reconnectFut);
+ for (GridCacheAdapter cache : caches.values()) {
+ GridCacheContext cctx = cache.context();
- for (GridCacheAdapter cache : caches.values())
- cache.disconnected();
+ cctx.gate().onDisconnected(reconnectFut);
+
+ List<GridCacheManager> mgrs = cache.context().managers();
+
+ for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) {
+ GridCacheManager mgr = it.previous();
+
+ mgr.onDisconnected(reconnectFut);
+ }
+ }
sharedCtx.onDisconnected(reconnectFut);
+
+ for (GridCacheAdapter cache : caches.values())
+ cache.onDisconnected();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index a7e3f4b..688299a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -90,8 +90,11 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
public abstract GridDhtCacheAdapter<K, V> dht();
/** {@inheritDoc} */
- @Override public void disconnected() {
- map = new GridCacheConcurrentMap(ctx, ctx.config().getNearConfiguration().getNearStartSize(), 0.75F);
+ @Override public void onDisconnected() {
+ map = new GridCacheConcurrentMap(ctx,
+ ctx.config().getNearConfiguration().getNearStartSize(),
+ 0.75F,
+ map.getEntryFactory());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
index 00ed020..7f0a568 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
@@ -20,9 +20,9 @@ package org.apache.ignite.internal.processors.cache.dr;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.dr.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.dr.*;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
/**
@@ -103,4 +103,9 @@ public class GridOsCacheDrManager implements GridCacheDrManager {
@Override public boolean receiveEnabled() {
return false;
}
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture reconnectFut) {
+ // No-op.
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 2b93144..316713f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -110,6 +110,20 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
}
/** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
+ "Query was cancelled, client node disconnected.");
+
+ for (Map.Entry<Long, GridCacheDistributedQueryFuture<?, ?, ?>> e : futs.entrySet()) {
+ GridCacheDistributedQueryFuture<?, ?, ?> fut = e.getValue();
+
+ fut.onPage(null, null, err, true);
+
+ futs.remove(e.getKey(), fut);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void printMemoryStats() {
super.printMemoryStats();
@@ -125,6 +139,14 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
*/
protected void addQueryFuture(long reqId, GridCacheDistributedQueryFuture<?, ?, ?> fut) {
futs.put(reqId, fut);
+
+ if (cctx.kernalContext().clientDisconnected()) {
+ IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
+ cctx.kernalContext().cluster().clientReconnectFuture(),
+ "Query was cancelled, client node disconnected.");
+
+ fut.onDone(err);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 5b82c34..18738ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -551,6 +551,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/**
* @param nodes Nodes.
+ * @return Nodes for query execution.
*/
private Queue<ClusterNode> fallbacks(Collection<ClusterNode> nodes) {
Queue<ClusterNode> fallbacks = new LinkedList<>();
@@ -568,18 +569,22 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/**
*
*/
+ @SuppressWarnings("unchecked")
private void init() {
ClusterNode node = nodes.poll();
- GridCacheQueryFutureAdapter<?, ?, R> fut0 =
- (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? qryMgr.queryLocal(bean) :
- qryMgr.queryDistributed(bean, Collections.singleton(node)));
+ GridCacheQueryFutureAdapter<?, ?, R> fut0 = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ?
+ qryMgr.queryLocal(bean) :
+ qryMgr.queryDistributed(bean, Collections.singleton(node)));
fut0.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
@Override public void apply(IgniteInternalFuture<Collection<R>> fut) {
try {
onDone(fut.get());
}
+ catch (IgniteClientDisconnectedCheckedException e) {
+ onDone(e);
+ }
catch (IgniteCheckedException e) {
if (F.isEmpty(nodes))
onDone(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index a8bace0..53017c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -163,7 +163,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
return null;
}
catch (IgniteCheckedException e) {
- throw new IgniteException(e);
+ throw CU.convertToCacheException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index ff2905f..d5309dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -438,6 +438,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
taskHash = in.readInt();
}
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheContinuousQueryHandler.class, this);
+ }
+
/**
* @param ctx Kernal context.
* @return Cache context.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 3c414e4..defcd3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -153,21 +153,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.event().addLocalEventListener(new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
- for (Iterator<StartFuture> itr = startFuts.values().iterator(); itr.hasNext(); ) {
- StartFuture fut = itr.next();
-
- itr.remove();
-
- fut.onDone(new IgniteException("Topology segmented"));
- }
-
- for (Iterator<StopFuture> itr = stopFuts.values().iterator(); itr.hasNext(); ) {
- StopFuture fut = itr.next();
-
- itr.remove();
-
- fut.onDone(new IgniteException("Topology segmented"));
- }
+ cancelFutures(new IgniteCheckedException("Topology segmented"));
}
}, EVT_NODE_SEGMENTED);
@@ -263,6 +249,27 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/**
+ * @param e Error.
+ */
+ private void cancelFutures(IgniteCheckedException e) {
+ for (Iterator<StartFuture> itr = startFuts.values().iterator(); itr.hasNext(); ) {
+ StartFuture fut = itr.next();
+
+ itr.remove();
+
+ fut.onDone(e);
+ }
+
+ for (Iterator<StopFuture> itr = stopFuts.values().iterator(); itr.hasNext(); ) {
+ StopFuture fut = itr.next();
+
+ itr.remove();
+
+ fut.onDone(e);
+ }
+ }
+
+ /**
* @return {@code true} if lock successful, {@code false} if processor already stopped.
*/
@SuppressWarnings("LockAcquiredButNotSafelyReleased")
@@ -326,8 +333,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
UUID routineId = e.getKey();
LocalRoutineInfo info = e.getValue();
- data.addItem(new DiscoveryDataItem(routineId, info.prjPred,
- info.hnd, info.bufSize, info.interval));
+ data.addItem(new DiscoveryDataItem(routineId,
+ info.prjPred,
+ info.hnd,
+ info.bufSize,
+ info.interval,
+ info.autoUnsubscribe));
}
return data;
@@ -337,7 +348,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public void onDiscoveryDataReceived(UUID nodeId, UUID rmtNodeId, Serializable obj) {
+ @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) {
DiscoveryData data = (DiscoveryData)obj;
if (!ctx.isDaemon() && data != null) {
@@ -491,7 +502,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
// Register routine locally.
- locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval));
+ locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe));
StartFuture fut = new StartFuture(ctx, routineId);
@@ -580,6 +591,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param obj Notification object.
* @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent.
* @param sync If {@code true} then waits for event acknowledgment.
+ * @param msg If {@code true} then sent data is message.
* @throws IgniteCheckedException In case of error.
*/
public void addNotification(UUID nodeId,
@@ -632,6 +644,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected."));
+
for (UUID rmtId : rmtInfos.keySet())
unregisterRemote(rmtId);
@@ -714,8 +728,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
assert old == null;
}
- clientRouteMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(), hnd, data.bufferSize(),
- data.interval()));
+ clientRouteMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(),
+ hnd,
+ data.bufferSize(),
+ data.interval(),
+ data.autoUnsubscribe()));
}
boolean registered = false;
@@ -1033,14 +1050,22 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** Time interval. */
private final long interval;
+ /** Automatic unsubscribe flag. */
+ private boolean autoUnsubscribe;
+
/**
* @param prjPred Projection predicate.
* @param hnd Continuous routine handler.
* @param bufSize Buffer size.
* @param interval Interval.
+ * @param autoUnsubscribe Automatic unsubscribe flag.
*/
- LocalRoutineInfo(@Nullable IgnitePredicate<ClusterNode> prjPred, GridContinuousHandler hnd, int bufSize,
- long interval) {
+ LocalRoutineInfo(@Nullable IgnitePredicate<ClusterNode> prjPred,
+ GridContinuousHandler hnd,
+ int bufSize,
+ long interval,
+ boolean autoUnsubscribe)
+ {
assert hnd != null;
assert bufSize > 0;
assert interval >= 0;
@@ -1049,6 +1074,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
this.hnd = hnd;
this.bufSize = bufSize;
this.interval = interval;
+ this.autoUnsubscribe = autoUnsubscribe;
}
/**
@@ -1057,6 +1083,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
GridContinuousHandler handler() {
return hnd;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(LocalRoutineInfo.class, this);
+ }
}
/**
@@ -1064,7 +1095,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
*/
private static class RemoteRoutineInfo {
/** Master node ID. */
- private final UUID nodeId;
+ private UUID nodeId;
/** Continuous routine handler. */
private final GridContinuousHandler hnd;
@@ -1216,6 +1247,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
return F.t(toSnd, diff < interval ? interval - diff : interval);
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RemoteRoutineInfo.class, this);
+ }
}
/**
@@ -1321,9 +1357,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param hnd Handler.
* @param bufSize Buffer size.
* @param interval Time interval.
+ * @param autoUnsubscribe Automatic unsubscribe flag.
*/
- DiscoveryDataItem(UUID routineId, @Nullable IgnitePredicate<ClusterNode> prjPred,
- GridContinuousHandler hnd, int bufSize, long interval) {
+ DiscoveryDataItem(UUID routineId,
+ @Nullable IgnitePredicate<ClusterNode> prjPred,
+ GridContinuousHandler hnd,
+ int bufSize,
+ long interval,
+ boolean autoUnsubscribe)
+ {
assert routineId != null;
assert hnd != null;
assert bufSize > 0;
@@ -1334,6 +1376,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
this.hnd = hnd;
this.bufSize = bufSize;
this.interval = interval;
+ this.autoUnsubscribe = autoUnsubscribe;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index ee95019..74734bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -64,13 +64,15 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
public DataStreamProcessor(GridKernalContext ctx) {
super(ctx);
- ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
- assert msg instanceof DataStreamerRequest;
+ if (!ctx.clientNode()) {
+ ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg instanceof DataStreamerRequest;
- processRequest(nodeId, (DataStreamerRequest)msg);
- }
- });
+ processRequest(nodeId, (DataStreamerRequest)msg);
+ }
+ });
+ }
marsh = ctx.config().getMarshaller();
}
@@ -114,7 +116,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
if (ctx.config().isDaemon())
return;
- ctx.io().removeMessageListener(TOPIC_DATASTREAM);
+ if (!ctx.clientNode())
+ ctx.io().removeMessageListener(TOPIC_DATASTREAM);
busyLock.block();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index b0be06d..55915f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -145,6 +145,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** Busy lock. */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+ /** */
+ private CacheException disconnectErr;
+
/** Closed flag. */
private final AtomicBoolean closed = new AtomicBoolean();
@@ -245,7 +248,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
fut = new DataStreamerFuture(this);
- publicFut = new IgniteFutureImpl<>(fut);
+ publicFut = new IgniteCacheFutureImpl<>(fut);
}
/**
@@ -284,8 +287,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* Enters busy lock.
*/
private void enterBusy() {
- if (!busyLock.enterBusy())
+ if (!busyLock.enterBusy()) {
+ if (disconnectErr != null)
+ throw disconnectErr;
+
throw new IllegalStateException("Data streamer has been closed.");
+ }
}
/**
@@ -435,7 +442,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
load0(entries0, resFut, keys, 0);
- return new IgniteFutureImpl<>(resFut);
+ return new IgniteCacheFutureImpl<>(resFut);
}
catch (IgniteException e) {
return new IgniteFinishedFutureImpl<>(e);
@@ -487,7 +494,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
load0(entries, resFut, keys, 0);
- return new IgniteFutureImpl<>(resFut);
+ return new IgniteCacheFutureImpl<>(resFut);
}
catch (Throwable e) {
resFut.onDone(e);
@@ -631,6 +638,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
resFut.onDone();
}
}
+ catch (IgniteClientDisconnectedCheckedException e1) {
+ if (log.isDebugEnabled())
+ log.debug("Future finished with disconnect error [nodeId=" + nodeId + ", err=" + e1 + ']');
+
+ resFut.onDone(e1);
+ }
catch (IgniteCheckedException e1) {
if (log.isDebugEnabled())
log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
@@ -757,6 +770,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
try {
fut.get();
}
+ catch (IgniteClientDisconnectedCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to flush buffer: " + e);
+
+ throw CU.convertToCacheException(e);
+ }
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to flush buffer: " + e);
@@ -802,7 +821,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
doFlush();
}
catch (IgniteCheckedException e) {
- throw GridCacheUtils.convertToCacheException(e);
+ throw CU.convertToCacheException(e);
}
finally {
leaveBusy();
@@ -843,7 +862,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
closeEx(cancel);
}
catch (IgniteCheckedException e) {
- throw GridCacheUtils.convertToCacheException(e);
+ throw CU.convertToCacheException(e);
}
}
@@ -852,6 +871,15 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* @throws IgniteCheckedException If failed.
*/
public void closeEx(boolean cancel) throws IgniteCheckedException {
+ closeEx(cancel, null);
+ }
+
+ /**
+ * @param cancel {@code True} to close with cancellation.
+ * @param err Error.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void closeEx(boolean cancel, IgniteCheckedException err) throws IgniteCheckedException {
if (!closed.compareAndSet(false, true))
return;
@@ -868,7 +896,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
cancelled = true;
for (Buffer buf : bufMappings.values())
- buf.cancelAll();
+ buf.cancelAll(err);
}
else
doFlush();
@@ -881,7 +909,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
e = e0;
}
- fut.onDone(null, e);
+ fut.onDone(null, e != null ? e : err);
if (e != null)
throw e;
@@ -889,9 +917,18 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/**
* @param reconnectFut Reconnect future.
+ * @throws IgniteCheckedException If failed.
*/
- public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
+ "Data streamer has been closed, client node disconnected.");
+ disconnectErr = (CacheException)CU.convertToCacheException(err);
+
+ for (Buffer buf : bufMappings.values())
+ buf.cancelAll(err);
+
+ closeEx(true, err);
}
/**
@@ -1034,7 +1071,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
submit(entries0, topVer, curFut0);
if (cancelled)
- curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this));
+ curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
+ DataStreamerImpl.this));
+ else if (ctx.clientDisconnected())
+ curFut0.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+ "Client node disconnected."));
}
return curFut0;
@@ -1311,10 +1352,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
/**
- *
+ * @param err Error.
*/
- void cancelAll() {
- IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this);
+ void cancelAll(@Nullable IgniteCheckedException err) {
+ if (err == null)
+ err = new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this);
for (IgniteInternalFuture<?> f : locFuts) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 0cbb77a..8639bc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -246,4 +246,11 @@ public interface GridQueryIndexing {
* @return Backup filter.
*/
public IndexingQueryFilter backupFilter(List<String> caches, AffinityTopologyVersion topVer, int[] parts);
+
+ /**
+ * Client disconnected callback.
+ *
+ * @param reconnectFut Reconnect future.
+ */
+ public void onDisconnected(IgniteFuture<?> reconnectFut);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index e080c6d..85b505a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -234,6 +234,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
idx.stop();
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ if (idx != null)
+ idx.onDisconnected(reconnectFut);
+ }
+
/**
* @param cctx Cache context.
* @throws IgniteCheckedException If failed.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 76ea73b..78b09e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -211,6 +211,27 @@ public class GridServiceProcessor extends GridProcessorAdapter {
log.debug("Stopped service processor.");
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ for (Map.Entry<String, GridServiceDeploymentFuture> e : depFuts.entrySet()) {
+ GridServiceDeploymentFuture fut = e.getValue();
+
+ fut.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+ "Failed to deploy service, client node disconnected."));
+
+ depFuts.remove(e.getKey(), fut);
+ }
+
+ for (Map.Entry<String, GridFutureAdapter<?>> e : undepFuts.entrySet()) {
+ GridFutureAdapter fut = e.getValue();
+
+ fut.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+ "Failed to undeploy service, client node disconnected."));
+
+ undepFuts.remove(e.getKey(), fut);
+ }
+ }
+
/**
* Validates service configuration.
*
@@ -330,6 +351,13 @@ public class GridServiceProcessor extends GridProcessorAdapter {
return old;
}
+ if (ctx.clientDisconnected()) {
+ fut.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+ "Failed to deploy service, client node disconnected."));
+
+ depFuts.remove(cfg.getName(), fut);
+ }
+
while (true) {
try {
GridServiceDeploymentKey key = new GridServiceDeploymentKey(cfg.getName());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 6f544e0..f3bcab0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -1570,6 +1570,7 @@ public class GridFunc {
* @param <T> Type of the collection.
* @return Light-weight view on given collection with provided predicate.
*/
+ @SafeVarargs
public static <T> Collection<T> view(@Nullable final Collection<T> c,
@Nullable final IgnitePredicate<? super T>... p) {
if (isEmpty(c) || isAlwaysFalse(p))
@@ -2706,6 +2707,7 @@ public class GridFunc {
* @param <T> Type of the free variable, i.e. the element the predicate is called on.
* @return Negated predicate.
*/
+ @SafeVarargs
public static <T> IgnitePredicate<T> not(@Nullable final IgnitePredicate<? super T>... p) {
return isAlwaysFalse(p) ? F.<T>alwaysTrue() : isAlwaysTrue(p) ? F.<T>alwaysFalse() : new P1<T>() {
@Override public boolean apply(T t) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index f3f19bb..38ba8fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -322,8 +322,13 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
+ State state = this.state;
+
if (state == State.SEGMENTED)
- throw new IgniteException("Failed to send custom message: client is disconnected");
+ throw new IgniteException("Failed to send custom message: client is segmented.");
+
+ if (state == State.DISCONNECTED)
+ throw new IgniteException("Failed to send custom message: client is disconnected.");
try {
sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 1367c9f..ec043f8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -213,8 +213,9 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
/**
* @param e Client disconnected exception.
+ * @return Reconnect future.
*/
- protected void checkAndWait(CacheException e) {
+ protected IgniteFuture<?> check(CacheException e) {
log.info("Expected exception: " + e);
assertTrue("Unexpected cause: " + e.getCause(), e.getCause() instanceof IgniteClientDisconnectedException);
@@ -223,7 +224,14 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
assertNotNull(e0.reconnectFuture());
- e0.reconnectFuture().get();
+ return e0.reconnectFuture();
+ }
+
+ /**
+ * @param e Client disconnected exception.
+ */
+ protected void checkAndWait(CacheException e) {
+ check(e).get();
}
/**
@@ -266,7 +274,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
*/
protected static class BlockTpcCommunicationSpi extends TcpCommunicationSpi {
/** */
- volatile Class msgClass;
+ volatile Class msgCls;
/** */
AtomicBoolean collectStart = new AtomicBoolean(false);
@@ -280,13 +288,13 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
- Class msgClass0 = msgClass;
+ Class msgCls0 = msgCls;
if (collectStart.get() && msg instanceof GridIoMessage)
classes.put(((GridIoMessage)msg).message().getClass().getName(), node);
- if (msgClass0 != null && msg instanceof GridIoMessage
- && ((GridIoMessage)msg).message().getClass().equals(msgClass)) {
+ if (msgCls0 != null && msg instanceof GridIoMessage
+ && ((GridIoMessage)msg).message().getClass().equals(msgCls)) {
log.info("Block message: " + msg);
return;
@@ -298,15 +306,15 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
/**
* @param clazz Class of messages which will be block.
*/
- public void blockMsg(Class clazz) {
- msgClass = clazz;
+ public void blockMessage(Class clazz) {
+ msgCls = clazz;
}
/**
* Unlock all message.
*/
- public void unblockMsg() {
- msgClass = null;
+ public void unblockMessage() {
+ msgCls = null;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
deleted file mode 100644
index 624a2b0..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
+++ /dev/null
@@ -1,845 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.testframework.*;
-
-import javax.cache.*;
-import javax.cache.processor.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.events.EventType.*;
-
-/**
- *
- */
-public class IgniteClientReconnectApiBlockTest extends IgniteClientReconnectAbstractTest {
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setCacheConfiguration(new CacheConfiguration());
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected int serverCount() {
- return 1;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testErrorOnDisconnect() throws Exception {
- // Check cache operations.
- cacheOperationsTest();
-
- // Check cache operations.
- beforeTestsStarted();
- dataStructureOperationsTest();
-
- // Check ignite operations.
- beforeTestsStarted();
- igniteOperationsTest();
- }
-
- /**
- * @throws Exception If failed.
- */
- @SuppressWarnings("unchecked")
- public void dataStructureOperationsTest() throws Exception {
- clientMode = true;
-
- final Ignite client = startGrid(serverCount());
-
- doTestIgniteOperationOnDisconnect(client, Arrays.asList(
- // Check atomic long.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- client.atomicLong("testAtomic", 41, true);
- }
- catch (IgniteClientDisconnectedException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- return client.atomicLong("testAtomic", 41, true);
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- assertNotNull(o);
-
- IgniteAtomicLong atomicLong = (IgniteAtomicLong)o;
-
- assertEquals(42, atomicLong.incrementAndGet());
-
- return true;
- }
- }
- ),
- // Check set.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- client.set("testSet", new CollectionConfiguration());
- }
- catch (IgniteClientDisconnectedException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- return client.set("testSet", new CollectionConfiguration());
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- assertNotNull(o);
-
- IgniteSet set = (IgniteSet)o;
-
- String val = "testVal";
-
- set.add(val);
-
- assertEquals(1, set.size());
- assertTrue(set.contains(val));
-
- return true;
- }
- }
- ),
- // Check ignite queue.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- client.queue("TestQueue", 10, new CollectionConfiguration());
- }
- catch (IgniteClientDisconnectedException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- return client.queue("TestQueue", 10, new CollectionConfiguration());
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- assertNotNull(o);
-
- IgniteQueue queue = (IgniteQueue)o;
-
- String val = "Test";
-
- queue.add(val);
-
- assertEquals(val, queue.poll());
-
- return true;
- }
- }
- )
- ));
-
- clientMode = false;
- }
-
- /**
- * @throws Exception If failed.
- */
- @SuppressWarnings("unchecked")
- public void cacheOperationsTest() throws Exception {
- clientMode = true;
-
- final Ignite client = startGrid(serverCount());
-
- final IgniteCache<Object, Object> defaultCache = client.cache(null);
-
- assertNotNull(defaultCache);
-
- doTestIgniteOperationOnDisconnect(client, Arrays.asList(
- // Check put and get operation.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- defaultCache.getAndPut(9999, 9999);
- }
- catch (CacheException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- return defaultCache.getAndPut(9999, 9999);
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- assertNull(o);
-
- assertEquals(9999, defaultCache.get(9999));
-
- return true;
- }
- }
- ),
- // Check put operation.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- defaultCache.put(10000, 10000);
- }
- catch (CacheException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- defaultCache.put(10000, 10000);
-
- return true;
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- assertTrue((Boolean)o);
-
- assertEquals(10000, defaultCache.get(10000));
-
- return true;
- }
- }
- ),
- // Check get operation.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- defaultCache.get(10001);
- }
- catch (CacheException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- return defaultCache.get(10001);
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- assertNull(o);
-
- return true;
- }
- }
- ),
- // Check invoke operation.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- defaultCache.invoke(10000, new CacheEntryProcessor<Object, Object, Object>() {
- @Override public Object process(MutableEntry<Object, Object> entry,
- Object... arguments) throws EntryProcessorException {
- assertTrue(entry.exists());
-
- return (int)entry.getValue() * 2;
- }
- });
- }
- catch (CacheException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- return defaultCache.invoke(10000, new CacheEntryProcessor<Object, Object, Object>() {
- @Override public Object process(MutableEntry<Object, Object> entry,
- Object... arguments) throws EntryProcessorException {
- assertTrue(entry.exists());
-
- return (int)entry.getValue() * 2;
- }
- });
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- assertNotNull(o);
-
- assertEquals(20000, (int)o);
-
- return true;
- }
- }
- ),
- // Check put async operation.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- IgniteCache<Object, Object> async = defaultCache.withAsync();
-
- boolean failed = false;
-
- try {
- async.put(10002, 10002);
-
- async.future().get();
- }
- catch (CacheException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- async.put(10002, 10002);
-
- return async.future().get();
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- assertNull(o);
-
- assertEquals(10002, defaultCache.get(10002));
-
- return true;
- }
- }
- ),
- // Check transaction.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- client.transactions();
- }
- catch (IgniteClientDisconnectedException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- return client.transactions();
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- IgniteTransactions txs = (IgniteTransactions)o;
-
- assertNotNull(txs);
-
- return true;
- }
- }
- ),
- // Check get cache.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- client.cache(null);
- }
- catch (IgniteClientDisconnectedException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- return client.cache(null);
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- IgniteCache<Object, Object> cache0 = (IgniteCache<Object, Object>)o;
-
- assertNotNull(cache0);
-
- cache0.put(1, 1);
-
- assertEquals(1, cache0.get(1));
-
- return true;
- }
- }
- ),
- // Check streamer.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- client.dataStreamer(null);
- }
- catch (IgniteClientDisconnectedException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- return client.dataStreamer(null);
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- IgniteDataStreamer<Object, Object> streamer = (IgniteDataStreamer<Object, Object>)o;
-
- streamer.addData(2, 2);
-
- streamer.close();
-
- assertEquals(2, client.cache(null).get(2));
-
- return true;
- }
- }
- ),
- // Check create cache.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- client.createCache("test_cache");
- }
- catch (IgniteClientDisconnectedException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- return client.createCache("test_cache");
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- IgniteCache<Object, Object> cache = (IgniteCache<Object, Object>)o;
-
- assertNotNull(cache);
-
- cache.put(1, 1);
-
- assertEquals(1, cache.get(1));
-
- return true;
- }
- }
- )
-
- ));
-
- clientMode = false;
- }
-
- /**
- * @throws Exception If failed.
- */
- @SuppressWarnings("unchecked")
- public void igniteOperationsTest() throws Exception {
- clientMode = true;
-
- final Ignite client = startGrid(serverCount());
-
- final IgniteCache<Object, Object> dfltCache = client.cache(null);
-
- final CountDownLatch recvLatch = new CountDownLatch(1);
-
- assertNotNull(dfltCache);
-
- doTestIgniteOperationOnDisconnect(client, Arrays.asList(
- // Check compute.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- client.compute();
- }
- catch (IgniteClientDisconnectedException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- return client.compute();
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- IgniteCompute comp = (IgniteCompute)o;
-
- Collection<UUID> uuids = comp.broadcast(new IgniteCallable<UUID>() {
- @IgniteInstanceResource
- private Ignite ignite;
-
- @Override public UUID call() throws Exception {
- return ignite.cluster().localNode().id();
- }
- });
-
- assertFalse(uuids.isEmpty());
-
- for (UUID uuid : uuids)
- assertNotNull(uuid);
-
- return true;
- }
- }
- ),
-
- // Check ping node.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- client.cluster().pingNode(new UUID(0, 0));
- }
- catch (IgniteClientDisconnectedException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- return client.cluster().pingNode(new UUID(0, 0));
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- Boolean pingNode = (Boolean)o;
-
- assertFalse(pingNode);
-
- return true;
- }
- }
- ),
- // Check register remote listener.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- client.events().remoteListen(null, new IgnitePredicate<Event>() {
- @Override public boolean apply(Event event) {
- return true;
- }
- });
- }
- catch (IgniteClientDisconnectedException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- return client.events().remoteListen(null, new IgnitePredicate<Event>() {
- @Override public boolean apply(Event event) {
- return true;
- }
- });
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- UUID remoteId = (UUID)o;
-
- assertNotNull(remoteId);
-
- client.events().stopRemoteListen(remoteId);
-
- return true;
- }
- }
- ),
- // Check message operation.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- client.message().remoteListen(null, new IgniteBiPredicate<UUID, Object>() {
- @Override public boolean apply(UUID uuid, Object o) {
- if (o.equals("Test message."))
- recvLatch.countDown();
-
- return true;
- }
- });
- }
- catch (IgniteClientDisconnectedException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- return client.message().remoteListen(null, new IgniteBiPredicate<UUID, Object>() {
- @Override public boolean apply(UUID uuid, Object o) {
- if (o.equals("Test message."))
- recvLatch.countDown();
-
- return true;
- }
- });
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- assertNotNull(o);
-
- IgniteMessaging msg = client.message();
-
- msg.send(null, "Test message.");
-
- try {
- assertTrue(recvLatch.await(2, TimeUnit.SECONDS));
- }
- catch (InterruptedException e) {
- fail("Message wasn't received.");
- }
-
- return true;
- }
- }
- ),
- // Check executor.
- new T2<Callable, C1<Object, Boolean>>(
- new Callable() {
- @Override public Object call() throws Exception {
- boolean failed = false;
-
- try {
- client.executorService().submit(new Callable<Integer>() {
- @Override public Integer call() throws Exception {
- return 42;
- }
- });
- }
- catch (IgniteClientDisconnectedException e) {
- failed = true;
-
- checkAndWait(e);
- }
-
- assertTrue(failed);
-
- return client.executorService().submit(new Callable<Integer>() {
- @Override public Integer call() throws Exception {
- return 42;
- }
- });
- }
- },
- new C1<Object, Boolean>() {
- @Override public Boolean apply(Object o) {
- assertNotNull(o);
-
- Future<Integer> fut = (Future<Integer>)o;
-
- try {
- assertEquals(42, (int)fut.get());
- }
- catch (Exception e) {
- fail("Failed submit task.");
- }
-
- return true;
- }
- }
- )
- ));
-
- clientMode = false;
- }
-
- /**
- * Checks how the given operations behave across a client disconnect/reconnect cycle.
- * <p>
- * Steps performed by this method:
- * <ol>
- * <li>Sets the client SPI's {@code writeLatch} to a fresh one-count latch (logged as "Block reconnect.";
- * how the SPI uses the latch is not visible in this method).</li>
- * <li>Registers a local listener that, on {@code EVT_CLIENT_NODE_DISCONNECTED}, starts every operation
- * (first element of each pair in {@code ops}) asynchronously and, on {@code EVT_CLIENT_NODE_RECONNECTED},
- * counts down the reconnect latch.</li>
- * <li>Fails the client node through the discovery SPI of its router server and waits up to 5 seconds
- * for the disconnect event.</li>
- * <li>Asserts that none of the started operations is done, both immediately and after a 2 second sleep.</li>
- * <li>Counts down {@code writeLatch} ("Allow reconnect.") and waits up to 5 seconds for the reconnect event.</li>
- * <li>For every operation, takes its result (2 second timeout) and passes it to the paired check closure
- * (second element of the pair), which is run asynchronously and must return {@code true} within 2 seconds.</li>
- * </ol>
- * In the {@code finally} block the latch is counted down, all operation futures are cancelled and all grids
- * are stopped, so the caller cannot reuse {@code client} afterwards.
- *
- * @param client Client.
- * @param ops Operations closures: {@code get1()} is the operation started when the disconnect event arrives,
- * {@code get2()} is the check applied to that operation's result after reconnect.
- * @throws Exception If failed.
- */
- @SuppressWarnings("unchecked")
- private void doTestIgniteOperationOnDisconnect(Ignite client, final List<T2<Callable, C1<Object, Boolean>>> ops)
- throws Exception {
- assertNotNull(client.cache(null));
-
- final TestTcpDiscoverySpi clientSpi = spi(client);
-
- Ignite srv = clientRouter(client);
-
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
-
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
- log.info("Block reconnect.");
-
- clientSpi.writeLatch = new CountDownLatch(1);
-
- final List<IgniteInternalFuture> futs = new ArrayList<>(); // Filled by the listener below, one future per operation.
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- assertEquals(1, reconnectLatch.getCount()); // Reconnect event must not have been observed yet.
-
- for (T2<Callable, C1<Object, Boolean>> op : ops)
- futs.add(GridTestUtils.runAsync(op.get1()));
-
- disconnectLatch.countDown(); // Counted down only after all futures were added to futs.
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- reconnectLatch.countDown();
- }
-
- return true;
- }
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- try {
- log.info("Fail client.");
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, TimeUnit.MILLISECONDS));
-
- assertEquals(ops.size(), futs.size());
-
- for (IgniteInternalFuture<?> fut : futs)
- assertNotDone(fut);
-
- U.sleep(2000); // Re-check below that operations are still pending after a pause.
-
- for (IgniteInternalFuture<?> fut : futs)
- assertNotDone(fut);
-
- log.info("Allow reconnect.");
-
- clientSpi.writeLatch.countDown();
-
- assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS));
-
- // Check that each operation completes after reconnect and that its result passes the paired check.
- for (int i = 0; i < futs.size(); i++) {
- final int i0 = i;
-
- try {
- final Object furRes = futs.get(i0).get(2, TimeUnit.SECONDS);
-
- assertTrue(GridTestUtils.runAsync(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return ops.get(i0).get2().apply(furRes);
- }
- }).get(2, TimeUnit.SECONDS));
- }
- catch (IgniteFutureTimeoutCheckedException e) {
- e.printStackTrace();
-
- fail("Operation timeout. Iteration: " + i + ".");
- }
- }
- }
- finally {
- clientSpi.writeLatch.countDown(); // Release the latch even if the test failed before "Allow reconnect.".
-
- for (IgniteInternalFuture fut : futs)
- fut.cancel();
-
- stopAllGrids();
- }
- }
-}