You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/02 16:38:46 UTC
incubator-ignite git commit: # ignite-901 client reconnect WIP
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-901 f5f3efd16 -> ff3d61fcc
# ignite-901 client reconnect WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ff3d61fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ff3d61fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ff3d61fc
Branch: refs/heads/ignite-901
Commit: ff3d61fcc6ac310c247e2a5c91d91495f3f66a04
Parents: f5f3efd
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 2 13:18:25 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 2 17:38:41 2015 +0300
----------------------------------------------------------------------
.../IgniteClientDisconnectedException.java | 33 ++
.../apache/ignite/internal/GridComponent.java | 1 +
...gniteClientDisconnectedCheckedException.java | 32 ++
.../IgniteDisconnectedCheckedException.java | 32 --
.../apache/ignite/internal/IgniteKernal.java | 65 ----
.../processors/cache/GridCacheMvccManager.java | 2 +-
.../GridCachePartitionExchangeManager.java | 6 +-
.../continuous/GridContinuousHandler.java | 9 +-
.../continuous/GridContinuousProcessor.java | 23 +-
.../datastructures/DataStructuresProcessor.java | 7 +-
.../GridCacheCountDownLatchImpl.java | 49 ++-
.../service/GridServiceProcessor.java | 8 +-
.../processors/service/GridServiceProxy.java | 10 +-
.../ignite/internal/util/IgniteUtils.java | 6 +
.../spi/discovery/DiscoverySpiDataExchange.java | 3 +-
.../IgniteClientReconnectAbstractTest.java | 19 +-
.../IgniteClientReconnectAtomicsTest.java | 315 +++++++++++++++++
.../IgniteClientReconnectCacheTest.java | 9 +-
.../IgniteClientReconnectCollectionsTest.java | 132 +++++++
.../IgniteClientReconnectComputeTest.java | 35 ++
...eClientReconnectContinuousProcessorTest.java | 350 +++++++++++++++++++
.../IgniteClientReconnectTestSuite.java | 4 +
22 files changed, 1010 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
new file mode 100644
index 0000000..9500ac2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import org.jetbrains.annotations.*;
+
+/**
+ *
+ */
+public class IgniteClientDisconnectedException extends IgniteException {
+ /**
+ * @param msg Error message.
+ * @param cause Optional nested exception (can be {@code null}).
+ */
+ public IgniteClientDisconnectedException(String msg, @Nullable Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 5b3b0c3..7f0f1b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -87,6 +87,7 @@ public interface GridComponent {
/**
* Receives discovery data object from remote nodes (called
* on new node during discovery process).
+ *
* @param joiningNodeId Joining node ID.
* @param rmtNodeId Remote node ID for which data is provided.
* @param data Discovery data object or {@code null} if nothing was
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java
new file mode 100644
index 0000000..2e999f4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+
+/**
+ *
+ */
+public class IgniteClientDisconnectedCheckedException extends IgniteCheckedException {
+ /**
+ * @param msg Message.
+ */
+ public IgniteClientDisconnectedCheckedException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java
deleted file mode 100644
index 0684356..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal;
-
-import org.apache.ignite.*;
-
-/**
- *
- */
-public class IgniteDisconnectedCheckedException extends IgniteCheckedException {
- /**
- * @param msg Message.
- */
- public IgniteDisconnectedCheckedException(String msg) {
- super(msg);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 821a1f5..c04f327 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2816,71 +2816,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
}
- private void stopOnDisconnect() {
- GridCacheProcessor cacheProcessor = ctx.cache();
-
- List<GridComponent> comps = ctx.components();
-
- // Callback component in reverse order while kernal is still functional
- // if called in the same thread, at least.
- for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) {
- GridComponent comp = it.previous();
-
- try {
- if (!skipDaemon(comp) && (!(comp instanceof GridManager)))
- comp.onKernalStop(true);
- }
- catch (Throwable e) {
- errOnStop = true;
-
- U.error(log, "Failed to pre-stop processor: " + comp, e);
-
- if (e instanceof Error)
- throw e;
- }
- }
-
- if (cacheProcessor != null)
- cacheProcessor.cancelUserOperations();
-
- for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) {
- GridComponent comp = it.previous();
-
- try {
- if (!skipDaemon(comp) && (!(comp instanceof GridManager))) {
- comp.stop(true);
-
- if (log.isDebugEnabled())
- log.debug("Component stopped: " + comp);
- }
- }
- catch (Throwable e) {
- errOnStop = true;
-
- U.error(log, "Failed to stop component (ignoring): " + comp, e);
-
- if (e instanceof Error)
- throw (Error)e;
- }
- }
-
- ctx.marshallerContext().onDisconnected();
- }
-
- private void restart() throws IgniteCheckedException {
- List<PluginProvider> plugins = U.allPluginProviders();
-
- startProcessor(new ClusterProcessor(ctx));
-
- GridResourceProcessor rsrcProc = new GridResourceProcessor(ctx);
-
- rsrcProc.setSpringContext(rsrcCtx);
-
- scheduler = new IgniteSchedulerImpl(ctx);
-
- startProcessor(rsrcProc);
- }
-
/**
*
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index e2d22dd..9c89041 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -299,7 +299,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
public void cancelClientFutures(boolean stop) {
IgniteCheckedException e = stop ?
new IgniteCheckedException("Operation has been cancelled (node is stopping).") :
- new IgniteCheckedException("Operation has been cancelled (node disconnected).");
+ new IgniteClientDisconnectedCheckedException("Operation has been cancelled (node disconnected).");
for (Collection<GridCacheFuture<?>> futures : futs.values()) {
for (GridCacheFuture<?> future : futures)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f0c9b3b..3f06e8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -286,7 +286,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
break;
}
- catch (IgniteDisconnectedCheckedException e) {
+ catch (IgniteClientDisconnectedCheckedException e) {
log.info("Disconnected while waiting for initial partition map exchange: " + e);
break;
@@ -331,7 +331,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class);
IgniteCheckedException err = disconnected ?
- new IgniteDisconnectedCheckedException("Node disconnected: " + cctx.gridName()) :
+ new IgniteClientDisconnectedCheckedException("Node disconnected: " + cctx.gridName()) :
new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName());
// Finish all exchange futures.
@@ -1119,7 +1119,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
catch (IgniteInterruptedCheckedException e) {
throw e;
}
- catch (IgniteDisconnectedCheckedException e) {
+ catch (IgniteClientDisconnectedCheckedException e) {
return;
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index ce9b7c0..79020da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -33,7 +33,14 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
* Listener registration status.
*/
public enum RegisterStatus {
- REGISTERED, NOT_REGISTERED, DELAYED
+ /** */
+ REGISTERED,
+
+ /** */
+ NOT_REGISTERED,
+
+ /** */
+ DELAYED
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index dd04bf4..7508acd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -318,11 +318,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
- if (!nodeId.equals(ctx.localNodeId())) {
+ if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos);
- // Collect listeners information (will be sent to
- // joining node during discovery process).
+ // Collect listeners information (will be sent to joining node during discovery process).
for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) {
UUID routineId = e.getKey();
LocalRoutineInfo info = e.getValue();
@@ -333,8 +332,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
return data;
}
- else
- return null;
+
+ return null;
}
/** {@inheritDoc} */
@@ -377,6 +376,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* Callback invoked when cache is started.
*
* @param ctx Cache context.
+ * @throws IgniteCheckedException If failed.
*/
public void onCacheStart(GridCacheContext ctx) throws IgniteCheckedException {
for (Map.Entry<UUID, RemoteRoutineInfo> entry : rmtInfos.entrySet()) {
@@ -630,6 +630,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected() throws IgniteCheckedException {
+ for (UUID rmtId : rmtInfos.keySet())
+ unregisterRemote(rmtId);
+
+ rmtInfos.clear();
+
+ clientInfos.clear();
+ }
+
/**
* @param nodeId Node ID.
* @param routineId Routine ID.
@@ -637,6 +647,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param toSnd Notification object to send.
* @param orderedTopic Topic for ordered notifications.
* If {@code null}, non-ordered message will be sent.
+ * @param msg If {@code true} then sent data is collection of messages.
* @throws IgniteCheckedException In case of error.
*/
private void sendNotification(UUID nodeId,
@@ -1221,6 +1232,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
@GridToStringInclude
private Collection<DiscoveryDataItem> items;
+ /** */
private Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos;
/**
@@ -1232,6 +1244,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/**
* @param nodeId Node ID.
+ * @param clientInfos Client information.
*/
DiscoveryData(UUID nodeId, Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) {
assert nodeId != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index e6335b6..d4f67fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -1001,8 +1001,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsView.put(key, val);
}
- latch = new GridCacheCountDownLatchImpl(name, val.get(), val.initialCount(),
- val.autoDelete(), key, cntDownLatchView, dsCacheCtx);
+ latch = new GridCacheCountDownLatchImpl(name, val.initialCount(),
+ val.autoDelete(),
+ key,
+ cntDownLatchView,
+ dsCacheCtx);
dsMap.put(key, latch);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 33547d9..6a0f5af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -66,9 +66,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** Cache context. */
private GridCacheContext ctx;
- /** Current count. */
- private int cnt;
-
/** Initial count. */
private int initCnt;
@@ -95,7 +92,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
* Constructor.
*
* @param name Latch name.
- * @param cnt Current count.
* @param initCnt Initial count.
* @param autoDel Auto delete flag.
* @param key Latch key.
@@ -103,7 +99,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
* @param ctx Cache context.
*/
public GridCacheCountDownLatchImpl(String name,
- int cnt,
int initCnt,
boolean autoDel,
GridCacheInternalKey key,
@@ -111,14 +106,12 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
GridCacheContext ctx)
{
assert name != null;
- assert cnt >= 0;
assert initCnt >= 0;
assert key != null;
assert latchView != null;
assert ctx != null;
this.name = name;
- this.cnt = cnt;
this.initCnt = initCnt;
this.autoDel = autoDel;
this.key = key;
@@ -135,7 +128,12 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** {@inheritDoc} */
@Override public int count() {
- return cnt;
+ try {
+ return CU.outTx(new GetCountCallable(), ctx);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
}
/** {@inheritDoc} */
@@ -211,8 +209,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** {@inheritDoc} */
@Override public boolean onRemoved() {
- assert cnt == 0;
-
return rmvd = true;
}
@@ -235,8 +231,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
@Override public void onUpdate(int cnt) {
assert cnt >= 0;
- this.cnt = cnt;
-
while (internalLatch != null && internalLatch.getCount() > cnt)
internalLatch.countDown();
}
@@ -257,9 +251,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
if (log.isDebugEnabled())
log.debug("Failed to find count down latch with given name: " + name);
- assert cnt == 0;
-
- return new CountDownLatch(cnt);
+ return new CountDownLatch(0);
}
tx.commit();
@@ -342,6 +334,29 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/**
*
*/
+ private class GetCountCallable implements Callable<Integer> {
+ /** {@inheritDoc} */
+ @Override public Integer call() throws Exception {
+ Integer val;
+
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheCountDownLatchValue latchVal = latchView.get(key);
+
+ if (latchVal == null)
+ throw new IgniteCheckedException("Failed to find count down latch with given name: " + name);
+
+ val = latchVal.get();
+
+ tx.rollback();
+ }
+
+ return val;
+ }
+ }
+
+ /**
+ *
+ */
private class CountDownCallable implements Callable<Integer> {
/** Value to count down on (if 0 then latch is counted down to 0). */
private final int val;
@@ -364,9 +379,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
if (log.isDebugEnabled())
log.debug("Failed to find count down latch with given name: " + name);
- assert cnt == 0;
-
- return cnt;
+ return 0;
}
int retVal;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index bb451c7..89b2a31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -124,7 +124,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
cache = ctx.cache().utilityCache();
- ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY);
+ if (!ctx.clientNode())
+ ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY);
try {
if (ctx.deploy().enabled())
@@ -165,7 +166,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
busyLock.block();
- ctx.event().removeLocalEventListener(topLsnr);
+ if (!ctx.clientNode())
+ ctx.event().removeLocalEventListener(topLsnr);
if (cfgQryId != null)
cache.context().continuousQueries().cancelInternalQuery(cfgQryId);
@@ -1164,7 +1166,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
}
catch (IgniteCheckedException ex) {
- log.error("Failed to clean up zombie assignments for service: " + name, ex);
+ U.error(log, "Failed to clean up zombie assignments for service: " + name, ex);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
index 8e13bc4..67ddc6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
@@ -68,9 +68,15 @@ class GridServiceProxy<T> implements Serializable {
* @param name Service name.
* @param svc Service type class.
* @param sticky Whether multi-node request should be done.
+ * @param ctx Context.
*/
- @SuppressWarnings("unchecked") GridServiceProxy(ClusterGroup prj, String name, Class<? super T> svc,
- boolean sticky, GridKernalContext ctx) {
+ @SuppressWarnings("unchecked")
+ GridServiceProxy(ClusterGroup prj,
+ String name,
+ Class<? super T> svc,
+ boolean sticky,
+ GridKernalContext ctx)
+ {
this.prj = prj;
this.ctx = ctx;
hasLocNode = hasLocalNode(prj);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index f457d6c..7516f79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -626,6 +626,12 @@ public abstract class IgniteUtils {
}
});
+ m.put(IgniteClientDisconnectedCheckedException.class, new C1<IgniteCheckedException, IgniteException>() {
+ @Override public IgniteException apply(IgniteCheckedException e) {
+ return new IgniteClientDisconnectedException(e.getMessage(), e);
+ }
+ });
+
return m;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
index 46d6716..038ea59 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
@@ -39,7 +39,8 @@ public interface DiscoverySpiDataExchange {
/**
* Notifies discovery manager about data received from remote node.
*
- * @param joiningNodeId Remote node ID.
+ * @param joiningNodeId ID of new node that joins topology.
+ * @param nodeId ID of the node provided data.
* @param data Collection of discovery data objects from different components.
*/
public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 0512074..23b8a15 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -65,6 +65,13 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
protected abstract int serverCount();
/**
+ * @return Number of client nodes started before tests.
+ */
+ protected int clientCount() {
+ return 0;
+ }
+
+ /**
* @param ignite Node.
* @return Discovery SPI.
*/
@@ -79,7 +86,17 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
int srvs = serverCount();
if (srvs > 0)
- startGrids(srvs);
+ startGridsMultiThreaded(srvs);
+
+ int clients = clientCount();
+
+ if (clients > 0) {
+ clientMode = true;
+
+ startGridsMultiThreaded(srvs, clients);
+
+ clientMode = false;
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
new file mode 100644
index 0000000..bbb7eef
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * TODO IGNITE-901: test AtomicReference, AtomicStamped, usage after remove, test API block, fail current call on disconnect.
+ */
+public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int clientCount() {
+ return 1;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicLongReconnect() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLong", 0, true);
+
+ assertEquals(0L, clientAtomicLong.getAndAdd(1));
+
+ IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLong", 0, false);
+
+ assertEquals(1L, srvAtomicLong.getAndAdd(1));
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ assertEquals(2L, srvAtomicLong.getAndAdd(1));
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ assertEquals(3L, clientAtomicLong.getAndAdd(1));
+
+ assertEquals(4L, srvAtomicLong.getAndAdd(1));
+
+ assertEquals(5L, clientAtomicLong.getAndAdd(1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicLongReconnectRemoved() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongRmv", 0, true);
+
+ assertEquals(0L, clientAtomicLong.getAndAdd(1));
+
+ IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongRmv", 0, false);
+
+ assertEquals(1L, srvAtomicLong.getAndAdd(1));
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ srvAtomicLong.close();
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ clientAtomicLong.getAndAdd(1);
+
+ return null;
+ }
+ }, IgniteException.class, null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLatchReconnect1() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ IgniteCountDownLatch clientLatch = client.countDownLatch("latch1", 3, false, true);
+
+ assertEquals(3, clientLatch.count());
+
+ IgniteCountDownLatch srvLatch = srv.countDownLatch("latch1", 3, false, false);
+
+ assertEquals(3, srvLatch.count());
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override
+ public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ srvLatch.countDown();
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ assertEquals(2, srvLatch.count());
+ assertEquals(2, clientLatch.count());
+
+ srvLatch.countDown();
+
+ assertEquals(1, srvLatch.count());
+ assertEquals(1, clientLatch.count());
+
+ clientLatch.countDown();
+
+ assertEquals(0, srvLatch.count());
+ assertEquals(0, clientLatch.count());
+
+ assertTrue(srvLatch.await(1000));
+ assertTrue(clientLatch.await(1000));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void _testLatchReconnect2() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final IgniteCountDownLatch clientLatch = client.countDownLatch("latch2", 1, false, true);
+
+ IgniteCountDownLatch srvLatch = srv.countDownLatch("latch2", 1, false, false);
+
+ assertFalse(clientLatch.await(100));
+
+ IgniteInternalFuture<Boolean> waitFut = GridTestUtils.runAsync(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ return clientLatch.await(60_000, MILLISECONDS);
+ }
+ });
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ srvLatch.countDown();
+
+ assertNotDone(waitFut);
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ assertTrue(waitFut.get(5000));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 5687010..452f808 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -129,8 +129,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
assertEquals(1, reconnectLatch.getCount());
blockPutRef.set(GridTestUtils.runAsync(new Callable() {
- @Override
- public Object call() throws Exception {
+ @Override public Object call() throws Exception {
log.info("Start put.");
cache.put(2, 2);
@@ -218,8 +217,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
final CountDownLatch reconnectLatch = new CountDownLatch(1);
client.events().localListen(new IgnitePredicate<Event>() {
- @Override
- public boolean apply(Event evt) {
+ @Override public boolean apply(Event evt) {
if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
info("Reconnected: " + evt);
@@ -305,8 +303,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
}, new int[]{EVT_CLIENT_NODE_RECONNECTED});
IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
- @Override
- public Ignite call() throws Exception {
+ @Override public Ignite call() throws Exception {
try {
return startGrid(SRV_CNT);
} catch (Throwable e) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
new file mode 100644
index 0000000..fcb74cd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * TODO IGNITE-901: test for queue, check set/queue usage after remove, test API block, fail current call on disconnect.
+ */
+public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int clientCount() {
+ return 1;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSetReconnect() throws Exception {
+ CollectionConfiguration colCfg = new CollectionConfiguration();
+
+ colCfg.setCacheMode(PARTITIONED);
+ colCfg.setAtomicityMode(TRANSACTIONAL);
+
+ setReconnect(colCfg);
+
+ colCfg = new CollectionConfiguration();
+
+ colCfg.setCacheMode(PARTITIONED);
+ colCfg.setAtomicityMode(ATOMIC);
+
+ setReconnect(colCfg);
+ }
+
+ /**
+ * @param colCfg Collection configuration.
+ * @throws Exception If failed.
+ */
+ private void setReconnect(CollectionConfiguration colCfg) throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final String setName = "set-" + colCfg.getAtomicityMode();
+
+ IgniteSet<String> clientSet = client.set(setName, colCfg);
+
+ IgniteSet<String> srvSet = srv.set(setName, null);
+
+ assertTrue(clientSet.add("1"));
+
+ assertFalse(srvSet.add("1"));
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ assertTrue(srvSet.add("2"));
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ assertFalse(clientSet.add("2"));
+
+ assertTrue(clientSet.remove("2"));
+
+ assertFalse(srvSet.contains("2"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
new file mode 100644
index 0000000..01eb2ca
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+/**
+ *
+ */
+public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ return 1;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectOperationInProgress() throws Exception {
+ // TODO IGNITE-901.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
new file mode 100644
index 0000000..bf0130b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+
+import javax.cache.event.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientReconnectAbstractTest {
+ /** */
+ private static volatile CountDownLatch latch;
+
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int clientCount() {
+ return 1;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEventListenerReconnect() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ EventListener lsnr = new EventListener();
+
+ UUID opId = client.events().remoteListen(lsnr, null, EventType.EVT_JOB_STARTED);
+
+ lsnr.latch = new CountDownLatch(1);
+
+ log.info("Created remote listener: " + opId);
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ client.compute().run(new DummyJob());
+
+ assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+
+ lsnr.latch = new CountDownLatch(1);
+
+ srv.compute().run(new DummyJob());
+
+ assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+
+ lsnr.latch = new CountDownLatch(1);
+
+ log.info("Stop listen, should not get events anymore.");
+
+ client.events().stopRemoteListen(opId);
+
+ assertFalse(lsnr.latch.await(3000, MILLISECONDS));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMessageListenerReconnect() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final String topic = "testTopic";
+
+ MessageListener locLsnr = new MessageListener();
+
+ UUID opId = client.message().remoteListen(topic, new RemoteMessageListener());
+
+ client.message().localListen(topic, locLsnr);
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ locLsnr.latch = new CountDownLatch(1);
+ latch = new CountDownLatch(2);
+
+ client.message().send(topic, "msg1");
+
+ assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+ assertTrue(latch.await(5000, MILLISECONDS));
+
+ locLsnr.latch = new CountDownLatch(1);
+ latch = new CountDownLatch(2);
+
+ srv.message().send(topic, "msg2");
+
+ assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+ assertTrue(latch.await(5000, MILLISECONDS));
+
+ log.info("Stop listen, should not get remote messages anymore.");
+
+ client.message().stopRemoteListen(opId);
+
+ srv.message().send(topic, "msg3");
+
+ locLsnr.latch = new CountDownLatch(1);
+ latch = new CountDownLatch(1);
+
+ assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+ assertFalse(latch.await(3000, MILLISECONDS));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCacheContinuousQueryReconnect() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+ CacheEventListener lsnr = new CacheEventListener();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ QueryCursor<?> cur = clientCache.query(qry);
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ lsnr.latch = new CountDownLatch(1);
+
+ clientCache.put(1, 1);
+
+ assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+
+ lsnr.latch = new CountDownLatch(1);
+
+ srv.cache(null).put(2, 2);
+
+ assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+
+ log.info("Close cursor, should not get cache events anymore.");
+
+ cur.close();
+
+ lsnr.latch = new CountDownLatch(1);
+
+ clientCache.put(3, 3);
+
+ assertFalse(lsnr.latch.await(3000, MILLISECONDS));
+ }
+
+ // TODO IGNITE-901 test operations in progress are cancelled.
+
+ /**
+ *
+ */
+ private static class EventListener implements P2<UUID, Event> {
+ /** */
+ private volatile CountDownLatch latch;
+
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, Event evt) {
+ assertTrue(ignite.cluster().localNode().isClient());
+
+ ignite.log().info("Received event: " + evt);
+
+ if (latch != null)
+ latch.countDown();
+
+ return true;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class MessageListener implements P2<UUID, Object> {
+ /** */
+ private volatile CountDownLatch latch;
+
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, Object msg) {
+ assertTrue(ignite.cluster().localNode().isClient());
+
+ ignite.log().info("Local listener received message: " + msg);
+
+ if (latch != null)
+ latch.countDown();
+
+ return true;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class RemoteMessageListener implements P2<UUID, Object> {
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, Object msg) {
+ ignite.log().info("Remote listener received message: " + msg);
+
+ if (latch != null)
+ latch.countDown();
+
+ return true;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
+ /** */
+ private volatile CountDownLatch latch;
+
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ int cnt = 0;
+
+ for (CacheEntryEvent<?, ?> evt : evts) {
+ ignite.log().info("Received cache event: " + evt);
+
+ cnt++;
+ }
+
+ assertEquals(1, cnt);
+
+ if (latch != null)
+ latch.countDown();
+ }
+ }
+
+ /**
+ *
+ */
+ static class DummyJob implements IgniteRunnable {
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ ignite.log().info("Job run.");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
index 7533a2c..0f5b3ee 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -34,6 +34,10 @@ public class IgniteClientReconnectTestSuite extends TestSuite {
suite.addTestSuite(IgniteClientReconnectApiBlockTest.class);
suite.addTestSuite(IgniteClientReconnectDiscoveryStateTest.class);
suite.addTestSuite(IgniteClientReconnectCacheTest.class);
+ suite.addTestSuite(IgniteClientReconnectContinuousProcessorTest.class);
+ suite.addTestSuite(IgniteClientReconnectComputeTest.class);
+ suite.addTestSuite(IgniteClientReconnectAtomicsTest.class);
+ suite.addTestSuite(IgniteClientReconnectCollectionsTest.class);
return suite;
}