You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/01 17:11:14 UTC
[1/2] incubator-ignite git commit: # ignite-901 client reconnect WIP
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-901 [created] f5f3efd16
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 1071ef2..5a898b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -97,17 +97,62 @@ public class GridCacheSharedContext<K, V> {
Collection<CacheStoreSessionListener> storeSesLsnrs
) {
this.kernalCtx = kernalCtx;
+
+ setManagers(txMgr, verMgr, mvccMgr, depMgr, exchMgr, ioMgr);
+
+ this.storeSesLsnrs = storeSesLsnrs;
+
+ txMetrics = new TransactionMetricsAdapter();
+
+ ctxMap = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ void onDisconnected() throws IgniteCheckedException {
+ for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size());
+ it.hasPrevious();) {
+ GridCacheSharedManager<?, ?> mgr = it.previous();
+
+ if (mgr.restartOnDisconnect())
+ mgr.onKernalStop(true, true);
+ }
+
+ for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) {
+ GridCacheSharedManager<?, ?> mgr = it.previous();
+
+ if (mgr.restartOnDisconnect())
+ mgr.stop(true);
+ }
+
+ mgrs = new LinkedList<>();
+
+ setManagers(txMgr,
+ verMgr,
+ mvccMgr,
+ new GridCacheDeploymentManager<K, V>(),
+ new GridCachePartitionExchangeManager<K, V>(),
+ ioMgr);
+
+ for (GridCacheSharedManager<K, V> mgr : mgrs) {
+ if (mgr.restartOnDisconnect())
+ mgr.start(this);
+ }
+ }
+
+ private void setManagers(IgniteTxManager txMgr,
+ GridCacheVersionManager verMgr,
+ GridCacheMvccManager mvccMgr,
+ GridCacheDeploymentManager<K, V> depMgr,
+ GridCachePartitionExchangeManager<K, V> exchMgr,
+ GridCacheIoManager ioMgr) {
this.mvccMgr = add(mvccMgr);
this.verMgr = add(verMgr);
this.txMgr = add(txMgr);
this.depMgr = add(depMgr);
this.exchMgr = add(exchMgr);
this.ioMgr = add(ioMgr);
- this.storeSesLsnrs = storeSesLsnrs;
-
- txMetrics = new TransactionMetricsAdapter();
-
- ctxMap = new ConcurrentHashMap<>();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
index d45052c..5d27657 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
@@ -46,7 +46,7 @@ public interface GridCacheSharedManager <K, V> {
/**
* @param cancel Cancel flag.
*/
- public void onKernalStop(boolean cancel);
+ public void onKernalStop(boolean cancel, boolean disconnected);
/**
* Prints memory statistics (sizes of internal data structures, etc.).
@@ -54,4 +54,9 @@ public interface GridCacheSharedManager <K, V> {
* NOTE: this method is for testing and profiling purposes only.
*/
public void printMemoryStats();
+
+ /**
+ *
+ */
+ public boolean restartOnDisconnect();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
index 2cf7051..61dbc25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
@@ -101,14 +101,14 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
}
/** {@inheritDoc} */
- @Override public final void onKernalStop(boolean cancel) {
+ @Override public final void onKernalStop(boolean cancel, boolean disconnected) {
if (!starting.get())
// Ignoring attempt to stop manager that has never been started.
return;
- onKernalStop0(cancel);
+ onKernalStop0(cancel, disconnected);
- if (log != null && log.isDebugEnabled())
+ if (!disconnected && log != null && log.isDebugEnabled())
log.debug(kernalStopInfo());
}
@@ -121,8 +121,9 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
/**
* @param cancel Cancel flag.
+ * @param disconnected Disconnected flag.
*/
- protected void onKernalStop0(boolean cancel) {
+ protected void onKernalStop0(boolean cancel, boolean disconnected) {
// No-op.
}
@@ -160,6 +161,11 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
}
/** {@inheritDoc} */
+ @Override public boolean restartOnDisconnect() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheSharedManagerAdapter.class, this);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 38a0d55..0369eb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -458,6 +458,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (isDone())
return;
+ log.info("Init exchange: " + exchangeId());
+
if (init.compareAndSet(false, true)) {
if (isDone())
return;
@@ -1024,6 +1026,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
cctx.exchange().onExchangeDone(this, err);
if (super.onDone(res, err) && !dummy && !forcePreload) {
+ log.info("Finished exchange: " + exchangeId() + ", err=" + err);
+
if (log.isDebugEnabled())
log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + ']');
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 0355bb3..969d7a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -180,13 +180,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
topVer.setIfGreater(startTopVer);
- // Generate dummy discovery event for local node joining.
- DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent();
-
- assert discoEvt != null;
-
- assert discoEvt.topologyVersion() == startTopVer;
-
supplyPool.start();
demandPool.start();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 351d6cd..a7e3f4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -90,6 +90,11 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
public abstract GridDhtCacheAdapter<K, V> dht();
/** {@inheritDoc} */
+ @Override public void disconnected() {
+ map = new GridCacheConcurrentMap(ctx, ctx.config().getNearConfiguration().getNearStartSize(), 0.75F);
+ }
+
+ /** {@inheritDoc} */
@Override public boolean isNear() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index d59a51d..39f6bd5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -552,7 +552,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
// Creates task session with task name and task version.
GridTaskSessionImpl ses = ctx.session().createTaskSession(
sesId,
- ctx.config().getNodeId(),
+ ctx.localNodeId(),
taskName,
dep,
taskCls == null ? null : taskCls.getName(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 5e557bd..dd19203 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -58,9 +58,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
/** Ignite instance. */
protected Ignite ignite;
- /** Local node id. */
- protected UUID nodeId;
-
/** Grid instance name. */
protected String gridName;
@@ -111,7 +108,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
/** {@inheritDoc} */
@Override public UUID getLocalNodeId() {
- return nodeId;
+ return ignite.cluster().localNode().id();
}
/** {@inheritDoc} */
@@ -201,10 +198,8 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
protected void injectResources(Ignite ignite) {
this.ignite = ignite;
- if (ignite != null) {
- nodeId = ignite.configuration().getNodeId();
+ if (ignite != null)
gridName = ignite.name();
- }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index addf243d..5eaca21 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -248,7 +248,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isDebugEnabled())
log.debug("Sending local node ID to newly accepted session: " + ses);
- ses.send(nodeIdMsg);
+ ses.send(nodeIdMessage());
}
}
@@ -700,9 +700,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Address resolver. */
private AddressResolver addrRslvr;
- /** Local node ID message. */
- private NodeIdMessage nodeIdMsg;
-
/** Received messages count. */
private final LongAdder8 rcvdMsgsCnt = new LongAdder8();
@@ -739,10 +736,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
- assert evt instanceof DiscoveryEvent;
- assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
+ assert evt instanceof DiscoveryEvent : evt;
+ assert evt.type() == EVT_NODE_LEFT ||
+ evt.type() == EVT_NODE_FAILED ||
+ evt.type() == EVT_CLIENT_NODE_DISCONNECTED : evt;
+
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ for (GridCommunicationClient client : clients.values())
+ client.forceClose();
+
+ IgniteCheckedException err = new IgniteCheckedException("Failed to connect to node, " +
+ "local node node disconnected.");
- onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
+ for (GridFutureAdapter<GridCommunicationClient> clientFut : clientFuts.values())
+ clientFut.onDone(err);
+
+ recoveryDescs.clear();
+ }
+ else
+ onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
}
};
@@ -1237,8 +1249,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** {@inheritDoc} */
@Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
- nodeIdMsg = new NodeIdMessage(getLocalNodeId());
-
assertParameter(locPort > 1023, "locPort > 1023");
assertParameter(locPort <= 0xffff, "locPort < 0xffff");
assertParameter(locPortRange >= 0, "locPortRange >= 0");
@@ -1371,7 +1381,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (boundTcpShmemPort > 0)
spiCtx.registerPort(boundTcpShmemPort, IgnitePortProtocol.TCP);
- spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
+ spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_CLIENT_NODE_DISCONNECTED);
ctxInitLatch.countDown();
}
@@ -1666,10 +1676,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isTraceEnabled())
log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']');
- UUID locNodeId = getLocalNodeId();
-
- if (node.id().equals(locNodeId))
- notifyListener(locNodeId, msg, NOOP);
+ if (node.isLocal())
+ notifyListener(node.id(), msg, NOOP);
else {
GridCommunicationClient client = null;
@@ -2208,7 +2216,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
if (recovery != null) {
- HandshakeMessage msg = new HandshakeMessage(getLocalNodeId(),
+ HandshakeMessage msg = new HandshakeMessage(getSpiContext().localNode().id(),
recovery.incrementConnectCount(),
recovery.receivedCount());
@@ -2228,7 +2236,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ch.write(buf);
}
else
- ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType));
+ ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
if (recovery != null) {
if (log.isDebugEnabled())
@@ -2355,6 +2363,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
getExceptionRegistry().onException(msg, e);
}
+ /**
+ * @return Node ID message.
+ */
+ private NodeIdMessage nodeIdMessage() {
+ return new NodeIdMessage(getSpiContext().localNode().id());
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpCommunicationSpi.class, this);
@@ -2860,15 +2875,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
try {
+ UUID id = getSpiContext().localNode().id();
+
+ NodeIdMessage msg = new NodeIdMessage(id);
+
out.write(U.IGNITE_HEADER);
out.write(NODE_ID_MSG_TYPE);
- out.write(nodeIdMsg.nodeIdBytes);
+ out.write(msg.nodeIdBytes);
out.flush();
if (log.isDebugEnabled())
- log.debug("Sent local node ID [locNodeId=" + getLocalNodeId() + ", rmtNodeId="
- + rmtNodeId + ']');
+ log.debug("Sent local node ID [locNodeId=" + id + ", rmtNodeId=" + rmtNodeId + ']');
}
catch (IOException e) {
throw new IgniteCheckedException("Failed to perform handshake.", e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 04276d2..4b1cfa7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -70,9 +70,6 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private SocketReader sockReader;
- /** */
- private boolean segmented;
-
/** Last message ID. */
private volatile IgniteUuid lastMsgId;
@@ -325,7 +322,7 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
- if (segmented)
+ if (state == State.SEGMENTED)
throw new IgniteException("Failed to send custom message: client is disconnected");
try {
@@ -339,7 +336,7 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override public void failNode(UUID nodeId, @Nullable String warning) {
- ClusterNode node = rmtNodes.get(nodeId);
+ TcpDiscoveryNode node = rmtNodes.get(nodeId);
if (node != null) {
TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
@@ -366,7 +363,8 @@ class ClientImpl extends TcpDiscoveryImpl {
long startTime = U.currentTimeMillis();
// Marshal credentials for backward compatibility and security.
- marshalCredentials(locNode);
+ if (!recon)
+ marshalCredentials(locNode);
while (true) {
if (Thread.currentThread().isInterrupted())
@@ -947,7 +945,10 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
- assert !segmented;
+ assert state == ClientImpl.State.DISCONNECTED
+ || state == ClientImpl.State.CONNECTED
+ || state == ClientImpl.State.STARTING :
+ state;
boolean success = false;
@@ -1007,9 +1008,11 @@ class ClientImpl extends TcpDiscoveryImpl {
}
success = true;
- }
- return;
+ return;
+ }
+ else // TODO IGNITE-901 reuse socket.
+ return;
}
}
else if (spi.ensured(msg)) {
@@ -1090,45 +1093,33 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@SuppressWarnings("InfiniteLoopStatement")
@Override protected void body() throws InterruptedException {
+ state = ClientImpl.State.STARTING;
+
spi.stats.onJoinStarted();
try {
- final Socket sock = joinTopology(false, spi.joinTimeout);
-
- if (sock == null) {
- joinError(new IgniteSpiException("Join process timed out."));
-
- return;
- }
-
- currSock = sock;
-
- sockWriter.setSocket(sock);
-
- if (spi.joinTimeout > 0) {
- timer.schedule(new TimerTask() {
- @Override public void run() {
- if (joinLatch.getCount() > 0)
- queue.add(JOIN_TIMEOUT);
- }
- }, spi.joinTimeout);
- }
-
- sockReader.setSocket(sock, locNode.clientRouterNodeId());
+ tryJoin();
while (true) {
Object msg = queue.take();
if (msg == JOIN_TIMEOUT) {
- if (joinLatch.getCount() > 0) {
+ if (state == ClientImpl.State.STARTING) {
joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
"join request (consider increasing 'joinTimeout' configuration property) " +
- "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+ "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']'));
break;
}
+ else if (state == ClientImpl.State.DISCONNECTED) {
+ state = ClientImpl.State.SEGMENTED;
+
+ notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+ }
}
else if (msg == SPI_STOP) {
+ state = ClientImpl.State.STOPPED;
+
assert spi.getSpiContext().isStopping();
if (currSock != null) {
@@ -1147,7 +1138,7 @@ class ClientImpl extends TcpDiscoveryImpl {
boolean join = joinLatch.getCount() > 0;
- if (spi.getSpiContext().isStopping() || segmented) {
+ if (spi.getSpiContext().isStopping() || (state == ClientImpl.State.SEGMENTED)) {
leaveLatch.countDown();
if (join) {
@@ -1166,19 +1157,31 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
else if (msg == SPI_RECONNECT_FAILED) {
- if (!segmented) {
- segmented = true;
+ reconnector.cancel();
+ reconnector.join();
- reconnector.cancel();
- reconnector.join();
+ reconnector = null;
- notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
- }
+ state = ClientImpl.State.DISCONNECTED;
+
+ nodeAdded = false;
+
+ notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+
+ UUID newId = UUID.randomUUID();
+
+ log.info("Change node id: " + newId);
+
+ rmtNodes.clear();
+
+ locNode.onClientDisconnected(newId);
+
+ tryJoin();
}
else {
TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
- if (joinLatch.getCount() > 0) {
+ if (joinLatch.getCount() > 0) { // TODO IGNITE-901.
IgniteSpiException err = null;
if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
@@ -1214,6 +1217,44 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
+ * @throws InterruptedException If interrupted.
+ */
+ private void tryJoin() throws InterruptedException {
+ assert state == ClientImpl.State.DISCONNECTED || state == ClientImpl.State.STARTING : state;
+
+ boolean join = state == ClientImpl.State.STARTING;
+
+ final Socket sock = joinTopology(false, spi.joinTimeout);
+
+ if (sock == null) {
+ if (join)
+ joinError(new IgniteSpiException("Join process timed out."));
+ else {
+ state = ClientImpl.State.SEGMENTED;
+
+ notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+ }
+
+ return;
+ }
+
+ currSock = sock;
+
+ sockWriter.setSocket(sock);
+
+ if (spi.joinTimeout > 0) {
+ timer.schedule(new TimerTask() {
+ @Override public void run() {
+ if (joinLatch.getCount() > 0)
+ queue.add(JOIN_TIMEOUT);
+ }
+ }, spi.joinTimeout);
+ }
+
+ sockReader.setSocket(sock, locNode.clientRouterNodeId());
+ }
+
+ /**
* @param msg Message.
*/
protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) {
@@ -1244,6 +1285,16 @@ class ClientImpl extends TcpDiscoveryImpl {
spi.stats.onMessageProcessingFinished(msg);
}
+ private boolean nodeAdded;
+
+ private boolean joining() {
+ return state == ClientImpl.State.STARTING || state == ClientImpl.State.DISCONNECTED;
+ }
+
+ private boolean disconnected() {
+ return state == ClientImpl.State.DISCONNECTED;
+ }
+
/**
* @param msg Message.
*/
@@ -1256,12 +1307,15 @@ class ClientImpl extends TcpDiscoveryImpl {
UUID newNodeId = node.id();
if (getLocalNodeId().equals(newNodeId)) {
- if (joinLatch.getCount() > 0) {
+ if (joining()) {
Collection<TcpDiscoveryNode> top = msg.topology();
if (top != null) {
spi.gridStartTime = msg.gridStartTime();
+ if (disconnected())
+ rmtNodes.clear();
+
for (TcpDiscoveryNode n : top) {
if (n.order() > 0)
n.visible(true);
@@ -1271,6 +1325,8 @@ class ClientImpl extends TcpDiscoveryImpl {
topHist.clear();
+ nodeAdded = true;
+
if (msg.topologyHistory() != null)
topHist.putAll(msg.topologyHistory());
}
@@ -1308,7 +1364,7 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
if (getLocalNodeId().equals(msg.nodeId())) {
- if (joinLatch.getCount() > 0) {
+ if (joining()) {
Map<UUID, Map<Integer, byte[]>> dataMap = msg.clientDiscoData();
if (dataMap != null) {
@@ -1323,7 +1379,14 @@ class ClientImpl extends TcpDiscoveryImpl {
locNode.order(topVer);
- notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer, msg));
+ Collection<ClusterNode> nodes = updateTopologyHistory(topVer, msg);
+
+ if (disconnected())
+ notifyDiscovery(EVT_CLIENT_NODE_RECONNECTED, topVer, locNode, nodes);
+
+ notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, nodes);
+
+ state = ClientImpl.State.CONNECTED;
joinErr.set(null);;
@@ -1437,7 +1500,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @return {@code True} if received node added message for local node.
*/
private boolean nodeAdded() {
- return !topHist.isEmpty();
+ return nodeAdded;
}
/**
@@ -1582,7 +1645,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
- if (msg.verified() && joinLatch.getCount() == 0) {
+ if (msg.verified() && state == ClientImpl.State.CONNECTED) {
DiscoverySpiListener lsnr = spi.lsnr;
if (lsnr != null) {
@@ -1718,4 +1781,18 @@ class ClientImpl extends TcpDiscoveryImpl {
this.sock = sock;
}
}
+
+ private volatile State state;
+
+ private enum State {
+ STARTING,
+
+ CONNECTED,
+
+ DISCONNECTED,
+
+ SEGMENTED,
+
+ STOPPED
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index fa3e564..05f710d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -585,11 +585,11 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override public void failNode(UUID nodeId, @Nullable String warning) {
- ClusterNode node = ring.node(nodeId);
+ TcpDiscoveryNode node = ring.node(nodeId);
if (node != null) {
TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
- node.id(), node.order());
+ node.id(), node.internalOrder());
msg.warning(warning);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index ace917f..4cb0b8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -112,7 +112,7 @@ abstract class TcpDiscoveryImpl {
* @return Local node ID.
*/
public UUID getLocalNodeId() {
- return spi.getLocalNodeId();
+ return spi.locNode.id();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 7663fe6..9446d2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -844,7 +844,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
}
locNode = new TcpDiscoveryNode(
- getLocalNodeId(),
+ ignite.configuration().getNodeId(),
addrs.get1(),
addrs.get2(),
srvPort,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 36ae39e..d0c3edf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -431,12 +431,29 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
}
/**
+ * @return Metrics provider.
+ */
+ public DiscoveryMetricsProvider metricsProvider() {
+ return metricsProvider;
+ }
+
+ /**
* @param clientRouterNodeId Client router node ID.
*/
public void clientRouterNodeId(UUID clientRouterNodeId) {
this.clientRouterNodeId = clientRouterNodeId;
}
+ /**
+ * @param newId New node ID.
+ */
+ public void onClientDisconnected(UUID newId) {
+ id = newId;
+ order = 0;
+ intOrder = 0;
+ visible = false;
+ }
+
/** {@inheritDoc} */
@Override public int compareTo(@Nullable TcpDiscoveryNode node) {
if (node == null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index 7a88426..000782a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -257,7 +257,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
registerMBean(gridName, this, FileSwapSpaceSpiMBean.class);
- String path = baseDir + File.separator + gridName + File.separator + getLocalNodeId();
+ String path = baseDir + File.separator + gridName + File.separator + ignite.configuration().getNodeId();
try {
dir = U.resolveWorkDirectory(path, true);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
index abc9109..5f2d2b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
@@ -104,8 +104,6 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
* Test kernal gateway that always return uninitialized user stack trace.
*/
private static final GridKernalGateway TEST_GATEWAY = new GridKernalGateway() {
- @Override public void lightCheck() throws IllegalStateException {}
-
@Override public void readLock() throws IllegalStateException {}
@Override public void readLockAnyway() {}
@@ -122,10 +120,6 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
@Override public void writeUnlock() {}
- @Override public void addStopListener(Runnable lsnr) {}
-
- @Override public void removeStopListener(Runnable lsnr) {}
-
@Override public String userStackTrace() {
return null;
}
@@ -133,5 +127,13 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
@Override public boolean tryWriteLock(long timeout) {
return false;
}
+
+ @Override public void onDisconnected() {
+ // No-op.
+ }
+
+ @Override public void onReconnected() {
+ // No-op.
+ }
};
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
new file mode 100644
index 0000000..0512074
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.concurrent.*;
+
+/**
+ *
+ */
+public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ protected boolean clientMode;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+
+ disco.setIpFinder(ipFinder);
+ disco.setJoinTimeout(2 * 60_000);
+
+ cfg.setDiscoverySpi(disco);
+
+ if (clientMode)
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /**
+ * @return Number of server nodes started before tests.
+ */
+ protected abstract int serverCount();
+
+ /**
+ * @param ignite Node.
+ * @return Discovery SPI.
+ */
+ protected TestTcpDiscoverySpi spi(Ignite ignite) {
+ return ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ int srvs = serverCount();
+
+ if (srvs > 0)
+ startGrids(srvs);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @param client Client.
+ * @return Server node client connected to.
+ */
+ protected Ignite clientRouter(Ignite client) {
+ TcpDiscoveryNode node = (TcpDiscoveryNode)client.cluster().localNode();
+
+ assertTrue(node.isClient());
+ assertNotNull(node.clientRouterNodeId());
+
+ Ignite srv = G.ignite(node.clientRouterNodeId());
+
+ assertNotNull(srv);
+
+ return srv;
+ }
+
+ /**
+ * @param fut Future.
+ * @throws Exception If failed.
+ */
+ protected void assertNotDone(IgniteInternalFuture<?> fut) throws Exception {
+ assertNotNull(fut);
+
+ if (fut.isDone())
+ fail("Future completed with result: " + fut.get());
+ }
+
+ /**
+ *
+ */
+ protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** */
+ volatile CountDownLatch writeLatch;
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
+ throws IOException, IgniteCheckedException {
+ if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+ CountDownLatch writeLatch0 = writeLatch;
+
+ if (writeLatch0 != null) {
+ log.info("Block join request send: " + msg);
+
+ U.await(writeLatch0);
+ }
+ }
+
+ super.writeToSocket(sock, msg);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
new file mode 100644
index 0000000..164f6c8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectApiBlockTest extends IgniteClientReconnectAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setCacheConfiguration(new CacheConfiguration());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ return 1;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void testIgniteBlockOnDisconnect() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(serverCount());
+
+ assertNotNull(client.cache(null));
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ final List<IgniteInternalFuture> futs = new ArrayList<>();
+
+ // TODO IGNITE-901 test block for others public API.
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ assertEquals(1, reconnectLatch.getCount());
+
+ futs.add(GridTestUtils.runAsync(new Callable() {
+ @Override public Object call() throws Exception {
+ return client.transactions();
+ }
+ }));
+
+ futs.add(GridTestUtils.runAsync(new Callable() {
+ @Override public Object call() throws Exception {
+ return client.cache(null);
+ }
+ }));
+
+ futs.add(GridTestUtils.runAsync(new Callable() {
+ @Override public Object call() throws Exception {
+ return client.dataStreamer(null);
+ }
+ }));
+
+ disconnectLatch.countDown();
+ } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ log.info("Fail client.");
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+
+ assertEquals(3, futs.size());
+
+ for (IgniteInternalFuture<?> fut : futs)
+ assertNotDone(fut);
+
+ U.sleep(2000);
+
+ for (IgniteInternalFuture<?> fut : futs)
+ assertNotDone(fut);
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+
+ IgniteTransactions txs = (IgniteTransactions)futs.get(0).get();
+
+ assertNotNull(txs);
+
+ IgniteCache<Object, Object> cache0 = (IgniteCache<Object, Object>)futs.get(1).get();
+
+ assertNotNull(cache0);
+
+ cache0.put(1, 1);
+
+ assertEquals(1, cache0.get(1));
+
+ IgniteDataStreamer<Object, Object> streamer = (IgniteDataStreamer<Object, Object>)futs.get(2).get();
+
+ streamer.addData(2, 2);
+
+ streamer.close();
+
+ assertEquals(2, cache0.get(2));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
new file mode 100644
index 0000000..5687010
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstractTest {
+ /** */
+ private final int SRV_CNT = 1;
+
+ /** */
+ private UUID nodeId;
+
+ /** */
+ private Map<IgnitePredicate<? extends Event>, int[]> lsnrs;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ if (nodeId != null) {
+ cfg.setNodeId(nodeId);
+
+ nodeId = null;
+ }
+
+ if (lsnrs != null) {
+ cfg.setLocalEventListeners(lsnrs);
+
+ lsnrs = null;
+ }
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrids(SRV_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnect() throws Exception {
+ clientMode = true;
+
+ Ignite client = startGrid(SRV_CNT);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>());
+
+ cache.put(1, 1);
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ final AtomicReference<IgniteInternalFuture> blockPutRef = new AtomicReference<>();
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ assertEquals(1, reconnectLatch.getCount());
+
+ blockPutRef.set(GridTestUtils.runAsync(new Callable() {
+ @Override
+ public Object call() throws Exception {
+ log.info("Start put.");
+
+ cache.put(2, 2);
+
+ log.info("Finish put.");
+
+ return null;
+ }
+ }));
+
+ disconnectLatch.countDown();
+ } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ assertEquals(0, disconnectLatch.getCount());
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ log.info("Fail client.");
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+
+ IgniteInternalFuture putFut = blockPutRef.get();
+
+ assertNotDone(putFut);
+
+ U.sleep(5000);
+
+ assertNotDone(putFut);
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+
+ assertEquals(1, cache.get(1));
+
+ putFut.get();
+
+ assertEquals(2, cache.get(2));
+
+ cache.put(3, 3);
+
+ assertEquals(3, cache.get(3));
+
+ this.clientMode = false;
+
+ IgniteEx srv2 = startGrid(SRV_CNT + 1);
+
+ Integer key = primaryKey(srv2.cache(null));
+
+ cache.put(key, 4);
+
+ assertEquals(4, cache.get(key));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectExchangeInProgress() throws Exception {
+ clientMode = true;
+
+ IgniteEx client = startGrid(SRV_CNT);
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi();
+
+ srvCommSpi.blockMessages(GridDhtPartitionsFullMessage.class, client.localNode().id());
+
+ clientMode = false;
+
+ startGrid(SRV_CNT + 1);
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override
+ public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+
+ try {
+ srvCommSpi.stopBlock(true);
+
+ fail();
+ }
+ catch (IgniteException e) {
+ log.info("Expected error: " + e);
+ }
+
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setName("newCache");
+
+ ccfg.setCacheMode(REPLICATED);
+
+ log.info("Start new cache.");
+
+ IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
+
+ cache.put(1, 1);
+
+ assertEquals(1, cache.get(1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectInitialExchangeInProgress() throws Exception {
+ final UUID clientId = UUID.randomUUID();
+
+ Ignite srv = grid(0);
+
+ final CountDownLatch joinLatch = new CountDownLatch(1);
+
+ srv.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_NODE_JOINED && ((DiscoveryEvent)evt).eventNode().id().equals(clientId)) {
+ info("Client joined: " + evt);
+
+ joinLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_NODE_JOINED);
+
+ TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi();
+
+ srvCommSpi.blockMessages(GridDhtPartitionsFullMessage.class, clientId);
+
+ clientMode = true;
+
+ nodeId = clientId;
+
+ lsnrs = new HashMap<>();
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ lsnrs.put(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, new int[]{EVT_CLIENT_NODE_RECONNECTED});
+
+ IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
+ @Override
+ public Ignite call() throws Exception {
+ try {
+ return startGrid(SRV_CNT);
+ } catch (Throwable e) {
+ log.error("Unexpected error: " + e, e);
+
+ throw e;
+ }
+ }
+ });
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ assertTrue(joinLatch.await(5000, TimeUnit.MILLISECONDS));
+
+ U.sleep(1000);
+
+ assertNotDone(fut);
+
+ srvSpi.failNode(clientId, null);
+
+ log.info("Wait reconnect.");
+
+ assertTrue(reconnectLatch.await(10 * 60_000, TimeUnit.MILLISECONDS));
+
+ try {
+ srvCommSpi.stopBlock(true);
+
+ fail();
+ }
+ catch (IgniteException e) {
+ log.info("Expected error: " + e);
+ }
+
+ Ignite client = fut.get();
+
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setName("newCache");
+
+ ccfg.setCacheMode(REPLICATED);
+
+ log.info("Start new cache.");
+
+ IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
+
+ cache.put(1, 1);
+
+ assertEquals(1, cache.get(1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectOperationInProgress() throws Exception {
+ clientMode = true;
+
+ IgniteEx client = startGrid(SRV_CNT);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED)
+ info("Client disconnected: " + evt);
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED)
+ info("Client reconnected: " + evt);
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ IgniteInClosure<IgniteCache<Object, Object>> putOp = new CI1<IgniteCache<Object, Object>>() {
+ @Override public void apply(IgniteCache<Object, Object> cache) {
+ cache.put(1, 1);
+ }
+ };
+
+ IgniteInClosure<IgniteCache<Object, Object>> getOp = new CI1<IgniteCache<Object, Object>>() {
+ @Override public void apply(IgniteCache<Object, Object> cache) {
+ cache.get(1);
+ }
+ };
+
+ int cnt = 0;
+
+ for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) {
+ CacheAtomicWriteOrderMode[] writeOrders =
+ atomicityMode == CacheAtomicityMode.ATOMIC ? CacheAtomicWriteOrderMode.values() :
+ new CacheAtomicWriteOrderMode[]{CacheAtomicWriteOrderMode.CLOCK};
+
+ for (CacheAtomicWriteOrderMode writeOrder : writeOrders) {
+ for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setAtomicityMode(atomicityMode);
+
+ ccfg.setAtomicWriteOrderMode(writeOrder);
+
+ ccfg.setName("cache-" + cnt++);
+
+ ccfg.setWriteSynchronizationMode(syncMode);
+
+ if (syncMode != CacheWriteSynchronizationMode.FULL_ASYNC) {
+ Class<?> cls = (ccfg.getAtomicityMode() == CacheAtomicityMode.ATOMIC) ?
+ GridNearAtomicUpdateResponse.class : GridNearTxPrepareResponse.class;
+
+ log.info("Test cache put [atomicity=" + atomicityMode +
+ ", writeOrder=" + writeOrder +
+ ", syncMode=" + syncMode + ']');
+
+ checkOperationInProgressFails(client, ccfg, cls, putOp);
+ }
+
+ log.info("Test cache get [atomicity=" + atomicityMode + ", syncMode=" + syncMode + ']');
+
+ checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class, getOp);
+ }
+ }
+ }
+ }
+
+ /**
+ * @param client Client.
+ * @param ccfg Cache configuration.
+ * @param msgToBlock Message to block.
+ * @param c Cache operation closure.
+ * @throws Exception If failed.
+ */
+ private void checkOperationInProgressFails(IgniteEx client,
+ final CacheConfiguration<Object, Object> ccfg,
+ Class<?> msgToBlock,
+ final IgniteInClosure<IgniteCache<Object, Object>> c)
+ throws Exception
+ {
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
+
+ TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi();
+
+ srvCommSpi.blockMessages(msgToBlock, client.localNode().id());
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ c.apply(cache);
+
+ return null;
+ }
+ });
+
+ Thread.sleep(1000);
+
+ assertNotDone(fut);
+
+ log.info("Fail client: " + client.localNode().id());
+
+ srvSpi.failNode(client.localNode().id(), null);
+
+ try {
+ fut.get();
+
+ fail();
+ }
+ catch (IgniteCheckedException e) {
+ log.info("Expected error: " + e);
+ }
+
+ srvCommSpi.stopBlock(false);
+
+ cache.put(1, 1);
+
+ assertEquals(1, cache.get(1));
+
+ client.destroyCache(cache.getName());
+ }
+
+ /**
+ *
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** */
+ private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>();
+
+ /** */
+ private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ if (msg instanceof GridIoMessage) {
+ Object msg0 = ((GridIoMessage)msg).message();
+
+ synchronized (this) {
+ Set<UUID> blockNodes = blockCls.get(msg0.getClass());
+
+ if (F.contains(blockNodes, node.id())) {
+ log.info("Block message [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) +
+ ", msg=" + msg0 + ']');
+
+ blockedMsgs.add(new T2<>(node, (GridIoMessage)msg));
+
+ return;
+ }
+ }
+ }
+
+ super.sendMessage(node, msg);
+ }
+
+ /**
+ * @param cls Message class.
+ * @param nodeId Node ID.
+ */
+ void blockMessages(Class<?> cls, UUID nodeId) {
+ synchronized (this) {
+ Set<UUID> set = blockCls.get(cls);
+
+ if (set == null) {
+ set = new HashSet<>();
+
+ blockCls.put(cls, set);
+ }
+
+ set.add(nodeId);
+ }
+ }
+
+ /**
+ * @param snd Send messages flag.
+ */
+ void stopBlock(boolean snd) {
+ synchronized (this) {
+ blockCls.clear();
+
+ if (snd) {
+ for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
+ ClusterNode node = msg.get1();
+
+ log.info("Send blocked message: [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) +
+ ", msg=" + msg.get2().message() + ']');
+
+ super.sendMessage(msg.get1(), msg.get2());
+ }
+ }
+
+ blockedMsgs.clear();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
new file mode 100644
index 0000000..deffd42
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconnectAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ return 3;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnect() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(serverCount());
+
+ long topVer = 4;
+
+ IgniteCluster cluster = client.cluster();
+
+ cluster.nodeLocalMap().put("locMapKey", 10);
+
+ Map<Integer, Integer> nodeCnt = new HashMap<>();
+
+ nodeCnt.put(1, 1);
+ nodeCnt.put(2, 2);
+ nodeCnt.put(3, 3);
+ nodeCnt.put(4, 4);
+
+ for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) {
+ Collection<ClusterNode> nodes = cluster.topology(e.getKey());
+
+ assertEquals((int)e.getValue(), nodes.size());
+ }
+
+ ClusterNode locNode = cluster.localNode();
+
+ assertEquals(topVer, locNode.order());
+
+ TestTcpDiscoverySpi srvSpi = spi(clientRouter(client));
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED)
+ info("Disconnected: " + evt);
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+
+ topVer += 2; // Client failed and rejoined.
+
+ locNode = cluster.localNode();
+
+ assertEquals(topVer, locNode.order());
+ assertEquals(topVer, cluster.topologyVersion());
+
+ nodeCnt.put(5, 3);
+ nodeCnt.put(6, 4);
+
+ for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) {
+ Collection<ClusterNode> nodes = cluster.topology(e.getKey());
+
+ assertEquals((int)e.getValue(), nodes.size());
+ }
+
+ assertEquals(10, cluster.nodeLocalMap().get("locMapKey"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
index 19e40bf..7a2e8b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
@@ -220,7 +220,8 @@ public class GridCacheReplicatedInvalidateSelfTest extends GridCommonAbstractTes
Object msg0 = ((GridIoMessage)msg).message();
if (!(msg0 instanceof GridClockDeltaSnapshotMessage)) {
- info("Sending message [locNodeId=" + getLocalNodeId() + ", destNodeId= " + destNode.id()
+ info("Sending message [locNodeId=" + ignite.cluster().localNode().id() +
+ ", destNodeId= " + destNode.id()
+ ", msg=" + msg + ']');
synchronized (msgCntMap) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index ec6a526..55fae9b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -386,11 +386,11 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
final CountDownLatch latch = new CountDownLatch(1);
((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() {
- @Override public void apply(Socket sock) {
+ @Override
+ public void apply(Socket sock) {
try {
latch.await();
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@@ -744,11 +744,11 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
attachListeners(1, 1);
((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener(new IgniteInClosure<TcpDiscoveryAbstractMessage>() {
- @Override public void apply(TcpDiscoveryAbstractMessage msg) {
+ @Override
+ public void apply(TcpDiscoveryAbstractMessage msg) {
try {
Thread.sleep(1000000);
- }
- catch (InterruptedException ignored) {
+ } catch (InterruptedException ignored) {
Thread.interrupted();
}
}
@@ -778,7 +778,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
G.ignite("client-0").compute().broadcast(F.noop());
assertTrue(GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
+ @Override
+ public boolean apply() {
return checkMetrics(3, 3, 1);
}
}, 10000));
@@ -788,7 +789,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
G.ignite("server-0").compute().broadcast(F.noop());
assertTrue(GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
+ @Override
+ public boolean apply() {
return checkMetrics(3, 3, 2);
}
}, 10000));
@@ -886,7 +888,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
try {
startClientNodes(1);
- assertEquals(G.ignite("server-0").cluster().localNode().id(), ((TcpDiscoveryNode)G.ignite("client-0")
+ assertEquals(G.ignite("server-0").cluster().localNode().id(), ((TcpDiscoveryNode) G.ignite("client-0")
.cluster().localNode()).clientRouterNodeId());
checkNodes(2, 1);
@@ -1193,7 +1195,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
clientsPerSrv = CLIENTS;
GridTestUtils.runMultiThreaded(new Callable<Void>() {
- @Override public Void call() throws Exception {
+ @Override
+ public Void call() throws Exception {
Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
clientNodeIds.add(g.cluster().localNode().id());
@@ -1206,6 +1209,129 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testReconnectAfterFail() throws Exception {
+ reconnectAfterFail(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectAfterFailTopologyChanged() throws Exception {
+ reconnectAfterFail(true);
+ }
+
+ /**
+ * @param changeTop If {@code true} topology is changed after client disconnects.
+ * @throws Exception If failed.
+ */
+ private void reconnectAfterFail(final boolean changeTop) throws Exception {
+ startServerNodes(1);
+
+ startClientNodes(1);
+
+ Ignite srv = G.ignite("server-0");
+
+ TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+
+ Ignite client = G.ignite("client-0");
+
+ final ClusterNode clientNode = client.cluster().localNode();
+
+ final UUID clientId = clientNode.id();
+
+ final TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi());
+
+ assertEquals(2L, clientNode.order());
+
+ final CountDownLatch failLatch = new CountDownLatch(1);
+
+ final CountDownLatch joinLatch = new CountDownLatch(1);
+
+ srv.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Server event: " + evt);
+
+ DiscoveryEvent evt0 = (DiscoveryEvent)evt;
+
+ if (evt0.eventNode().id().equals(clientId) && (evt.type() == EVT_NODE_FAILED)) {
+ if (evt.type() == EVT_NODE_FAILED)
+ failLatch.countDown();
+ }
+ else if (evt.type() == EVT_NODE_JOINED) {
+ TcpDiscoveryNode node = (TcpDiscoveryNode)evt0.eventNode();
+
+ if ("client-0".equals(node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME))) {
+ assertEquals(changeTop ? 5L : 4L, node.order());
+
+ joinLatch.countDown();
+ }
+ }
+
+ return true;
+ }
+ }, EVT_NODE_FAILED, EVT_NODE_JOINED);
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override
+ public boolean apply(Event evt) {
+ info("Client event: " + evt);
+
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ assertEquals(1, reconnectLatch.getCount());
+
+ disconnectLatch.countDown();
+
+ if (changeTop)
+ clientSpi.pauseAll();
+ } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ assertEquals(0, disconnectLatch.getCount());
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ if (changeTop) {
+ Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+
+ srvNodeIds.add(g.cluster().localNode().id());
+
+ clientSpi.resumeAll();
+ }
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ assertTrue(failLatch.await(5000, MILLISECONDS));
+ assertTrue(joinLatch.await(5000, MILLISECONDS));
+
+ long topVer = changeTop ? 5L : 4L;
+
+ assertEquals(topVer, client.cluster().localNode().order());
+
+ assertEquals(topVer, client.cluster().topologyVersion());
+
+ Collection<ClusterNode> clientTop = client.cluster().topology(topVer);
+
+ assertEquals(changeTop ? 3 : 2, clientTop.size());
+
+ clientNodeIds.remove(clientId);
+
+ clientNodeIds.add(client.cluster().localNode().id());
+
+ checkNodes(changeTop ? 2 : 1, 1);
+ }
+
+ /**
* @param clientIdx Client index.
* @param srvIdx Server index.
* @throws Exception In case of error.
@@ -1401,7 +1527,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
private void checkRemoteNodes(Ignite ignite, int expCnt) {
Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes();
- assertEquals(expCnt, nodes.size());
+ assertEquals("Unexpected state for node: " + ignite.name(), expCnt, nodes.size());
for (ClusterNode node : nodes) {
UUID id = node.id();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
new file mode 100644
index 0000000..7533a2c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.*;
+import org.apache.ignite.internal.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectTestSuite extends TestSuite {
+ /**
+ * @return Test suite.
+ * @throws Exception In case of error.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("Ignite Client Reconnect Test Suite");
+
+ suite.addTestSuite(IgniteClientReconnectApiBlockTest.class);
+ suite.addTestSuite(IgniteClientReconnectDiscoveryStateTest.class);
+ suite.addTestSuite(IgniteClientReconnectCacheTest.class);
+
+ return suite;
+ }
+}
[2/2] incubator-ignite git commit: # ignite-901 client reconnect WIP
Posted by sb...@apache.org.
# ignite-901 client reconnect WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f5f3efd1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f5f3efd1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f5f3efd1
Branch: refs/heads/ignite-901
Commit: f5f3efd164ae0b67917bbbbf1b856b2d24e72217
Parents: 6e23608
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jun 29 16:01:17 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 1 18:10:57 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/GridComponent.java | 10 +
.../ignite/internal/GridKernalContextImpl.java | 12 +-
.../ignite/internal/GridKernalGateway.java | 43 +-
.../ignite/internal/GridKernalGatewayImpl.java | 115 ++--
.../apache/ignite/internal/GridKernalState.java | 3 +
.../ignite/internal/GridPluginComponent.java | 10 +
.../IgniteDisconnectedCheckedException.java | 32 ++
.../apache/ignite/internal/IgniteKernal.java | 103 +++-
.../ignite/internal/MarshallerContextImpl.java | 11 +-
.../internal/managers/GridManagerAdapter.java | 10 +
.../deployment/GridDeploymentManager.java | 88 ++-
.../discovery/GridDiscoveryManager.java | 31 +-
.../processors/GridProcessorAdapter.java | 10 +
.../processors/cache/GridCacheAdapter.java | 284 ++++++----
.../cache/GridCacheDeploymentManager.java | 5 +
.../processors/cache/GridCacheGateway.java | 128 ++++-
.../processors/cache/GridCacheIoManager.java | 9 +-
.../processors/cache/GridCacheMvccManager.java | 10 +-
.../GridCachePartitionExchangeManager.java | 29 +-
.../processors/cache/GridCacheProcessor.java | 50 +-
.../cache/GridCacheSharedContext.java | 55 +-
.../cache/GridCacheSharedManager.java | 7 +-
.../cache/GridCacheSharedManagerAdapter.java | 14 +-
.../GridDhtPartitionsExchangeFuture.java | 4 +
.../dht/preloader/GridDhtPreloader.java | 7 -
.../distributed/near/GridNearCacheAdapter.java | 5 +
.../processors/task/GridTaskProcessor.java | 2 +-
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 9 +-
.../communication/tcp/TcpCommunicationSpi.java | 56 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 169 ++++--
.../ignite/spi/discovery/tcp/ServerImpl.java | 4 +-
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 2 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +-
.../tcp/internal/TcpDiscoveryNode.java | 17 +
.../spi/swapspace/file/FileSwapSpaceSpi.java | 2 +-
.../internal/GridUpdateNotifierSelfTest.java | 14 +-
.../IgniteClientReconnectAbstractTest.java | 143 +++++
.../IgniteClientReconnectApiBlockTest.java | 157 ++++++
.../IgniteClientReconnectCacheTest.java | 562 +++++++++++++++++++
...IgniteClientReconnectDiscoveryStateTest.java | 110 ++++
.../GridCacheReplicatedInvalidateSelfTest.java | 3 +-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 148 ++++-
.../IgniteClientReconnectTestSuite.java | 40 ++
43 files changed, 2162 insertions(+), 363 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index fb227cd..5b3b0c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -116,4 +116,14 @@ public interface GridComponent {
* @return Unique component type for discovery data exchange.
*/
@Nullable public DiscoveryDataExchangeType discoveryDataType();
+
+ /**
+ *
+ */
+ public void onDisconnected() throws IgniteCheckedException;
+
+ /**
+ *
+ */
+ public void onReconnected() throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 65107a7..581c891 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal;
import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.managers.checkpoint.*;
import org.apache.ignite.internal.managers.collision.*;
@@ -501,9 +502,18 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
return ((IgniteKernal)grid).isStopping();
}
+ /** */
+ private ClusterNode locNode;
+
/** {@inheritDoc} */
@Override public UUID localNodeId() {
- return cfg.getNodeId();
+ if (locNode != null)
+ return locNode.id();
+
+ if (discoMgr != null)
+ locNode = discoMgr.localNode();
+
+ return locNode != null ? locNode.id() : config().getNodeId();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
index 0156136..20d81de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;
/**
@@ -39,22 +40,6 @@ import org.apache.ignite.internal.util.tostring.*;
@GridToStringExclude
public interface GridKernalGateway {
/**
- * Performs light-weight check on the kernal state at the moment of this call.
- * <p>
- * This method should only be used when the kernal state should be checked just once
- * at the beginning of the method and the fact that <b>kernal state can change in the middle
- * of such method's execution</b> should not matter.
- * <p>
- * For example, when a method returns a constant value its implementation doesn't depend
- * on the kernal being valid throughout its execution. In such case it is enough to check
- * the kernal's state just once at the beginning of this method to provide consistent behavior
- * of the API without incurring overhead of <code>lock-based</code> guard methods.
- *
- * @throws IllegalStateException Thrown in case when no kernal calls are allowed.
- */
- public void lightCheck() throws IllegalStateException;
-
- /**
* Should be called on entering every kernal related call
* <b>originated directly or indirectly via public API</b>.
* <p>
@@ -113,31 +98,27 @@ public interface GridKernalGateway {
public void writeUnlock();
/**
- * Adds stop listener. Note that the identity set will be used to store listeners for
- * performance reasons. Futures can register a listener to be notified when they need to
- * be internally interrupted.
+ * Gets user stack trace through the first call of grid public API.
*
- * @param lsnr Listener to add.
+ * @return User stack trace.
*/
- public void addStopListener(Runnable lsnr);
+ public String userStackTrace();
/**
- * Removes previously added stop listener.
- *
- * @param lsnr Listener to remove.
+ * @param timeout Timeout.
+ * @return {@code True} if write lock has been acquired.
+ * @throws InterruptedException If interrupted.
*/
- public void removeStopListener(Runnable lsnr);
+ public boolean tryWriteLock(long timeout) throws InterruptedException;
/**
- * Gets user stack trace through the first call of grid public API.
+ * Disconnected callback.
*/
- public String userStackTrace();
+ public void onDisconnected();
/**
- * @param timeout Timeout.
- * @return {@code True} if write lock has been acquired.
- * @throws InterruptedException If interrupted.
+ * Reconnected callback.
*/
- public boolean tryWriteLock(long timeout) throws InterruptedException;
+ public void onReconnected();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
index 35bbbed..ef894cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
@@ -17,12 +17,13 @@
package org.apache.ignite.internal;
+import org.apache.ignite.*;
import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import java.io.*;
-import java.util.*;
import java.util.concurrent.*;
/**
@@ -38,10 +39,6 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
private final GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock();
/** */
- @GridToStringExclude
- private final Collection<Runnable> lsnrs = new GridSetWrapper<>(new IdentityHashMap<Runnable, Object>());
-
- /** */
private volatile GridKernalState state = GridKernalState.STOPPED;
/** */
@@ -63,12 +60,6 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
}
/** {@inheritDoc} */
- @Override public void lightCheck() throws IllegalStateException {
- if (state != GridKernalState.STARTED)
- throw illegalState();
- }
-
- /** {@inheritDoc} */
@SuppressWarnings({"LockAcquiredButNotSafelyReleased", "BusyWait"})
@Override public void readLock() throws IllegalStateException {
if (stackTrace == null)
@@ -76,12 +67,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
rwLock.readLock();
- if (state != GridKernalState.STARTED) {
- // Unlock just acquired lock.
- rwLock.readUnlock();
-
- throw illegalState();
- }
+ checkState(true);
}
/** {@inheritDoc} */
@@ -90,6 +76,8 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
stackTrace = stackTrace();
rwLock.readLock();
+
+ checkState(false);
}
/** {@inheritDoc} */
@@ -137,6 +125,36 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
return false;
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected() {
+ rwLock.readLock();
+
+ try {
+ if (state == GridKernalState.STARTED)
+ state = GridKernalState.DISCONNECTED;
+ }
+ finally {
+ rwLock.readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected() {
+ rwLock.writeLock();
+
+ try {
+ if (state == GridKernalState.DISCONNECTED)
+ state = GridKernalState.STARTED;
+ }
+ finally {
+ rwLock.writeUnlock();
+ }
+
+ synchronized (this) {
+ notifyAll();
+ }
+ }
+
/**
* Retrieves user stack trace.
*
@@ -173,16 +191,8 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
// NOTE: this method should always be called within write lock.
this.state = state;
- if (state == GridKernalState.STOPPING) {
- Runnable[] runs;
-
- synchronized (lsnrs) {
- lsnrs.toArray(runs = new Runnable[lsnrs.size()]);
- }
-
- // In the same thread.
- for (Runnable r : runs)
- r.run();
+ synchronized (this) {
+ notifyAll();
}
}
@@ -192,33 +202,42 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
}
/** {@inheritDoc} */
- @Override public void addStopListener(Runnable lsnr) {
- assert lsnr != null;
-
- if (state == GridKernalState.STARTING || state == GridKernalState.STARTED)
- synchronized (lsnrs) {
- lsnrs.add(lsnr);
- }
- else
- // Call right away in the same thread.
- lsnr.run();
+ @Override public String userStackTrace() {
+ return stackTrace;
}
- /** {@inheritDoc} */
- @Override public void removeStopListener(Runnable lsnr) {
- assert lsnr != null;
-
- synchronized (lsnrs) {
- lsnrs.remove(lsnr);
+ /**
+ * @param err If {@code true} throws {@link IllegalStateException} if not started.
+ */
+ private void checkState(boolean err) {
+ if (state != GridKernalState.STARTED) {
+ do {
+ if (state == GridKernalState.DISCONNECTED) {
+ rwLock.readUnlock();
+
+ try {
+ synchronized (this) {
+ while (state == GridKernalState.DISCONNECTED)
+ this.wait();
+ }
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException(e);
+ }
+
+ rwLock.readLock();
+ }
+ else if (err) {
+ rwLock.readUnlock();
+
+ throw illegalState();
+ }
+ }
+ while (state != GridKernalState.STARTED);
}
}
/** {@inheritDoc} */
- @Override public String userStackTrace() {
- return stackTrace;
- }
-
- /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridKernalGatewayImpl.class, this);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java
index fbb8f45..7d63578 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java
@@ -32,6 +32,9 @@ public enum GridKernalState {
/** Kernal is stopping. */
STOPPING,
+ /** Kernal is disconnected. */
+ DISCONNECTED,
+
/** Kernal is stopped.
* <p>
* This is also the initial state of the kernal.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
index b438bc1..709ce3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
@@ -64,6 +64,16 @@ public class GridPluginComponent implements GridComponent {
}
/** {@inheritDoc} */
+ @Override public void onDisconnected() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
plugin.onIgniteStop(cancel);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java
new file mode 100644
index 0000000..0684356
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+
+/**
+ *
+ */
+public class IgniteDisconnectedCheckedException extends IgniteCheckedException {
+ /**
+ * @param msg Message.
+ */
+ public IgniteDisconnectedCheckedException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index e19d3d3..821a1f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.session.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -439,7 +440,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
assert cfg != null;
return F.transform(cfg.getUserAttributes().entrySet(), new C1<Map.Entry<String, ?>, String>() {
- @Override public String apply(Map.Entry<String, ?> e) {
+ @Override
+ public String apply(Map.Entry<String, ?> e) {
return e.getKey() + ", " + e.getValue().toString();
}
});
@@ -2800,6 +2802,105 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/**
+ *
+ */
+ public void disconnected() {
+ ctx.gateway().onDisconnected();
+
+ try {
+ for (GridComponent comp : ctx.components())
+ comp.onDisconnected();
+ }
+ catch (IgniteCheckedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void stopOnDisconnect() {
+ GridCacheProcessor cacheProcessor = ctx.cache();
+
+ List<GridComponent> comps = ctx.components();
+
+ // Callback component in reverse order while kernal is still functional
+ // if called in the same thread, at least.
+ for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) {
+ GridComponent comp = it.previous();
+
+ try {
+ if (!skipDaemon(comp) && (!(comp instanceof GridManager)))
+ comp.onKernalStop(true);
+ }
+ catch (Throwable e) {
+ errOnStop = true;
+
+ U.error(log, "Failed to pre-stop processor: " + comp, e);
+
+ if (e instanceof Error)
+ throw e;
+ }
+ }
+
+ if (cacheProcessor != null)
+ cacheProcessor.cancelUserOperations();
+
+ for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) {
+ GridComponent comp = it.previous();
+
+ try {
+ if (!skipDaemon(comp) && (!(comp instanceof GridManager))) {
+ comp.stop(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Component stopped: " + comp);
+ }
+ }
+ catch (Throwable e) {
+ errOnStop = true;
+
+ U.error(log, "Failed to stop component (ignoring): " + comp, e);
+
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ }
+
+ ctx.marshallerContext().onDisconnected();
+ }
+
+ private void restart() throws IgniteCheckedException {
+ List<PluginProvider> plugins = U.allPluginProviders();
+
+ startProcessor(new ClusterProcessor(ctx));
+
+ GridResourceProcessor rsrcProc = new GridResourceProcessor(ctx);
+
+ rsrcProc.setSpringContext(rsrcCtx);
+
+ scheduler = new IgniteSchedulerImpl(ctx);
+
+ startProcessor(rsrcProc);
+ }
+
+ /**
+ *
+ */
+ public void reconnected() {
+ new Thread() {
+ public void run() {
+ try {
+ ctx.gateway().onReconnected();
+
+ for (GridComponent comp : ctx.components())
+ comp.onReconnected();
+ }
+ catch (IgniteCheckedException e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+ }
+
+ /**
* Creates optional component.
*
* @param cls Component interface.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 9f7c983..948babc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -32,7 +32,7 @@ import java.util.concurrent.*;
*/
public class MarshallerContextImpl extends MarshallerContextAdapter {
/** */
- private final CountDownLatch latch = new CountDownLatch(1);
+ private CountDownLatch latch = new CountDownLatch(1);
/** */
private final File workDir;
@@ -57,6 +57,15 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
}
/**
+ *
+ */
+ public void onDisconnected() {
+ latch = new CountDownLatch(1);
+
+ cache = null;
+ }
+
+ /**
* @param ctx Kernal context.
* @throws IgniteCheckedException In case of error.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 40a5ea5..d886f4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -166,6 +166,16 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
// No-op.
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected() throws IgniteCheckedException {
+ // No-op.
+ }
+
/**
* Starts wrapped SPI.
*
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
index 75fe98f..b82090f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
@@ -110,20 +110,24 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
}
/** {@inheritDoc} */
- @Override public void stop(boolean cancel) throws IgniteCheckedException {
- GridProtocolHandler.deregisterDeploymentManager();
+ @Override public void onDisconnected() throws IgniteCheckedException {
+ storesOnKernalStop();
- if (verStore != null)
- verStore.stop();
+ storesStop();
- if (ldrStore != null)
- ldrStore.stop();
+ startStores();
+ }
- if (locStore != null)
- locStore.stop();
+ /** {@inheritDoc} */
+ @Override public void onReconnected() throws IgniteCheckedException {
+ storesOnKernalStart();
+ }
- if (comm != null)
- comm.stop();
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ GridProtocolHandler.deregisterDeploymentManager();
+
+ storesStop();
getSpi().setListener(null);
@@ -135,21 +139,12 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
/** {@inheritDoc} */
@Override public void onKernalStart0() throws IgniteCheckedException {
- locStore.onKernalStart();
- ldrStore.onKernalStart();
- verStore.onKernalStart();
+ storesOnKernalStart();
}
/** {@inheritDoc} */
@Override public void onKernalStop0(boolean cancel) {
- if (verStore != null)
- verStore.onKernalStop();
-
- if (ldrStore != null)
- ldrStore.onKernalStop();
-
- if (locStore != null)
- locStore.onKernalStop();
+ storesOnKernalStop();
}
/** {@inheritDoc} */
@@ -547,6 +542,57 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
return ldr instanceof GridDeploymentClassLoader;
}
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void startStores() throws IgniteCheckedException {
+ locStore = new GridDeploymentLocalStore(getSpi(), ctx, comm);
+ ldrStore = new GridDeploymentPerLoaderStore(getSpi(), ctx, comm);
+ verStore = new GridDeploymentPerVersionStore(getSpi(), ctx, comm);
+
+ locStore.start();
+ ldrStore.start();
+ verStore.start();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void storesOnKernalStart() throws IgniteCheckedException {
+ locStore.onKernalStart();
+ ldrStore.onKernalStart();
+ verStore.onKernalStart();
+ }
+
+ /**
+ *
+ */
+ private void storesOnKernalStop() {
+ if (verStore != null)
+ verStore.onKernalStop();
+
+ if (ldrStore != null)
+ ldrStore.onKernalStop();
+
+ if (locStore != null)
+ locStore.onKernalStop();
+ }
+
+ /**
+ *
+ */
+ private void storesStop() {
+ if (verStore != null)
+ verStore.stop();
+
+ if (ldrStore != null)
+ ldrStore.stop();
+
+ if (locStore != null)
+ locStore.stop();
+ }
+
/**
*
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 1e4b972..7a524a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -287,6 +287,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/** {@inheritDoc} */
+ @Override public void onDisconnected() throws IgniteCheckedException {
+ locJoinEvt = new GridFutureAdapter<>();
+ }
+
+ /** {@inheritDoc} */
@Override protected void onKernalStart0() throws IgniteCheckedException {
if (Boolean.TRUE.equals(ctx.config().isClientMode()) && !getSpi().isClientMode())
ctx.performance().add("Enable client mode for TcpDiscoverySpi " +
@@ -386,7 +391,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
verChanged = false;
}
else {
- if (type != EVT_NODE_SEGMENTED) {
+ if (type != EVT_NODE_SEGMENTED &&
+ type != EVT_CLIENT_NODE_DISCONNECTED &&
+ type != EVT_CLIENT_NODE_RECONNECTED) {
minorTopVer = 0;
verChanged = true;
@@ -1693,6 +1700,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
else if (type == EVT_NODE_SEGMENTED)
evt.message("Node segmented: " + node);
+ else if (type == EVT_CLIENT_NODE_DISCONNECTED)
+ evt.message("Client node disconnected: " + node);
+
+ else if (type == EVT_CLIENT_NODE_RECONNECTED)
+ evt.message("Client node reconnected: " + node);
+
else
assert false;
@@ -1862,6 +1875,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
break;
}
+ case EVT_CLIENT_NODE_DISCONNECTED: {
+ assert localNode().isClient() : evt;
+
+ ((IgniteKernal)ctx.grid()).disconnected();
+
+ break;
+ }
+
+ case EVT_CLIENT_NODE_RECONNECTED: {
+ assert localNode().isClient() : evt;
+
+ ((IgniteKernal)ctx.grid()).reconnected();
+
+ break;
+ }
+
case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
if (ctx.event().isRecordable(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT)) {
DiscoveryCustomEvent customEvt = new DiscoveryCustomEvent();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
index a84c48a..04a39d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
@@ -62,6 +62,16 @@ public abstract class GridProcessorAdapter implements GridProcessor {
}
/** {@inheritDoc} */
+ @Override public void onDisconnected() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 7335d72..d754e3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -906,12 +906,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public Set<K> keySet() {
- return keySet((CacheEntryPredicate[])null);
+ return keySet((CacheEntryPredicate[]) null);
}
/** {@inheritDoc} */
@Override public Set<K> keySetx() {
- return keySetx((CacheEntryPredicate[])null);
+ return keySetx((CacheEntryPredicate[]) null);
}
/** {@inheritDoc} */
@@ -1217,7 +1217,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null,
taskName, true, false).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
- @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
+ @Override
+ public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
return e.get().get(key);
}
});
@@ -1259,11 +1260,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
String taskName,
final IgniteBiInClosure<KeyCacheObject, Object> vis) {
return ctx.closures().callLocalSafe(new GPC<Object>() {
- @Nullable @Override public Object call() {
+ @Nullable
+ @Override
+ public Object call() {
try {
ctx.store().loadAll(tx, keys, vis);
- }
- catch (IgniteCheckedException e) {
+ } catch (IgniteCheckedException e) {
throw new GridClosureException(e);
}
@@ -1465,8 +1467,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (ctx.config().getInterceptor() != null)
fut = fut.chain(new CX1<IgniteInternalFuture<V>, V>() {
- @Override public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException {
- return (V)ctx.config().getInterceptor().onGet(key, f.get());
+ @Override
+ public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException {
+ return (V) ctx.config().getInterceptor().onGet(key, f.get());
}
});
@@ -1978,12 +1981,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return asyncOp(new AsyncOp<V>() {
- @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, filter)
- .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
+ .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putAsync [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']';
}
});
@@ -2041,11 +2046,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
syncOp(new SyncInOp(drMap.size() == 1) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.putAllDrAsync(ctx, drMap).get();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putAllConflict [drMap=" + drMap + ']';
}
});
@@ -2060,11 +2067,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
return asyncOp(new AsyncInOp(drMap.keySet()) {
- @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
return tx.putAllDrAsync(ctx, drMap);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putAllConflictAsync [drMap=" + drMap + ']';
}
});
@@ -2081,7 +2090,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
- @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx)
+ @Nullable
+ @Override
+ public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx)
throws IgniteCheckedException {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor);
@@ -2113,11 +2124,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) {
- @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+ @Nullable
+ @Override
+ public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
throws IgniteCheckedException {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys,
new C1<K, EntryProcessor<K, V, Object>>() {
- @Override public EntryProcessor apply(K k) {
+ @Override
+ public EntryProcessor apply(K k) {
return entryProcessor;
}
});
@@ -2158,7 +2172,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<GridCacheReturn> fut0 = (IgniteInternalFuture<GridCacheReturn>)fut;
return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, EntryProcessorResult<T>>() {
- @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<GridCacheReturn> fut)
+ @Override
+ public EntryProcessorResult<T> applyx(IgniteInternalFuture<GridCacheReturn> fut)
throws IgniteCheckedException {
GridCacheReturn ret = fut.get();
@@ -2188,7 +2203,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp(keys) {
@Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() {
- @Override public EntryProcessor apply(K k) {
+ @Override
+ public EntryProcessor apply(K k) {
return entryProcessor;
}
});
@@ -2205,7 +2221,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
(IgniteInternalFuture<GridCacheReturn>)fut;
return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Map<K, EntryProcessorResult<T>>>() {
- @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
+ @Override
+ public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
throws IgniteCheckedException {
GridCacheReturn ret = fut.get();
@@ -2238,7 +2255,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<GridCacheReturn> fut0 = (IgniteInternalFuture<GridCacheReturn>)fut;
return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Map<K, EntryProcessorResult<T>>>() {
- @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
+ @Override
+ public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
throws IgniteCheckedException {
GridCacheReturn ret = fut.get();
@@ -2259,10 +2277,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(map.keySet());
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(map.size() == 1) {
- @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+ @Nullable
+ @Override
+ public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
throws IgniteCheckedException {
IgniteInternalFuture<GridCacheReturn> fut =
- tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args);
+ tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>) map, args);
return fut.get().value();
}
@@ -2310,12 +2330,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, filter).chain(
(IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putxAsync [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']';
}
});
@@ -2345,11 +2367,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return syncOp(new SyncOp<V>(true) {
- @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- return (V)tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value();
+ @Override
+ public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ return (V) tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putIfAbsent [key=" + key + ", val=" + val + ']';
}
});
@@ -2369,12 +2393,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
- @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray())
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putIfAbsentAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -2399,11 +2425,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
Boolean stored = syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).get().success();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putxIfAbsent [key=" + key + ", val=" + val + ']';
}
});
@@ -2428,12 +2456,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
+ (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putxIfAbsentAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -2504,11 +2534,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).get().success();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "replacex [key=" + key + ", val=" + val + ']';
}
});
@@ -2524,12 +2556,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain(
(IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "replacexAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -2547,7 +2581,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(newVal);
return syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
@@ -2556,7 +2591,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
.success();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "replace [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']';
}
});
@@ -2619,11 +2655,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValues(m.values());
syncOp(new SyncInOp(m.size() == 1) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.putAllAsync(ctx, m, false, null, -1, CU.empty0()).get();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putAll [map=" + m + ']';
}
});
@@ -2665,16 +2703,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
V prevVal = syncOp(new SyncOp<V>(true) {
- @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
V ret = tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, CU.empty0()).get().value();
if (ctx.config().getInterceptor() != null)
- return (V)ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2();
+ return (V) ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2();
return ret;
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "remove [key=" + key + ']';
}
});
@@ -2697,13 +2737,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
- @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
// TODO should we invoke interceptor here?
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, CU.empty0())
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAsync [key=" + key + ']';
}
});
@@ -2745,11 +2787,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
syncOp(new SyncInOp(keys.size() == 1) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.removeAllAsync(ctx, keys, null, false, CU.empty0()).get();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAll [keys=" + keys + ']';
}
});
@@ -2771,11 +2815,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
IgniteInternalFuture<Object> fut = asyncOp(new AsyncInOp(keys) {
- @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
return tx.removeAllAsync(ctx, keys, null, false, CU.empty0()).chain(RET2NULL);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAllAsync [keys=" + keys + ']';
}
});
@@ -2798,11 +2844,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
boolean rmv = syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, CU.empty0()).get().success();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removex [key=" + key + ']';
}
});
@@ -2836,12 +2884,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, filter).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
+ (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAsync [key=" + key + ", filter=" + Arrays.toString(filter) + ']';
}
});
@@ -2860,19 +2910,21 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<GridCacheReturn>(true) {
- @Override public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(val);
- return (GridCacheReturn)tx.removeAllAsync(ctx,
+ return (GridCacheReturn) tx.removeAllAsync(ctx,
Collections.singletonList(key),
null,
true,
ctx.equalsValArray(val)).get();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "remove [key=" + key + ", val=" + val + ']';
}
});
@@ -2887,11 +2939,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
syncOp(new SyncInOp(false) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.removeAllDrAsync(ctx, drMap).get();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAllConflict [drMap=" + drMap + ']';
}
});
@@ -2906,11 +2960,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
return asyncOp(new AsyncInOp(drMap.keySet()) {
- @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
return tx.removeAllDrAsync(ctx, drMap);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAllDrASync [drMap=" + drMap + ']';
}
});
@@ -2926,20 +2982,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<GridCacheReturn>(true) {
- @Override public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
return (GridCacheReturn) tx.putAllAsync(ctx,
- F.t(key, newVal),
- true,
- null,
- -1,
- ctx.equalsValArray(oldVal)).get();
+ F.t(key, newVal),
+ true,
+ null,
+ -1,
+ ctx.equalsValArray(oldVal)).get();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "replace [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']';
}
});
@@ -2953,17 +3011,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return asyncOp(new AsyncOp<GridCacheReturn>() {
- @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
// Register before hiding in the filter.
try {
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(val);
- }
- catch (IgniteCheckedException e) {
+ } catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
- IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture)tx.removeAllAsync(ctx,
+ IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture) tx.removeAllAsync(ctx,
Collections.singletonList(key),
null,
true,
@@ -2972,7 +3030,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return fut;
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -2987,17 +3046,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return asyncOp(new AsyncOp<GridCacheReturn>() {
- @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
// Register before hiding in the filter.
try {
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
- }
- catch (IgniteCheckedException e) {
+ } catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
- IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture)tx.putAllAsync(ctx,
+ IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture) tx.putAllAsync(ctx,
F.t(key, newVal),
true,
null,
@@ -3007,7 +3066,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return fut;
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "replaceAsync [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']';
}
});
@@ -3027,16 +3087,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
boolean rmv = syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(val);
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false,
- ctx.equalsValArray(val)).get().success();
+ ctx.equalsValArray(val)).get().success();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "remove [key=" + key + ", val=" + val + ']';
}
});
@@ -3061,23 +3123,24 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
// Register before hiding in the filter.
if (ctx.deploymentEnabled()) {
try {
ctx.deploy().registerClass(val);
- }
- catch (IgniteCheckedException e) {
+ } catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
}
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false,
ctx.equalsValArray(val)).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
+ (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -3247,10 +3310,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
TransactionConfiguration cfg = ctx.gridConfig().getTransactionConfiguration();
return txStart(
- concurrency,
- isolation,
- cfg.getDefaultTxTimeout(),
- 0
+ concurrency,
+ isolation,
+ cfg.getDefaultTxTimeout(),
+ 0
);
}
@@ -3686,19 +3749,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return F.iterator(iterator(),
new IgniteClosure<Cache.Entry<K, V>, Cache.Entry<K, V>>() {
private IgniteCacheExpiryPolicy expiryPlc =
- ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
+ ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
- @Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) {
+ @Override
+ public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) {
CacheOperationContext prev = ctx.gate().enter(opCtx);
try {
V val = localPeek(lazyEntry.getKey(), CachePeekModes.ONHEAP_ONLY, expiryPlc);
return new CacheEntryImpl<>(lazyEntry.getKey(), val);
- }
- catch (IgniteCheckedException e) {
+ } catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
- }
- finally {
+ } finally {
ctx.gate().leave(prev);
}
}
@@ -3722,20 +3784,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
.execute();
return ctx.itHolder().iterator(fut, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() {
- @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) {
+ @Override
+ protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) {
return new CacheEntryImpl<>(e.getKey(), e.getValue());
}
- @Override protected void remove(Cache.Entry<K, V> item) {
+ @Override
+ protected void remove(Cache.Entry<K, V> item) {
CacheOperationContext prev = ctx.gate().enter(opCtx);
try {
GridCacheAdapter.this.remove(item.getKey());
- }
- catch (IgniteCheckedException e) {
+ } catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
- }
- finally {
+ } finally {
ctx.gate().leave(prev);
}
}
@@ -4380,7 +4442,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return getAllAsync(Collections.singletonList(key), deserializePortable).chain(
new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
- @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
+ @Override
+ public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
Map<K, V> map = e.get();
assert map.isEmpty() || map.size() == 1 : map.size();
@@ -4428,6 +4491,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
public abstract void onDeferredDelete(GridCacheEntryEx entry, GridCacheVersion ver);
/**
+ *
+ */
+ public void disconnected() {
+ // No-op.
+ }
+
+ /**
* Validates that given cache value implements {@link Externalizable}.
*
* @param val Cache value.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
index ff109ed..475a6e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
@@ -116,6 +116,11 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
}
/** {@inheritDoc} */
+ @Override public boolean restartOnDisconnect() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override protected void stop0(boolean cancel) {
if (discoLsnr != null)
cctx.gridEvents().removeLocalEventListener(discoLsnr);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index d9d151c..d63e818 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -33,7 +33,7 @@ public class GridCacheGateway<K, V> {
private final GridCacheContext<K, V> ctx;
/** Stopped flag for dynamic caches. */
- private volatile boolean stopped;
+ private volatile State state = State.STARTED;
/** */
private GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock();
@@ -56,11 +56,46 @@ public class GridCacheGateway<K, V> {
rwLock.readLock();
- if (stopped) {
- rwLock.readUnlock();
+ checkState(true, true);
+ }
- throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name());
+ /**
+ *
+ */
+ private boolean checkState(boolean lock, boolean err) {
+ if (state != State.STARTED) {
+ do {
+ if (state == State.STOPPED) {
+ if (lock)
+ rwLock.readUnlock();
+
+ if (err)
+ throw new IllegalStateException("Cache has been stopped: " + ctx.name());
+ else
+ return false;
+ }
+ else {
+ if (lock)
+ rwLock.readUnlock();
+
+ try {
+ synchronized (this) {
+ while (state == State.DISCONNECTED)
+ wait();
+ }
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException(e);
+ }
+
+ if (lock)
+ rwLock.readLock();
+ }
+ }
+ while (state != State.STARTED);
}
+
+ return true;
}
/**
@@ -71,17 +106,11 @@ public class GridCacheGateway<K, V> {
public boolean enterIfNotClosed() {
onEnter();
- // Must unlock in case of unexpected errors to avoid
- // deadlocks during kernal stop.
+ // Must unlock in case of unexpected errors to avoid deadlocks during kernal stop.
rwLock.readLock();
- if (stopped) {
- rwLock.readUnlock();
+ return checkState(true, false);
- return false;
- }
-
- return true;
}
/**
@@ -92,7 +121,7 @@ public class GridCacheGateway<K, V> {
public boolean enterIfNotClosedNoLock() {
onEnter();
- return !stopped;
+ return checkState(false, false);
}
/**
@@ -144,11 +173,7 @@ public class GridCacheGateway<K, V> {
rwLock.readLock();
- if (stopped) {
- rwLock.readUnlock();
-
- throw new IllegalStateException("Cache has been stopped: " + ctx.name());
- }
+ checkState(true, true);
// Must unlock in case of unexpected errors to avoid
// deadlocks during kernal stop.
@@ -169,8 +194,7 @@ public class GridCacheGateway<K, V> {
@Nullable public CacheOperationContext enterNoLock(@Nullable CacheOperationContext opCtx) {
onEnter();
- if (stopped)
- throw new IllegalStateException("Cache has been stopped: " + ctx.name());
+ checkState(false, false);
return setOperationContextPerCall(opCtx);
}
@@ -229,8 +253,48 @@ public class GridCacheGateway<K, V> {
/**
*
*/
- public void block() {
- stopped = true;
+ public void stopped() {
+ state = State.STOPPED;
+
+ synchronized (this) {
+ notifyAll();
+ }
+ }
+
+ /**
+ *
+ */
+ public void onDisconnected() {
+ if (state == State.STARTED)
+ state = State.DISCONNECTED;
+ }
+
+ /**
+ *
+ */
+ public void waitOperations() {
+ rwLock.writeLock();
+
+ rwLock.writeUnlock();
+ }
+
+ /**
+ * @param stopped Cache stopped flag.
+ */
+ public void reconnected(boolean stopped) {
+ rwLock.writeLock();
+
+ try {
+ if (state == State.DISCONNECTED)
+ state = stopped ? State.STOPPED : State.STARTED;
+ }
+ finally {
+ rwLock.writeUnlock();
+ }
+
+ synchronized (this) {
+ notifyAll();
+ }
}
/**
@@ -257,10 +321,28 @@ public class GridCacheGateway<K, V> {
try {
// No-op.
- stopped = true;
+ state = State.STOPPED;
}
finally {
rwLock.writeUnlock();
}
+
+ synchronized (this) {
+ notifyAll();
+ }
+ }
+
+ /**
+ *
+ */
+ private enum State {
+ /** */
+ STARTED,
+
+ /** */
+ DISCONNECTED,
+
+ /** */
+ STOPPED
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 74a4512..48a16d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -170,7 +170,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
/** {@inheritDoc} */
@SuppressWarnings("BusyWait")
- @Override protected void onKernalStop0(boolean cancel) {
+ @Override protected void onKernalStop0(boolean cancel, boolean disconnected) {
cctx.gridIO().removeMessageListener(TOPIC_CACHE);
for (Object ordTopic : orderedHandlers.keySet())
@@ -891,6 +891,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param cacheId Cache ID to remove handlers for.
+ */
+ public void removeHandler(int cacheId, Class<? extends GridCacheMessage> type) {
+ clsHandlers.remove(new ListenerKey(cacheId, type));
+ }
+
+ /**
* @param msgCls Message class to check.
* @return Message index.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index c528e08..e2d22dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -216,7 +216,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/** {@inheritDoc} */
- @Override public void onKernalStop0(boolean cancel) {
+ @Override public void onKernalStop0(boolean cancel, boolean disconnected) {
cctx.gridEvents().removeLocalEventListener(discoLsnr);
}
@@ -293,9 +293,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
/**
* Cancels all client futures.
+ *
+ * @param stop If {@code true} node is stopping, otherwise disconnected.
*/
- public void cancelClientFutures() {
- IgniteCheckedException e = new IgniteCheckedException("Operation has been cancelled (grid is stopping).");
+ public void cancelClientFutures(boolean stop) {
+ IgniteCheckedException e = stop ?
+ new IgniteCheckedException("Operation has been cancelled (node is stopping).") :
+ new IgniteCheckedException("Operation has been cancelled (node disconnected).");
for (Collection<GridCacheFuture<?>> futures : futs.values()) {
for (GridCacheFuture<?> future : futures)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index af87685..f0c9b3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -207,6 +207,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
};
/** {@inheritDoc} */
+ @Override public boolean restartOnDisconnect() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
super.start0();
@@ -281,6 +286,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
break;
}
+ catch (IgniteDisconnectedCheckedException e) {
+ log.info("Disconnected while waiting for initial partition map exchange: " + e);
+
+ break;
+ }
catch (IgniteFutureTimeoutCheckedException ignored) {
if (first) {
U.warn(log, "Failed to wait for initial partition map exchange. " +
@@ -313,13 +323,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/** {@inheritDoc} */
- @Override protected void onKernalStop0(boolean cancel) {
+ @Override protected void onKernalStop0(boolean cancel, boolean disconnected) {
+ cctx.gridEvents().removeLocalEventListener(discoLsnr);
+
+ cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class);
+ cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class);
+ cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class);
+
+ IgniteCheckedException err = disconnected ?
+ new IgniteDisconnectedCheckedException("Node disconnected: " + cctx.gridName()) :
+ new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName());
+
// Finish all exchange futures.
for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
- f.onDone(new IgniteInterruptedCheckedException("Grid is stopping: " + cctx.gridName()));
+ f.onDone(err);
for (AffinityReadyFuture f : readyFuts.values())
- f.onDone(new IgniteInterruptedCheckedException("Grid is stopping: " + cctx.gridName()));
+ f.onDone(err);
U.cancel(exchWorker);
@@ -1099,6 +1119,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
catch (IgniteInterruptedCheckedException e) {
throw e;
}
+ catch (IgniteDisconnectedCheckedException e) {
+ return;
+ }
catch (IgniteCheckedException e) {
U.error(log, "Failed to wait for completion of partition map exchange " +
"(preloading will not start): " + exchFut, e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 2f7f22c..e11a221 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -341,8 +341,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
"Serializable transactions are disabled while default transaction isolation is SERIALIZABLE " +
"(most likely misconfiguration - either update 'isTxSerializableEnabled' or " +
"'defaultTxIsolationLevel' properties) for cache: " + U.maskName(cc.getName()),
- "Serializable transactions are disabled while default transaction isolation is SERIALIZABLE " +
- "for cache: " + U.maskName(cc.getName()));
+ "Serializable transactions are disabled while default transaction isolation is SERIALIZABLE " +
+ "for cache: " + U.maskName(cc.getName()));
if (cc.isWriteBehindEnabled()) {
if (cfgStore == null)
@@ -567,8 +567,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
- sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx,
- ctx.config().getCacheStoreSessionListenerFactories()));
+ sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(
+ ctx, ctx.config().getCacheStoreSessionListenerFactories()));
ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)",
!ctx.config().getTransactionConfiguration().isTxSerializableEnabled());
@@ -871,10 +871,42 @@ public class GridCacheProcessor extends GridProcessorAdapter {
it.hasPrevious();) {
GridCacheSharedManager<?, ?> mgr = it.previous();
- mgr.onKernalStop(cancel);
+ mgr.onKernalStop(cancel, false);
}
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected() throws IgniteCheckedException {
+ for (GridCacheAdapter cache : caches.values())
+ cache.context().gate().onDisconnected();
+
+ sharedCtx.mvcc().cancelClientFutures(false);
+
+ for (GridCacheAdapter cache : caches.values())
+ cache.disconnected();
+
+ registeredCaches.clear();
+
+ sharedCtx.onDisconnected();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected() throws IgniteCheckedException {
+ for (GridCacheAdapter cache : caches.values())
+ cache.context().gate().reconnected(false);
+
+ ctx.marshallerContext().onMarshallerCacheStarted(ctx);
+
+ marshallerCache().context().preloader().syncFuture().listen(new CIX1<IgniteInternalFuture<?>>() {
+ @Override public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException {
+ ctx.marshallerContext().onMarshallerCachePreloaded(ctx);
+ }
+ });
+
+ for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
+ mgr.onKernalStart();
+ }
+
/**
* @param cache Cache to start.
* @throws IgniteCheckedException If failed to start cache.
@@ -1487,7 +1519,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName()));
if (proxy != null)
- proxy.gate().block();
+ proxy.gate().stopped();
}
/**
@@ -1591,7 +1623,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Shared context.
*/
@SuppressWarnings("unchecked")
- private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx,
+ private GridCacheSharedContext createSharedContext(
+ GridKernalContext kernalCtx,
Collection<CacheStoreSessionListener> storeSesLsnrs) {
IgniteTxManager tm = new IgniteTxManager();
GridCacheMvccManager mvccMgr = new GridCacheMvccManager();
@@ -2796,8 +2829,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* Cancel all user operations.
*/
public void cancelUserOperations() {
- for (GridCacheAdapter<?, ?> cache : caches.values())
- cache.ctx.mvcc().cancelClientFutures();
+ sharedCtx.mvcc().cancelClientFutures(true);
}
/**