You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/09 17:15:59 UTC
[50/50] incubator-ignite git commit: # ignite-901
# ignite-901
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fa007b1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fa007b1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fa007b1f
Branch: refs/heads/ignite-901
Commit: fa007b1f5e6d9a60827eef0d979bbd2959fb7cc3
Parents: e73e496
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 9 09:59:59 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 9 17:53:41 2015 +0300
----------------------------------------------------------------------
.../internal/managers/GridManagerAdapter.java | 6 +-
.../discovery/GridDiscoveryManager.java | 41 ++--
.../processors/cache/GridCacheProcessor.java | 10 +-
.../cache/GridCacheSharedContext.java | 24 +-
.../GridDhtPartitionsExchangeFuture.java | 4 -
.../java/org/apache/ignite/spi/IgniteSpi.java | 15 ++
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 13 +
.../ignite/spi/discovery/tcp/ClientImpl.java | 77 ++++--
.../tcp/internal/TcpDiscoveryNode.java | 15 +-
.../GridDeploymentManagerStopSelfTest.java | 11 +
.../loadtests/hashmap/GridCacheTestContext.java | 4 +-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 236 ++++++++++++++++++-
.../query/h2/twostep/GridMergeIndex.java | 29 +--
.../h2/twostep/GridReduceQueryExecutor.java | 3 +
14 files changed, 401 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 9faa056..298ff24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -168,12 +168,14 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
- // No-op.
+ for (T t : spis)
+ t.onClientDisconnected(reconnectFut);
}
/** {@inheritDoc} */
@Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
- // No-op.
+ for (T t : spis)
+ t.onClientReconnected(clusterRestarted);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index c0d9f13..986a995 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -491,10 +491,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
assert node.isClient() : node;
((IgniteKernal)ctx.grid()).onDisconnected();
-
- recordEvent(type, topVer, node, topSnapshot);
-
- return;
}
else if (type == EVT_CLIENT_NODE_RECONNECTED) {
assert locNode.isClient() : locNode;
@@ -506,16 +502,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted);
- recordEvent(type, topVer, node, topSnapshot);
-
ctx.gateway().onReconnected();
-
- if (log.isInfoEnabled())
- log.info("Client node reconnected to cluster: " + node);
-
- ackTopology(topVer, true);
-
- return;
}
discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg);
@@ -989,7 +976,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
Collection<ClusterNode> rmtNodes = discoCache.remoteNodes();
- Collection<ClusterNode> serverNodes = F.view(discoCache.allNodes(), F.not(clientFilter));
+ Collection<ClusterNode> srvNodes = F.view(discoCache.allNodes(), F.not(clientFilter));
Collection<ClusterNode> clientNodes = F.view(discoCache.allNodes(), clientFilter);
@@ -1009,7 +996,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
double heap = U.heapSize(allNodes, 2);
if (log.isQuiet())
- U.quiet(false, topologySnapshotMessage(serverNodes.size(), clientNodes.size(), totalCpus, heap));
+ U.quiet(false, topologySnapshotMessage(srvNodes.size(), clientNodes.size(), totalCpus, heap));
if (log.isDebugEnabled()) {
String dbg = "";
@@ -1019,7 +1006,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
">>> " + PREFIX + "." + U.nl() +
">>> +----------------+" + U.nl() +
">>> Grid name: " + (ctx.gridName() == null ? "default" : ctx.gridName()) + U.nl() +
- ">>> Number of server nodes: " + serverNodes.size() + U.nl() +
+ ">>> Number of server nodes: " + srvNodes.size() + U.nl() +
">>> Number of client nodes: " + clientNodes.size() + U.nl() +
(discoOrdered ? ">>> Topology version: " + topVer + U.nl() : "") +
">>> Topology hash: 0x" + Long.toHexString(hash).toUpperCase() + U.nl();
@@ -1053,7 +1040,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
log.debug(dbg);
}
else if (log.isInfoEnabled())
- log.info(topologySnapshotMessage(serverNodes.size(), clientNodes.size(), totalCpus, heap));
+ log.info(topologySnapshotMessage(srvNodes.size(), clientNodes.size(), totalCpus, heap));
}
/**
@@ -1063,10 +1050,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @param heap Heap size.
* @return Topology snapshot message.
*/
- private String topologySnapshotMessage(int serverNodesNum, int clientNodesNum, int totalCpus, double heap) {
+ private String topologySnapshotMessage(int srvNodesNum, int clientNodesNum, int totalCpus, double heap) {
return PREFIX + " [" +
(discoOrdered ? "ver=" + topSnap.get().topVer.topologyVersion() + ", " : "") +
- "server nodes=" + serverNodesNum +
+ "server nodes=" + srvNodesNum +
", client nodes=" + clientNodesNum +
", CPUs=" + totalCpus +
", heap=" + heap + "GB" +
@@ -1917,6 +1904,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
break;
}
+ case EVT_CLIENT_NODE_DISCONNECTED: {
+ if (log.isInfoEnabled())
+ log.info("Client node disconnected from topology: " + node);
+
+ break;
+ }
+
+ case EVT_CLIENT_NODE_RECONNECTED: {
+ if (log.isInfoEnabled())
+ log.info("Client node reconnected to topology: " + node);
+
+ ackTopology(topVer.topologyVersion(), true);
+
+ break;
+ }
+
case EVT_NODE_FAILED: {
// Check only if resolvers were configured.
if (hasRslvrs)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index ebbe639..61f7e58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -575,8 +575,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
- sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(
- ctx, ctx.config().getCacheStoreSessionListenerFactories()));
+ sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx,
+ ctx.config().getCacheStoreSessionListenerFactories()));
ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)",
!ctx.config().getTransactionConfiguration().isTxSerializableEnabled());
@@ -1721,8 +1721,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
depMgr,
exchMgr,
ioMgr,
- storeSesLsnrs,
- jta
+ jta,
+ storeSesLsnrs
);
}
@@ -1733,8 +1733,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
+ // Collect dynamically started caches to a single object.
Collection<DynamicCacheChangeRequest> reqs =
- // Collect dynamically started caches to a single object.
new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 715d514..d0064f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -87,9 +87,15 @@ public class GridCacheSharedContext<K, V> {
private Collection<CacheStoreSessionListener> storeSesLsnrs;
/**
+ * @param kernalCtx Context.
* @param txMgr Transaction manager.
* @param verMgr Version manager.
* @param mvccMgr MVCC manager.
+ * @param depMgr Deployment manager.
+ * @param exchMgr Exchange manager.
+ * @param ioMgr IO manager.
+ * @param jtaMgr JTA manager.
+ * @param storeSesLsnrs Store session listeners.
*/
public GridCacheSharedContext(
GridKernalContext kernalCtx,
@@ -99,12 +105,12 @@ public class GridCacheSharedContext<K, V> {
GridCacheDeploymentManager<K, V> depMgr,
GridCachePartitionExchangeManager<K, V> exchMgr,
GridCacheIoManager ioMgr,
- Collection<CacheStoreSessionListener> storeSesLsnrs,
- CacheJtaManagerAdapter jtaMgr
+ CacheJtaManagerAdapter jtaMgr,
+ Collection<CacheStoreSessionListener> storeSesLsnrs
) {
this.kernalCtx = kernalCtx;
- setManagers(txMgr, verMgr, mvccMgr, depMgr, exchMgr, ioMgr);
+ setManagers(txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, ioMgr);
this.storeSesLsnrs = storeSesLsnrs;
@@ -114,6 +120,7 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @param reconnectFut Reconnect future.
* @throws IgniteCheckedException If failed.
*/
void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
@@ -137,6 +144,7 @@ public class GridCacheSharedContext<K, V> {
mgrs = new LinkedList<>();
setManagers(txMgr,
+ jtaMgr,
verMgr,
mvccMgr,
new GridCacheDeploymentManager<K, V>(),
@@ -149,7 +157,17 @@ public class GridCacheSharedContext<K, V> {
}
}
+ /**
+ * @param txMgr Transaction manager.
+ * @param verMgr Version manager.
+ * @param mvccMgr MVCC manager.
+ * @param depMgr Deployment manager.
+ * @param exchMgr Exchange manager.
+ * @param ioMgr IO manager.
+ * @param jtaMgr JTA manager.
+ */
private void setManagers(IgniteTxManager txMgr,
+ CacheJtaManagerAdapter jtaMgr,
GridCacheVersionManager verMgr,
GridCacheMvccManager mvccMgr,
GridCacheDeploymentManager<K, V> depMgr,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 0369eb9..38a0d55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -458,8 +458,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (isDone())
return;
- log.info("Init exchange: " + exchangeId());
-
if (init.compareAndSet(false, true)) {
if (isDone())
return;
@@ -1026,8 +1024,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
cctx.exchange().onExchangeDone(this, err);
if (super.onDone(res, err) && !dummy && !forcePreload) {
- log.info("Finished exchange: " + exchangeId() + ", err=" + err);
-
if (log.isDebugEnabled())
log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + ']');
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
index 968d88d..0f6ed5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
@@ -17,6 +17,7 @@
package org.apache.ignite.spi;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -106,4 +107,18 @@ public interface IgniteSpi {
* @throws IgniteSpiException Thrown in case of any error during SPI stop.
*/
public void spiStop() throws IgniteSpiException;
+
+ /**
+ * Client node disconnected callback.
+ *
+ * @param reconnectFut Future that will be completed when client reconnected.
+ */
+ public void onClientDisconnected(IgniteFuture<?> reconnectFut);
+
+ /**
+ * Client node reconnected callback.
+ *
+ * @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected.
+ */
+ public void onClientReconnected(boolean clusterRestarted);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index dd19203..a49d85a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.plugin.security.*;
import org.apache.ignite.resources.*;
@@ -191,8 +192,20 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
spiCtx = new GridDummySpiContext(locNode, true, spiCtx);
}
+ /** {@inheritDoc} */
+ @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onClientReconnected(boolean clusterRestarted) {
+ // No-op.
+ }
+
/**
* Inject ignite instance.
+ *
+ * @param ignite Ignite instance.
*/
@IgniteInstanceResource
protected void injectResources(Ignite ignite) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index b3793b1..8041a63 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -512,9 +512,18 @@ class ClientImpl extends TcpDiscoveryImpl {
tstamp = U.currentTimeMillis();
- TcpDiscoveryAbstractMessage msg = recon ?
- new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId) :
- new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId()));
+ TcpDiscoveryAbstractMessage msg;
+
+ if (!recon) {
+ TcpDiscoveryNode node = locNode;
+
+ if (locNode.order() > 0)
+ node = locNode.clientReconnectNode();
+
+ msg = new TcpDiscoveryJoinRequestMessage(node, spi.collectExchangeData(getLocalNodeId()));
+ }
+ else
+ msg = new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId);
msg.client(true);
@@ -977,6 +986,8 @@ class ClientImpl extends TcpDiscoveryImpl {
long timeout = join ? spi.joinTimeout : spi.netTimeout;
+ log.info("Will try to reconnect with timeout: " + timeout);
+
long startTime = U.currentTimeMillis();
try {
@@ -1020,6 +1031,8 @@ class ClientImpl extends TcpDiscoveryImpl {
TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
if (res.creatorNodeId().equals(getLocalNodeId())) {
+ log.info("Reconnect status: " + res.success());
+
if (res.success()) {
msgWorker.addMessage(res);
@@ -1032,7 +1045,7 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
}
- else // TODO IGNITE-901 reuse socket.
+ else
return;
}
}
@@ -1136,6 +1149,8 @@ class ClientImpl extends TcpDiscoveryImpl {
break;
}
else if (state == ClientImpl.State.DISCONNECTED) {
+ log.info("Rejoin timeout, will segment.");
+
state = ClientImpl.State.SEGMENTED;
notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
@@ -1172,6 +1187,8 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
else {
+ log.info("Socket closed, will try to reconnect.");
+
assert reconnector == null;
final Reconnector reconnector = new Reconnector(join);
@@ -1187,30 +1204,40 @@ class ClientImpl extends TcpDiscoveryImpl {
reconnector = null;
if (spi.isClientReconnectDisabled()) {
- state = ClientImpl.State.SEGMENTED;
+ if (state != ClientImpl.State.SEGMENTED && state != ClientImpl.State.STOPPED) {
+ log.info("Reconnected failed, will segment.");
- notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+ state = ClientImpl.State.SEGMENTED;
+
+ notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+ }
}
else {
- state = ClientImpl.State.DISCONNECTED;
+ log.info("Reconnected failed, will try join.");
- nodeAdded = false;
+ if (state != ClientImpl.State.DISCONNECTED) {
+ state = ClientImpl.State.DISCONNECTED;
- IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
- null, "Failed to ping node, client node disconnected.");
+ nodeAdded = false;
- for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
- GridFutureAdapter<Boolean> fut = e.getValue();
+ IgniteClientDisconnectedCheckedException err =
+ new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, client node disconnected.");
- if (pingFuts.remove(e.getKey(), fut))
- fut.onDone(err);
- }
+ for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
+ GridFutureAdapter<Boolean> fut = e.getValue();
- notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+ if (pingFuts.remove(e.getKey(), fut))
+ fut.onDone(err);
+ }
+
+ notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+ }
UUID newId = UUID.randomUUID();
- log.info("Change node id: " + newId + " " + locNode.attribute(IgniteNodeAttributes.ATTR_GRID_NAME));
+ if (log.isInfoEnabled())
+ log.info("Client will try to reconnect to cluster with new id " +
+ "[id=" + newId + ", prevId=" + locNode.id() + ']');
locNode.onClientDisconnected(newId);
@@ -1220,7 +1247,7 @@ class ClientImpl extends TcpDiscoveryImpl {
else {
TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
- if (joinLatch.getCount() > 0) { // TODO IGNITE-901.
+ if (joining()) {
IgniteSpiException err = null;
if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
@@ -1231,7 +1258,13 @@ class ClientImpl extends TcpDiscoveryImpl {
err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
if (err != null) {
- joinError(err);
+ if (state == ClientImpl.State.DISCONNECTED) {
+ state = ClientImpl.State.SEGMENTED;
+
+ notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+ }
+ else
+ joinError(err);
break;
}
@@ -1263,12 +1296,16 @@ class ClientImpl extends TcpDiscoveryImpl {
boolean join = state == ClientImpl.State.STARTING;
+ log.info("Try join topology with timeout: " + spi.joinTimeout);
+
final Socket sock = joinTopology(false, spi.joinTimeout);
if (sock == null) {
if (join)
joinError(new IgniteSpiException("Join process timed out."));
else {
+ log.info("Send join request on rejoin failed, will segment.");
+
state = ClientImpl.State.SEGMENTED;
notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
@@ -1284,7 +1321,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (spi.joinTimeout > 0) {
timer.schedule(new TimerTask() {
@Override public void run() {
- if (joinLatch.getCount() > 0)
+ if (joining())
queue.add(JOIN_TIMEOUT);
}
}, spi.joinTimeout);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 1e849f4..b8f1a86 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -449,9 +449,18 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
*/
public void onClientDisconnected(UUID newId) {
id = newId;
- order = 0;
- intOrder = 0;
- visible = false;
+ }
+
+ /**
+ * @return Copy of local node for client reconnect request.
+ */
+ public TcpDiscoveryNode clientReconnectNode() {
+ TcpDiscoveryNode node = new TcpDiscoveryNode(id, addrs, hostNames, discPort, metricsProvider, ver);
+
+ node.attrs = attrs;
+ node.clientRouterNodeId = clientRouterNodeId;
+
+ return node;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
index 9780080..b8f9ce1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.managers.deployment;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.resource.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.jdk.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.deployment.*;
@@ -95,5 +96,15 @@ public class GridDeploymentManagerStopSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public boolean unregister(String rsrcName) { return false; }
+
+ /** {@inheritDoc} */
+ @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onClientReconnected(boolean clusterRestarted) {
+ // No-op.
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index e9d7a45..9a883b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -55,8 +55,8 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
new GridCacheDeploymentManager<K, V>(),
new GridCachePartitionExchangeManager<K, V>(),
new GridCacheIoManager(),
- null,
- new CacheNoopJtaManager()
+ new CacheNoopJtaManager(),
+ null
),
defaultCacheConfiguration(),
CacheType.USER,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index ba38dfc..5838481 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -111,6 +111,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** */
private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
+ /** */
+ private boolean reconnectDisabled;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -159,6 +162,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
disco.setJoinTimeout(joinTimeout);
disco.setNetworkTimeout(netTimeout);
+ disco.setClientReconnectDisabled(reconnectDisabled);
+
disco.afterWrite(afterWrite);
cfg.setDiscoverySpi(disco);
@@ -633,6 +638,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
public void testClientSegmentation() throws Exception {
clientsPerSrv = 1;
+ reconnectDisabled = true;
+
startServerNodes(3);
startClientNodes(3);
@@ -656,6 +663,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
final TcpDiscoverySpi disco = (TcpDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi();
try {
+ log.info("Fail server: " + 2);
+
failServer(2);
await(srvFailedLatch);
@@ -888,8 +897,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
try {
startClientNodes(1);
- assertEquals(G.ignite("server-0").cluster().localNode().id(), ((TcpDiscoveryNode) G.ignite("client-0")
- .cluster().localNode()).clientRouterNodeId());
+ assertEquals(G.ignite("server-0").cluster().localNode().id(),
+ ((TcpDiscoveryNode) G.ignite("client-0").cluster().localNode()).clientRouterNodeId());
checkNodes(2, 1);
@@ -1278,8 +1287,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
final CountDownLatch disconnectLatch = new CountDownLatch(1);
client.events().localListen(new IgnitePredicate<Event>() {
- @Override
- public boolean apply(Event evt) {
+ @Override public boolean apply(Event evt) {
info("Client event: " + evt);
if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
@@ -1334,22 +1342,230 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testClientReconnectDisabled() throws Exception {
- // TODO IGNTIE-901.
+ public void testClientFailReconnectDisabled() throws Exception {
+ reconnectDisabled = true;
+
+ startServerNodes(1);
+
+ startClientNodes(1);
+
+ Ignite srv = G.ignite("server-0");
+
+ TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+
+ Ignite client = G.ignite("client-0");
+
+ final CountDownLatch segmentedLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_NODE_SEGMENTED)
+ segmentedLatch.countDown();
+
+ return false;
+ }
+ }, EVT_NODE_SEGMENTED);
+
+ srvFailedLatch = new CountDownLatch(1);
+
+ attachListeners(1, 0);
+
+ log.info("Fail client node.");
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(srvFailedLatch.await(5000, MILLISECONDS));
+ assertTrue(segmentedLatch.await(5000, MILLISECONDS));
+
+ checkNodes(1, 0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception {
+ reconnectSegmentedAfterJoinTimeout(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception {
+ reconnectSegmentedAfterJoinTimeout(false);
+ }
+
+ /**
+ * @param failSrv If {@code true} fails server, otherwise server does not send join message.
+ * @throws Exception If failed.
+ */
+ private void reconnectSegmentedAfterJoinTimeout(boolean failSrv) throws Exception {
+ netTimeout = 4000;
+ joinTimeout = 5000;
+
+ startServerNodes(1);
+
+ startClientNodes(1);
+
+ Ignite srv = G.ignite("server-0");
+ Ignite client = G.ignite("client-0");
+
+ TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+ TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi());
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch segmentedLatch = new CountDownLatch(1);
+ final AtomicBoolean err = new AtomicBoolean(false);
+
+ if (!failSrv) {
+ srvFailedLatch = new CountDownLatch(1);
+
+ attachListeners(1, 0);
+ }
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ log.info("Disconnected event.");
+
+ assertEquals(1, segmentedLatch.getCount());
+ assertEquals(1, disconnectLatch.getCount());
+ assertFalse(err.get());
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_NODE_SEGMENTED) {
+ log.info("Segmented event.");
+
+ assertEquals(1, segmentedLatch.getCount());
+ assertEquals(0, disconnectLatch.getCount());
+ assertFalse(err.get());
+
+ segmentedLatch.countDown();
+ }
+ else {
+ log.error("Unexpected event: " + evt);
+
+ err.set(true);
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED);
+
+ if (failSrv) {
+ log.info("Fail server.");
+
+ failServer(0);
+ }
+ else {
+ log.info("Fail client connection.");
+
+ srvSpi.failClientReconnect.set(1_000_000);
+ srvSpi.failNodeAdded.set(1_000_000);
+
+ clientSpi.brakeConnection();
+ }
+
+ assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+
+ assertTrue(segmentedLatch.await(10_000, MILLISECONDS));
+
+ waitSegmented(client);
+
+ assertFalse(err.get());
+
+ if (!failSrv)
+ await(srvFailedLatch);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectClusterRestart() throws Exception {
+ netTimeout = 3000;
+ joinTimeout = 60_000;
+
+ clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+ clientIpFinder.setAddresses(Collections.singleton("localhost:47500..47509"));
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+ final AtomicBoolean err = new AtomicBoolean(false);
+
+ startServerNodes(1);
+
+ startClientNodes(1);
+
+ Ignite srv = G.ignite("server-0");
+ Ignite client = G.ignite("client-0");
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override
+ public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ log.info("Disconnected event.");
+
+ assertEquals(1, reconnectLatch.getCount());
+ assertEquals(1, disconnectLatch.getCount());
+
+ disconnectLatch.countDown();
+ } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ log.info("Reconnected event.");
+
+ assertEquals(1, reconnectLatch.getCount());
+ assertEquals(0, disconnectLatch.getCount());
+ assertFalse(err.get());
+
+ reconnectLatch.countDown();
+ } else {
+ log.error("Unexpected event: " + evt);
+
+ err.set(true);
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED);
+
+ log.info("Stop server.");
+
+ srv.close();
+
+ assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+
+ srvNodeIds.clear();
+ srvIdx.set(0);
+
+ Thread.sleep(3000);
+
+ log.info("Restart server.");
+
+ startServerNodes(1);
+
+ assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+ assertFalse(err.get());
}
/**
* @throws Exception If failed.
*/
public void testDisconnectAfterNetworkTimeout() throws Exception {
- // TODO IGNTIE-901.
}
/**
+ * @param ignite Node expected to reach the {@code STOPPED_ON_SEGMENTATION} state.
* @throws Exception If failed.
*/
- public void testReconnectSegmentedAfterJoinTimeout() throws Exception {
- // TODO IGNTIE-901.
+ private void waitSegmented(final Ignite ignite) throws Exception {
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return IgniteState.STOPPED_ON_SEGMENTATION == Ignition.state(ignite.name());
+ }
+ }, 5000);
+
+ assertEquals(IgniteState.STOPPED_ON_SEGMENTATION, Ignition.state(ignite.name()));
}
/**
@@ -1567,7 +1783,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @throws InterruptedException If interrupted.
*/
private void await(CountDownLatch latch) throws InterruptedException {
- assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS));
+ assertTrue("Latch count: " + latch.getCount(), latch.await(10_000, MILLISECONDS));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 05677a4..2b2996d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -26,7 +26,7 @@ import org.h2.table.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
-import javax.cache.CacheException;
+import javax.cache.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -93,6 +93,9 @@ public abstract class GridMergeIndex extends BaseIndex {
throw new IllegalStateException();
}
+ /**
+ * @param e Error thrown by {@code fetchNextPage()} of the pages added for every node in {@code remainingRows}.
+ */
public void fail(final CacheException e) {
for (UUID nodeId0 : remainingRows.keySet()) {
addPage0(new GridResultPage(null, nodeId0, null) {
@@ -100,8 +103,7 @@ public abstract class GridMergeIndex extends BaseIndex {
return true;
}
- @Override
- public void fetchNextPage() {
+ @Override public void fetchNextPage() {
throw e;
}
});
@@ -111,23 +113,12 @@ public abstract class GridMergeIndex extends BaseIndex {
/**
* @param nodeId Node ID.
*/
- public void fail(@Nullable UUID nodeId) {
- if (nodeId != null) {
- addPage0(new GridResultPage(null, nodeId, null) {
- @Override public boolean isFail() {
- return true;
- }
- });
- }
- else {
- for (UUID nodeId0 : remainingRows.keySet()) {
- addPage0(new GridResultPage(null, nodeId0, null) {
- @Override public boolean isFail() {
- return true;
- }
- });
+ public void fail(UUID nodeId) {
+ addPage0(new GridResultPage(null, nodeId, null) {
+ @Override public boolean isFail() {
+ return true;
}
- }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 8f03681..cde3288 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -1162,6 +1162,9 @@ public class GridReduceQueryExecutor {
tbl.getScanIndex(null).fail(nodeId);
}
+ /**
+ * @param e Error.
+ */
void disconnected(CacheException e) {
if (!state.compareAndSet(null, e))
return;