You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/09/07 22:46:50 UTC
[23/50] [abbrv] ignite git commit: # ignite-901 client reconnect
support
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 29e3551..84e4dc2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -893,6 +893,14 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param cacheId Cache ID to remove handlers for.
+ * @param type Message type.
+ */
+ public void removeHandler(int cacheId, Class<? extends GridCacheMessage> type) {
+ clsHandlers.remove(new ListenerKey(cacheId, type));
+ }
+
+ /**
* @param msgCls Message class to check.
* @return Message index.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManager.java
index 775daf5..ae7e9d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManager.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
/**
* Interface for cache managers.
@@ -49,6 +50,11 @@ public interface GridCacheManager<K, V> {
public void onKernalStop(boolean cancel);
/**
+ * @param reconnectFut Reconnect future.
+ */
+ public void onDisconnected(IgniteFuture<?> reconnectFut);
+
+ /**
* Prints memory statistics (sizes of internal data structures, etc.).
*
* NOTE: this method is for testing and profiling purposes only.
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
index 52fade8..54b1915 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import java.util.concurrent.atomic.*;
@@ -127,6 +128,11 @@ public class GridCacheManagerAdapter<K, V> implements GridCacheManager<K, V> {
}
/** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void printMemoryStats() {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index f24cf01..36e108f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -208,10 +208,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
exchLog = cctx.logger(getClass().getName() + ".exchange");
pendingExplicit = GridConcurrentFactory.newMap();
- }
- /** {@inheritDoc} */
- @Override public void onKernalStart0() throws IgniteCheckedException {
cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
}
@@ -295,15 +292,39 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* Cancels all client futures.
*/
public void cancelClientFutures() {
- IgniteCheckedException e = new IgniteCheckedException("Operation has been cancelled (grid is stopping).");
+ cancelClientFutures(new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture reconnectFut) {
+ IgniteClientDisconnectedCheckedException err = disconnectedError(reconnectFut);
+
+ cancelClientFutures(err);
+ }
+ /**
+ * @param err Error.
+ */
+ private void cancelClientFutures(IgniteCheckedException err) {
for (Collection<GridCacheFuture<?>> futures : futs.values()) {
for (GridCacheFuture<?> future : futures)
- ((GridFutureAdapter)future).onDone(e);
+ ((GridFutureAdapter)future).onDone(err);
}
for (GridCacheAtomicFuture<?> future : atomicFuts.values())
- ((GridFutureAdapter)future).onDone(e);
+ ((GridFutureAdapter)future).onDone(err);
+ }
+
+ /**
+ * @param reconnectFut Reconnect future.
+ * @return Client disconnected exception.
+ */
+ private IgniteClientDisconnectedCheckedException disconnectedError(@Nullable IgniteFuture<?> reconnectFut) {
+ if (reconnectFut == null)
+ reconnectFut = cctx.kernalContext().cluster().clientReconnectFuture();
+
+ return new IgniteClientDisconnectedCheckedException(reconnectFut,
+ "Operation has been cancelled (client node disconnected).");
}
/**
@@ -339,6 +360,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut);
assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']';
+
+ if (cctx.kernalContext().clientDisconnected())
+ ((GridFutureAdapter)fut).onDone(disconnectedError(null));
}
/**
@@ -459,7 +483,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
fut.onNodeLeft(n.id());
}
- // Just in case if future was complete before it was added.
+ if (cctx.kernalContext().clientDisconnected())
+ ((GridFutureAdapter)fut).onDone(disconnectedError(null));
+
+ // Just in case the future was completed before it was added.
if (fut.isDone())
removeFuture(fut);
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4398b4c..1f6a8bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -97,6 +97,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private final AtomicReference<AffinityTopologyVersion> readyTopVer =
new AtomicReference<>(AffinityTopologyVersion.NONE);
+ /** */
+ private GridFutureAdapter<?> reconnectExchangeFut;
+
/**
* Partition map futures.
* This set also contains already completed exchange futures to address race conditions when coordinator
@@ -237,9 +240,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
});
}
+ /**
+ * @return Reconnect partition exchange future.
+ */
+ public IgniteInternalFuture<?> reconnectExchangeFuture() {
+ return reconnectExchangeFut;
+ }
+
/** {@inheritDoc} */
- @Override protected void onKernalStart0() throws IgniteCheckedException {
- super.onKernalStart0();
+ @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
+ super.onKernalStart0(reconnect);
ClusterNode loc = cctx.localNode();
@@ -260,6 +270,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, null);
+ if (reconnect)
+ reconnectExchangeFut = new GridFutureAdapter<>();
+
new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start();
onDiscoveryEvent(cctx.localNodeId(), fut);
@@ -267,10 +280,30 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
// Allow discovery events to get processed.
locExchFut.onDone();
- if (log.isDebugEnabled())
- log.debug("Beginning to wait on local exchange future: " + fut);
+ if (reconnect) {
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ try {
+ fut.get();
+
+ for (GridCacheContext cacheCtx : cctx.cacheContexts())
+ cacheCtx.preloader().onInitialExchangeComplete(null);
+
+ reconnectExchangeFut.onDone();
+ }
+ catch (IgniteCheckedException e) {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts())
+ cacheCtx.preloader().onInitialExchangeComplete(e);
+
+ reconnectExchangeFut.onDone(e);
+ }
+ }
+ });
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Beginning to wait on local exchange future: " + fut);
- try {
boolean first = true;
while (true) {
@@ -296,28 +329,35 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
for (GridCacheContext cacheCtx : cctx.cacheContexts())
cacheCtx.preloader().onInitialExchangeComplete(null);
- }
- catch (IgniteFutureTimeoutCheckedException e) {
- IgniteCheckedException err = new IgniteCheckedException("Timed out waiting for exchange future: " + fut, e);
- for (GridCacheContext cacheCtx : cctx.cacheContexts())
- cacheCtx.preloader().onInitialExchangeComplete(err);
-
- throw err;
+ if (log.isDebugEnabled())
+ log.debug("Finished waiting for initial exchange: " + fut.exchangeId());
}
-
- if (log.isDebugEnabled())
- log.debug("Finished waiting on local exchange: " + fut.exchangeId());
}
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
+ cctx.gridEvents().removeLocalEventListener(discoLsnr);
+
+ cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class);
+ cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class);
+ cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class);
+
+ IgniteCheckedException err = cctx.kernalContext().clientDisconnected() ?
+ new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(),
+ "Client node disconnected: " + cctx.gridName()) :
+ new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName());
+
// Finish all exchange futures.
- for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
- f.onDone(new IgniteInterruptedCheckedException("Grid is stopping: " + cctx.gridName()));
+ ExchangeFutureSet exchFuts0 = exchFuts;
+
+ if (exchFuts0 != null) {
+ for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
+ f.onDone(err);
+ }
for (AffinityReadyFuture f : readyFuts.values())
- f.onDone(new IgniteInterruptedCheckedException("Grid is stopping: " + cctx.gridName()));
+ f.onDone(err);
U.cancel(exchWorker);
@@ -634,7 +674,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
node.id() + ", msg=" + m + ']');
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to send partitions full message [node=" + node + ']', e);
+ U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']');
}
}
@@ -1097,6 +1137,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
catch (IgniteInterruptedCheckedException e) {
throw e;
}
+ catch (IgniteClientDisconnectedCheckedException e) {
+ return;
+ }
catch (IgniteCheckedException e) {
U.error(log, "Failed to wait for completion of partition map exchange " +
"(preloading will not start): " + exchFut, e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index e0f6181..b8bb08e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -56,6 +56,11 @@ public interface GridCachePreloader {
public void onKernalStop();
/**
+ * Client reconnected callback.
+ */
+ public void onReconnected();
+
+ /**
* Callback by exchange manager when initial partition exchange is complete.
*
* @param err Error, if any happened on initial exchange.
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index b4f386f..0adf510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -87,6 +87,11 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
+ @Override public void onReconnected() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
this.preloadPred = preloadPred;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index bb87a86..bda0485 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -127,6 +127,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Count down latch for caches. */
private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
+ /** */
+ private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
+
/**
* @param ctx Kernal context.
*/
@@ -704,7 +707,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
});
for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
- mgr.onKernalStart();
+ mgr.onKernalStart(false);
for (GridCacheAdapter<?, ?> cache : caches.values())
onKernalStart(cache);
@@ -796,7 +799,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
// Must call onKernalStart on shared managers after creation of fetched caches.
for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
- mgr.onKernalStart();
+ mgr.onKernalStart(false);
for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
GridCacheAdapter cache = e.getValue();
@@ -911,6 +914,90 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ cachesOnDisconnect = new HashMap<>(registeredCaches);
+
+ IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
+ ctx.cluster().clientReconnectFuture(),
+ "Failed to execute dynamic cache change request, client node disconnected.");
+
+ for (IgniteInternalFuture fut : pendingFuts.values())
+ ((GridFutureAdapter)fut).onDone(err);
+
+ for (IgniteInternalFuture fut : pendingTemplateFuts.values())
+ ((GridFutureAdapter)fut).onDone(err);
+
+ for (GridCacheAdapter cache : caches.values()) {
+ GridCacheContext cctx = cache.context();
+
+ cctx.gate().onDisconnected(reconnectFut);
+
+ List<GridCacheManager> mgrs = cache.context().managers();
+
+ for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) {
+ GridCacheManager mgr = it.previous();
+
+ mgr.onDisconnected(reconnectFut);
+ }
+ }
+
+ sharedCtx.onDisconnected(reconnectFut);
+
+ registeredCaches.clear();
+
+ registeredTemplates.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+ List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size());
+
+ for (GridCacheAdapter cache : caches.values()) {
+ String name = cache.name();
+
+ boolean stopped;
+
+ boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name);
+
+ if (!sysCache) {
+ DynamicCacheDescriptor oldDesc = cachesOnDisconnect.get(maskNull(name));
+
+ assert oldDesc != null : "No descriptor for cache: " + name;
+
+ DynamicCacheDescriptor newDesc = registeredCaches.get(maskNull(name));
+
+ stopped = newDesc == null || !oldDesc.deploymentId().equals(newDesc.deploymentId());
+ }
+ else
+ stopped = false;
+
+ if (stopped) {
+ cache.context().gate().reconnected(true);
+
+ sharedCtx.removeCacheContext(cache.ctx);
+
+ caches.remove(maskNull(cache.name()));
+ jCacheProxies.remove(maskNull(cache.name()));
+
+ onKernalStop(cache, true);
+ stopCache(cache, true);
+ }
+ else {
+ cache.onReconnected();
+
+ reconnected.add(cache);
+ }
+ }
+
+ sharedCtx.onReconnected();
+
+ for (GridCacheAdapter cache : reconnected)
+ cache.context().gate().reconnected(false);
+
+ cachesOnDisconnect = null;
+ }
+
/**
* @param cache Cache to start.
* @throws IgniteCheckedException If failed to start cache.
@@ -1529,7 +1616,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (proxy != null) {
if (req.stop())
- proxy.gate().block();
+ proxy.gate().stopped();
else
proxy.closeProxy();
}
@@ -1673,8 +1760,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
depMgr,
exchMgr,
ioMgr,
- storeSesLsnrs,
- jta
+ jta,
+ storeSesLsnrs
);
}
@@ -1689,7 +1776,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
Collection<DynamicCacheChangeRequest> reqs =
new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null;
+
+ Map<String, DynamicCacheDescriptor> descs = reconnect ? cachesOnDisconnect : registeredCaches;
+
+ for (DynamicCacheDescriptor desc : descs.values()) {
if (!desc.cancelled()) {
DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
@@ -1717,7 +1808,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheChangeBatch req = new DynamicCacheChangeBatch(reqs);
- req.clientNodes(ctx.discovery().clientNodesMap());
+ Map<String, Map<UUID, Boolean>> clientNodesMap = ctx.discovery().clientNodesMap();
+
+ if (reconnect) {
+ clientNodesMap = U.newHashMap(caches.size());
+
+ for (GridCacheAdapter<?, ?> cache : caches.values()) {
+ Boolean nearEnabled = cache.isNear();
+
+ Map<UUID, Boolean> map = U.newHashMap(1);
+
+ map.put(nodeId, nearEnabled);
+
+ clientNodesMap.put(cache.name(), map);
+ }
+ }
+
+ req.clientNodes(clientNodesMap);
+
+ req.clientReconnect(reconnect);
return req;
}
@@ -1727,38 +1836,86 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (data instanceof DynamicCacheChangeBatch) {
DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data;
- for (DynamicCacheChangeRequest req : batch.requests()) {
- if (req.template()) {
- CacheConfiguration ccfg = req.startCacheConfiguration();
+ if (batch.clientReconnect()) {
+ for (DynamicCacheChangeRequest req : batch.requests()) {
+ assert !req.template() : req;
- assert ccfg != null : req;
+ String name = req.cacheName();
- DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName()));
+ boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name);
- if (existing == null) {
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
- ctx,
- ccfg,
- req.cacheType(),
- true,
- req.deploymentId());
+ if (!sysCache) {
+ DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
- registeredTemplates.put(maskNull(req.cacheName()), desc);
- }
+ if (desc != null && !desc.cancelled() && desc.deploymentId().equals(req.deploymentId())) {
+ Map<UUID, Boolean> nodes = batch.clientNodes().get(name);
- continue;
+ assert nodes != null : req;
+ assert nodes.containsKey(joiningNodeId) : nodes;
+
+ ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, nodes.get(joiningNodeId));
+ }
+ }
+ else
+ ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, false);
}
+ }
+ else {
+ for (DynamicCacheChangeRequest req : batch.requests()) {
+ if (req.template()) {
+ CacheConfiguration ccfg = req.startCacheConfiguration();
- DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName()));
+ assert ccfg != null : req;
- if (req.start() && !req.clientStartOnly()) {
- CacheConfiguration ccfg = req.startCacheConfiguration();
+ DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName()));
- if (existing != null) {
- if (existing.locallyConfigured()) {
- existing.deploymentId(req.deploymentId());
+ if (existing == null) {
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+ ctx,
+ ccfg,
+ req.cacheType(),
+ true,
+ req.deploymentId());
+
+ registeredTemplates.put(maskNull(req.cacheName()), desc);
+ }
+
+ continue;
+ }
- existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
+ DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName()));
+
+ if (req.start() && !req.clientStartOnly()) {
+ CacheConfiguration ccfg = req.startCacheConfiguration();
+
+ if (existing != null) {
+ if (existing.locallyConfigured()) {
+ existing.deploymentId(req.deploymentId());
+
+ existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
+
+ ctx.discovery().setCacheFilter(
+ req.cacheName(),
+ ccfg.getNodeFilter(),
+ ccfg.getNearConfiguration() != null,
+ ccfg.getCacheMode() == LOCAL);
+ }
+ }
+ else {
+ assert req.cacheType() != null : req;
+
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+ ctx,
+ ccfg,
+ req.cacheType(),
+ false,
+ req.deploymentId());
+
+ // Received statically configured cache.
+ if (req.initiatingNodeId() == null)
+ desc.staticallyConfigured(true);
+
+ registeredCaches.put(maskNull(req.cacheName()), desc);
ctx.discovery().setCacheFilter(
req.cacheName(),
@@ -1767,37 +1924,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ccfg.getCacheMode() == LOCAL);
}
}
- else {
- assert req.cacheType() != null : req;
-
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
- ctx,
- ccfg,
- req.cacheType(),
- false,
- req.deploymentId());
-
- // Received statically configured cache.
- if (req.initiatingNodeId() == null)
- desc.staticallyConfigured(true);
-
- registeredCaches.put(maskNull(req.cacheName()), desc);
-
- ctx.discovery().setCacheFilter(
- req.cacheName(),
- ccfg.getNodeFilter(),
- ccfg.getNearConfiguration() != null,
- ccfg.getCacheMode() == LOCAL);
- }
}
- }
- if (!F.isEmpty(batch.clientNodes())) {
- for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) {
- String cacheName = entry.getKey();
+ if (!F.isEmpty(batch.clientNodes())) {
+ for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) {
+ String cacheName = entry.getKey();
- for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
- ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
+ for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
+ ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
+ }
}
}
}
@@ -2152,8 +2287,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
- if (!sndReqs.isEmpty())
- ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs));
+ Exception err = null;
+
+ if (!sndReqs.isEmpty()) {
+ try {
+ ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs));
+
+ if (ctx.clientDisconnected())
+ err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+ "Failed to execute dynamic cache change request, client node disconnected.");
+ }
+ catch (IgniteCheckedException e) {
+ err = e;
+ }
+ }
+
+ if (err != null) {
+ for (DynamicCacheStartFuture fut : res)
+ fut.onDone(err);
+ }
return res;
}
@@ -2666,8 +2818,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
*
* @return Utility cache.
*/
- public <K, V> GridCacheAdapter<K, V> utilityCache() {
- return internalCache(CU.UTILITY_CACHE_NAME);
+ public <K, V> IgniteInternalCache<K, V> utilityCache() {
+ return internalCacheEx(CU.UTILITY_CACHE_NAME);
}
/**
@@ -2676,7 +2828,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Utility cache for atomic data structures.
*/
public <K, V> IgniteInternalCache<K, V> atomicsCache() {
- return cache(CU.ATOMICS_CACHE_NAME);
+ return internalCacheEx(CU.ATOMICS_CACHE_NAME);
+ }
+
+ /**
+ * @param name Cache name.
+ * @return Cache.
+ */
+ private <K, V> IgniteInternalCache<K, V> internalCacheEx(String name) {
+ if (ctx.discovery().localNode().isClient()) {
+ IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)jCacheProxies.get(name);
+
+ assert proxy != null;
+
+ return proxy.internalProxy();
+ }
+
+ return internalCache(name);
}
/**
@@ -2796,7 +2964,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (old != null)
fut = old;
- ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(Collections.singleton(req)));
+ Exception err = null;
+
+ try {
+ ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(Collections.singleton(req)));
+
+ if (ctx.clientDisconnected())
+ err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+ "Failed to execute dynamic cache change request, client node disconnected.");
+ }
+ catch (IgniteCheckedException e) {
+ err = e;
+ }
+
+ if (err != null)
+ fut.onDone(err);
fut.get();
}
@@ -2856,8 +3038,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* Cancel all user operations.
*/
public void cancelUserOperations() {
- for (GridCacheAdapter<?, ?> cache : caches.values())
- cache.ctx.mvcc().cancelClientFutures();
+ sharedCtx.mvcc().cancelClientFutures();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 7f4daff..4075d79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.jetbrains.annotations.*;
@@ -86,9 +87,15 @@ public class GridCacheSharedContext<K, V> {
private Collection<CacheStoreSessionListener> storeSesLsnrs;
/**
+ * @param kernalCtx Context.
* @param txMgr Transaction manager.
* @param verMgr Version manager.
* @param mvccMgr MVCC manager.
+ * @param depMgr Deployment manager.
+ * @param exchMgr Exchange manager.
+ * @param ioMgr IO manager.
+ * @param jtaMgr JTA manager.
+ * @param storeSesLsnrs Store session listeners.
*/
public GridCacheSharedContext(
GridKernalContext kernalCtx,
@@ -98,17 +105,13 @@ public class GridCacheSharedContext<K, V> {
GridCacheDeploymentManager<K, V> depMgr,
GridCachePartitionExchangeManager<K, V> exchMgr,
GridCacheIoManager ioMgr,
- Collection<CacheStoreSessionListener> storeSesLsnrs,
- CacheJtaManagerAdapter jtaMgr
+ CacheJtaManagerAdapter jtaMgr,
+ Collection<CacheStoreSessionListener> storeSesLsnrs
) {
this.kernalCtx = kernalCtx;
- this.mvccMgr = add(mvccMgr);
- this.verMgr = add(verMgr);
- this.txMgr = add(txMgr);
- this.jtaMgr = add(jtaMgr);
- this.depMgr = add(depMgr);
- this.exchMgr = add(exchMgr);
- this.ioMgr = add(ioMgr);
+
+ setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, ioMgr);
+
this.storeSesLsnrs = storeSesLsnrs;
txMetrics = new TransactionMetricsAdapter();
@@ -117,6 +120,89 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @param reconnectFut Reconnect future.
+ * @throws IgniteCheckedException If failed.
+ */
+ void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size());
+ it.hasPrevious();) {
+ GridCacheSharedManager<?, ?> mgr = it.previous();
+
+ mgr.onDisconnected(reconnectFut);
+
+ if (restartOnDisconnect(mgr))
+ mgr.onKernalStop(true);
+ }
+
+ for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) {
+ GridCacheSharedManager<?, ?> mgr = it.previous();
+
+ if (restartOnDisconnect(mgr))
+ mgr.stop(true);
+ }
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ void onReconnected() throws IgniteCheckedException {
+ List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>();
+
+ setManagers(mgrs, txMgr,
+ jtaMgr,
+ verMgr,
+ mvccMgr,
+ new GridCacheDeploymentManager<K, V>(),
+ new GridCachePartitionExchangeManager<K, V>(),
+ ioMgr);
+
+ this.mgrs = mgrs;
+
+ for (GridCacheSharedManager<K, V> mgr : mgrs) {
+ if (restartOnDisconnect(mgr))
+ mgr.start(this);
+ }
+
+ for (GridCacheSharedManager<?, ?> mgr : mgrs)
+ mgr.onKernalStart(true);
+ }
+
+ /**
+ * @param mgr Manager.
+ * @return {@code True} if manager is restarted on reconnect.
+ */
+ private boolean restartOnDisconnect(GridCacheSharedManager<?, ?> mgr) {
+ return mgr instanceof GridCacheDeploymentManager || mgr instanceof GridCachePartitionExchangeManager;
+ }
+
+ /**
+ * @param mgrs Managers list.
+ * @param txMgr Transaction manager.
+ * @param verMgr Version manager.
+ * @param mvccMgr MVCC manager.
+ * @param depMgr Deployment manager.
+ * @param exchMgr Exchange manager.
+ * @param ioMgr IO manager.
+ * @param jtaMgr JTA manager.
+ */
+ private void setManagers(List<GridCacheSharedManager<K, V>> mgrs,
+ IgniteTxManager txMgr,
+ CacheJtaManagerAdapter jtaMgr,
+ GridCacheVersionManager verMgr,
+ GridCacheMvccManager mvccMgr,
+ GridCacheDeploymentManager<K, V> depMgr,
+ GridCachePartitionExchangeManager<K, V> exchMgr,
+ GridCacheIoManager ioMgr) {
+ this.mvccMgr = add(mgrs, mvccMgr);
+ this.verMgr = add(mgrs, verMgr);
+ this.txMgr = add(mgrs, txMgr);
+ this.jtaMgr = add(mgrs, jtaMgr);
+ this.depMgr = add(mgrs, depMgr);
+ this.exchMgr = add(mgrs, exchMgr);
+ this.ioMgr = add(mgrs, ioMgr);
+ }
+
+ /**
* Gets all cache contexts for local node.
*
* @return Collection of all cache contexts.
@@ -136,6 +222,7 @@ public class GridCacheSharedContext<K, V> {
* Adds cache context to shared cache context.
*
* @param cacheCtx Cache context to add.
+ * @throws IgniteCheckedException If cache ID conflict detected.
*/
@SuppressWarnings("unchecked")
public void addCacheContext(GridCacheContext cacheCtx) throws IgniteCheckedException {
@@ -230,7 +317,7 @@ public class GridCacheSharedContext<K, V> {
*/
public byte dataCenterId() {
// Data center ID is same for all caches, so grab the first one.
- GridCacheContext<K, V> cacheCtx = F.first(cacheContexts());
+ GridCacheContext<?, ?> cacheCtx = F.first(cacheContexts());
return cacheCtx.dataCenterId();
}
@@ -242,7 +329,7 @@ public class GridCacheSharedContext<K, V> {
if (preloadersStartFut == null) {
GridCompoundFuture<Object, Object> compound = null;
- for (GridCacheContext<K, V> cacheCtx : cacheContexts()) {
+ for (GridCacheContext<?, ?> cacheCtx : cacheContexts()) {
IgniteInternalFuture<Object> startFut = cacheCtx.preloader().startFuture();
if (!startFut.isDone()) {
@@ -551,10 +638,12 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @param mgrs Managers list.
* @param mgr Manager to add.
* @return Added manager.
*/
- @Nullable private <T extends GridCacheSharedManager<K, V>> T add(@Nullable T mgr) {
+ @Nullable private <T extends GridCacheSharedManager<K, V>> T add(List<GridCacheSharedManager<K, V>> mgrs,
+ @Nullable T mgr) {
if (mgr != null)
mgrs.add(mgr);
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
index d45052c..668bd00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
@@ -18,11 +18,12 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
/**
* Cache manager shared across all caches.
*/
-public interface GridCacheSharedManager <K, V> {
+public interface GridCacheSharedManager<K, V> {
/**
* Starts manager.
*
@@ -39,9 +40,10 @@ public interface GridCacheSharedManager <K, V> {
public void stop(boolean cancel);
/**
+ * @param reconnect {@code True} if manager restarted after client reconnect.
* @throws IgniteCheckedException If failed.
*/
- public void onKernalStart() throws IgniteCheckedException;
+ public void onKernalStart(boolean reconnect) throws IgniteCheckedException;
/**
* @param cancel Cancel flag.
@@ -49,6 +51,11 @@ public interface GridCacheSharedManager <K, V> {
public void onKernalStop(boolean cancel);
/**
+ * @param reconnectFut Reconnect future.
+ */
+ public void onDisconnected(IgniteFuture<?> reconnectFut);
+
+ /**
* Prints memory statistics (sizes of internal data structures, etc.).
*
* NOTE: this method is for testing and profiling purposes only.
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
index 2cf7051..6ad76ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import java.util.concurrent.atomic.*;
@@ -35,6 +36,9 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
/** Starting flag. */
private final AtomicBoolean starting = new AtomicBoolean(false);
+ /** Stop flag, set by the first {@code stop} call so that repeated calls are ignored. */
+ private final AtomicBoolean stop = new AtomicBoolean(false);
+
/** {@inheritDoc} */
@Override public final void start(GridCacheSharedContext<K, V> cctx) throws IgniteCheckedException {
if (!starting.compareAndSet(false, true))
@@ -75,7 +79,7 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
/** {@inheritDoc} */
@Override public final void stop(boolean cancel) {
- if (!starting.get())
+ if (!starting.get() || !stop.compareAndSet(false, true))
// Ignoring attempt to stop manager that has never been started.
return;
@@ -93,10 +97,10 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
}
/** {@inheritDoc} */
- @Override public final void onKernalStart() throws IgniteCheckedException {
- onKernalStart0();
+ @Override public final void onKernalStart(boolean reconnect) throws IgniteCheckedException {
+ onKernalStart0(reconnect);
- if (log != null && log.isDebugEnabled())
+ if (!reconnect && log != null && log.isDebugEnabled())
log.debug(kernalStartInfo());
}
@@ -113,9 +117,10 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
}
/**
+ * @param reconnect {@code True} if manager restarted after client reconnect.
* @throws IgniteCheckedException If failed.
*/
- protected void onKernalStart0() throws IgniteCheckedException {
+ protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
// No-op.
}
@@ -127,6 +132,11 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
}
/** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void printMemoryStats() {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index f88e288..bd2623d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1560,6 +1560,17 @@ public class GridCacheUtils {
* @return CacheException runtime exception, never null.
*/
@NotNull public static RuntimeException convertToCacheException(IgniteCheckedException e) {
+ IgniteClientDisconnectedCheckedException disconnectedErr =
+ e instanceof IgniteClientDisconnectedCheckedException ?
+ (IgniteClientDisconnectedCheckedException)e
+ : e.getCause(IgniteClientDisconnectedCheckedException.class);
+
+ if (disconnectedErr != null) {
+ assert disconnectedErr.reconnectFuture() != null : disconnectedErr;
+
+ e = disconnectedErr;
+ }
+
if (e.hasCause(CacheWriterException.class))
return new CacheWriterException(U.convertExceptionNoWrap(e));
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
index 42e31d2..9233f24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
@@ -43,6 +43,11 @@ public class IgniteCacheFutureImpl<V> extends IgniteFutureImpl<V> {
/** {@inheritDoc} */
@Override protected RuntimeException convertException(IgniteCheckedException e) {
+ if (e instanceof IgniteFutureCancelledCheckedException ||
+ e instanceof IgniteInterruptedCheckedException ||
+ e instanceof IgniteFutureTimeoutCheckedException)
+ return U.convertException(e);
+
return CU.convertToCacheException(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index bb7714a..0b2eba0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -1751,7 +1751,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
assert false;
}
- @Override public void block() {
+ @Override public void stopped() {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index b5c5161..78bd0eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -106,6 +106,41 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
}
/**
+ * @param set Set.
+ */
+ public void onRemoved(GridCacheSetProxy set) {
+ setsMap.remove(set.delegate().id(), set);
+ }
+
+ /**
+ * @param clusterRestarted Cluster restarted flag.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+ for (Map.Entry<IgniteUuid, GridCacheSetProxy> e : setsMap.entrySet()) {
+ GridCacheSetProxy set = e.getValue();
+
+ if (clusterRestarted) {
+ set.blockOnRemove();
+
+ setsMap.remove(e.getKey(), set);
+ }
+ else
+ set.needCheckNotRemoved();
+ }
+
+ for (Map.Entry<IgniteUuid, GridCacheQueueProxy> e : queuesMap.entrySet()) {
+ GridCacheQueueProxy queue = e.getValue();
+
+ if (clusterRestarted) {
+ queue.delegate().onRemoved(false);
+
+ queuesMap.remove(e.getKey(), queue);
+ }
+ }
+ }
+
+ /**
* @throws IgniteCheckedException If thread is interrupted or manager
* was not successfully initialized.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
index 2838838..0b351b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
@@ -81,6 +82,16 @@ public class GridCacheTxFinishSync<K, V> {
}
/**
+ * @param reconnectFut Reconnect future.
+ */
+ public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ for (ThreadFinishSync threadSync : threadMap.values())
+ threadSync.onDisconnected(reconnectFut);
+
+ threadMap.clear();
+ }
+
+ /**
* Callback invoked when finish response is received from remote node.
*
* @param nodeId Node ID response was received from.
@@ -139,6 +150,11 @@ public class GridCacheTxFinishSync<K, V> {
nodeMap.remove(nodeId);
}
+ else if (cctx.kernalContext().clientDisconnected()) {
+ sync.onDisconnected(cctx.kernalContext().cluster().clientReconnectFuture());
+
+ nodeMap.remove(nodeId);
+ }
}
sync.onSend();
@@ -160,6 +176,16 @@ public class GridCacheTxFinishSync<K, V> {
}
/**
+ * @param reconnectFut Reconnect future.
+ */
+ public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ for (TxFinishSync sync : nodeMap.values())
+ sync.onDisconnected(reconnectFut);
+
+ nodeMap.clear();
+ }
+
+ /**
* @param nodeId Node ID response received from.
*/
public void onReceive(UUID nodeId) {
@@ -288,5 +314,25 @@ public class GridCacheTxFinishSync<K, V> {
}
}
}
+
+ /**
+ * Client disconnected callback.
+ *
+ * @param reconnectFut Reconnect future.
+ */
+ public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ synchronized (this) {
+ nodeLeft = true;
+
+ if (pendingFut != null) {
+ IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
+ reconnectFut,
+ "Failed to wait for transaction synchronizer, client node disconnected: " + nodeId);
+ pendingFut.onDone(err);
+
+ pendingFut = null;
+ }
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index adea9e0..22a5287 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -51,7 +51,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
private static final long serialVersionUID = 0L;
/** Topology. */
- private GridDhtPartitionTopology top;
+ private GridDhtPartitionTopologyImpl top;
/** Preloader. */
protected GridCachePreloader preldr;
@@ -134,6 +134,18 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
}
/** {@inheritDoc} */
+ @Override public void onReconnected() {
+ super.onReconnected();
+
+ ctx.affinity().onReconnected();
+
+ top.onReconnected();
+
+ if (preldr != null)
+ preldr.onReconnected();
+ }
+
+ /** {@inheritDoc} */
@Override public void onKernalStart() throws IgniteCheckedException {
super.onKernalStart();
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index de7f876..facf329 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -94,6 +94,30 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/**
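+ * Resets partition maps, last exchange ID, update sequence, ready future and topology version after client reconnect.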
+ */
+ public void onReconnected() {
+ lock.writeLock().lock();
+
+ try {
+ node2part = null;
+
+ part2node = new HashMap<>();
+
+ lastExchangeId = null;
+
+ updateSeq.set(1);
+
+ topReadyFut = null;
+
+ topVer = AffinityTopologyVersion.NONE;
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
* @return Full map string representation.
*/
@SuppressWarnings( {"ConstantConditions"})
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 79d5e75..bb3673d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -639,10 +639,17 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
if (timeout.finish()) {
cctx.kernalContext().timeout().removeTimeoutObject(timeout);
- // Remap.
- map(keys.keySet(), F.t(node, keys), updTopVer);
+ try {
+ fut.get();
- onDone(Collections.<K, V>emptyMap());
+ // Remap.
+ map(keys.keySet(), F.t(node, keys), updTopVer);
+
+ onDone(Collections.<K, V>emptyMap());
+ }
+ catch (IgniteCheckedException e) {
+ GridPartitionedGetFuture.this.onDone(e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 0355bb3..a43ebe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -67,7 +67,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
private GridDhtPartitionDemandPool demandPool;
/** Start future. */
- private final GridFutureAdapter<Object> startFut;
+ private GridFutureAdapter<Object> startFut;
/** Busy lock to prevent activities from accessing exchanger while it's stopping. */
private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
@@ -180,13 +180,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
topVer.setIfGreater(startTopVer);
- // Generate dummy discovery event for local node joining.
- DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent();
-
- assert discoEvt != null;
-
- assert discoEvt.topologyVersion() == startTopVer;
-
supplyPool.start();
demandPool.start();
}
@@ -230,7 +223,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
final CacheConfiguration cfg = cctx.config();
- if (cfg.getRebalanceDelay() >= 0) {
+ if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
demandPool.syncFuture().listen(new CI1<Object>() {
@@ -246,6 +239,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
+ @Override public void onReconnected() {
+ startFut = new GridFutureAdapter<>();
+ }
+
+ /** {@inheritDoc} */
@Override public void onExchangeFutureAdded() {
demandPool.onExchangeFutureAdded();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 351d6cd..79b7c1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -90,6 +90,14 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
public abstract GridDhtCacheAdapter<K, V> dht();
/** {@inheritDoc} */
+ @Override public void onReconnected() {
+ map = new GridCacheConcurrentMap(ctx,
+ ctx.config().getNearConfiguration().getNearStartSize(),
+ 0.75F,
+ map.getEntryFactory());
+ }
+
+ /** {@inheritDoc} */
@Override public boolean isNear() {
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 58f6fe5..0691d39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -764,10 +764,17 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
if (timeout.finish()) {
cctx.kernalContext().timeout().removeTimeoutObject(timeout);
- // Remap.
- map(keys.keySet(), F.t(node, keys), updTopVer);
+ try {
+ fut.get();
- onDone(Collections.<K, V>emptyMap());
+ // Remap.
+ map(keys.keySet(), F.t(node, keys), updTopVer);
+
+ onDone(Collections.<K, V>emptyMap());
+ }
+ catch (IgniteCheckedException e) {
+ GridNearGetFuture.this.onDone(e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
index 00ed020..7f0a568 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
@@ -20,9 +20,9 @@ package org.apache.ignite.internal.processors.cache.dr;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.dr.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.dr.*;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
/**
@@ -103,4 +103,9 @@ public class GridOsCacheDrManager implements GridCacheDrManager {
@Override public boolean receiveEnabled() {
return false;
}
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture reconnectFut) {
+ // No-op.
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 2b93144..316713f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -110,6 +110,20 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
}
/** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
+ "Query was cancelled, client node disconnected.");
+
+ for (Map.Entry<Long, GridCacheDistributedQueryFuture<?, ?, ?>> e : futs.entrySet()) {
+ GridCacheDistributedQueryFuture<?, ?, ?> fut = e.getValue();
+
+ fut.onPage(null, null, err, true);
+
+ futs.remove(e.getKey(), fut);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void printMemoryStats() {
super.printMemoryStats();
@@ -125,6 +139,14 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
*/
protected void addQueryFuture(long reqId, GridCacheDistributedQueryFuture<?, ?, ?> fut) {
futs.put(reqId, fut);
+
+ if (cctx.kernalContext().clientDisconnected()) {
+ IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
+ cctx.kernalContext().cluster().clientReconnectFuture(),
+ "Query was cancelled, client node disconnected.");
+
+ fut.onDone(err);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index c2425f0..953cb9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -578,6 +578,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/**
* @param nodes Nodes.
+ * @return Nodes for query execution.
*/
private Queue<ClusterNode> fallbacks(Collection<ClusterNode> nodes) {
Queue<ClusterNode> fallbacks = new LinkedList<>();
@@ -595,18 +596,22 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/**
*
*/
+ @SuppressWarnings("unchecked")
private void init() {
ClusterNode node = nodes.poll();
- GridCacheQueryFutureAdapter<?, ?, R> fut0 =
- (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? qryMgr.queryLocal(bean) :
- qryMgr.queryDistributed(bean, Collections.singleton(node)));
+ GridCacheQueryFutureAdapter<?, ?, R> fut0 = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ?
+ qryMgr.queryLocal(bean) :
+ qryMgr.queryDistributed(bean, Collections.singleton(node)));
fut0.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
@Override public void apply(IgniteInternalFuture<Collection<R>> fut) {
try {
onDone(fut.get());
}
+ catch (IgniteClientDisconnectedCheckedException e) {
+ onDone(e);
+ }
catch (IgniteCheckedException e) {
if (F.isEmpty(nodes))
onDone(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index a8bace0..53017c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -163,7 +163,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
return null;
}
catch (IgniteCheckedException e) {
- throw new IgniteException(e);
+ throw CU.convertToCacheException(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index e059760..879c30c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -396,6 +396,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheContinuousQueryHandler.class, this);
+ }
+
+ /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, cacheName);
out.writeObject(topic);
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
index 99907e4..7d9bcf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
@@ -140,32 +140,39 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
@SuppressWarnings("unchecked")
private IgniteInternalTx txStart0(TransactionConcurrency concurrency, TransactionIsolation isolation,
long timeout, int txSize, @Nullable GridCacheContext sysCacheCtx) {
- TransactionConfiguration cfg = cctx.gridConfig().getTransactionConfiguration();
-
- if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE)
- throw new IllegalArgumentException("SERIALIZABLE isolation level is disabled (to enable change " +
- "'txSerializableEnabled' configuration property)");
-
- IgniteInternalTx tx = cctx.tm().userTx(sysCacheCtx);
-
- if (tx != null)
- throw new IllegalStateException("Failed to start new transaction " +
- "(current thread already has a transaction): " + tx);
-
- tx = cctx.tm().newTx(
- false,
- false,
- sysCacheCtx,
- concurrency,
- isolation,
- timeout,
- true,
- txSize
- );
-
- assert tx != null;
-
- return tx;
+ cctx.kernalContext().gateway().readLock();
+
+ try {
+ TransactionConfiguration cfg = cctx.gridConfig().getTransactionConfiguration();
+
+ if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE)
+ throw new IllegalArgumentException("SERIALIZABLE isolation level is disabled (to enable change " +
+ "'txSerializableEnabled' configuration property)");
+
+ IgniteInternalTx tx = cctx.tm().userTx(sysCacheCtx);
+
+ if (tx != null)
+ throw new IllegalStateException("Failed to start new transaction " +
+ "(current thread already has a transaction): " + tx);
+
+ tx = cctx.tm().newTx(
+ false,
+ false,
+ sysCacheCtx,
+ concurrency,
+ isolation,
+ timeout,
+ true,
+ txSize
+ );
+
+ assert tx != null;
+
+ return tx;
+ }
+ finally {
+ cctx.kernalContext().gateway().readUnlock();
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index b6c77f6..82543c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -115,7 +115,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
new ConcurrentHashMap8<>(5120);
/** {@inheritDoc} */
- @Override protected void onKernalStart0() {
+ @Override protected void onKernalStart0(boolean reconnect) {
+ if (reconnect)
+ return;
+
cctx.gridEvents().addLocalEventListener(
new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
@@ -149,6 +152,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
txHandler = new IgniteTxHandler(cctx);
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture reconnectFut) {
+ txFinishSync.onDisconnected(reconnectFut);
+
+ for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet())
+ rollbackTx(e.getValue());
+ }
+
/**
* @return TX handler.
*/
@@ -764,11 +775,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
}
- boolean txSerializableEnabled = cctx.txConfig().isTxSerializableEnabled();
+ boolean txSerEnabled = cctx.txConfig().isTxSerializableEnabled();
// Clean up committed transactions queue.
if (tx.pessimistic() && tx.local()) {
- if (tx.enforceSerializable() && txSerializableEnabled) {
+ if (tx.enforceSerializable() && txSerEnabled) {
for (Iterator<IgniteInternalTx> it = committedQ.iterator(); it.hasNext();) {
IgniteInternalTx committedTx = it.next();
@@ -784,7 +795,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return;
}
- if (txSerializableEnabled && tx.optimistic() && tx.enforceSerializable()) {
+ if (txSerEnabled && tx.optimistic() && tx.enforceSerializable()) {
Set<IgniteTxKey> readSet = tx.readSet();
Set<IgniteTxKey> writeSet = tx.writeSet();
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
index 5099b42..9346e43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
@@ -89,7 +89,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
try {
cctx.kernalContext().gateway().readLock();
}
- catch (IllegalStateException e) {
+ catch (IllegalStateException | IgniteClientDisconnectedException e) {
throw e;
}
catch (RuntimeException | Error e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index c776361..90919c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -78,13 +78,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
}
};
- /**
- * @return Pre-generated UUID.
- */
- private IgniteUuid uuid() {
- return IgniteUuid.randomUuid();
- }
-
/** {@inheritDoc} */
@Override public void start0() throws IgniteCheckedException {
txSerEnabled = cctx.gridConfig().getTransactionConfiguration().isTxSerializableEnabled();
@@ -95,7 +88,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
}
/** {@inheritDoc} */
- @Override protected void onKernalStart0() throws IgniteCheckedException {
+ @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
for (ClusterNode n : cctx.discovery().remoteNodes())
onReceived(n.id(), n.metrics().getLastDataVersion());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index 2920176..3ac44f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -57,7 +57,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
/** Time coordinator thread. */
private volatile TimeCoordinator timeCoord;
- /** Time delta history. Constructed on coorinator. */
+ /** Time delta history. Constructed on coordinator. */
private NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> timeSyncHist =
new GridBoundedConcurrentOrderedMap<>(MAX_TIME_SYNC_HISTORY);
@@ -222,7 +222,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
minNodeOrder = node.order();
}
- ClusterNode locNode = ctx.grid().localNode();
+ ClusterNode locNode = ctx.discovery().localNode();
if (locNode.order() == minNodeOrder) {
if (log.isDebugEnabled())
@@ -295,7 +295,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
ctx.io().send(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
- if (ctx.discovery().pingNode(n.id()))
+ if (ctx.discovery().pingNodeNoError(n.id()))
U.error(log, "Failed to send time sync snapshot to remote node (did not leave grid?) " +
"[nodeId=" + n.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']');
else if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 0ee00f1..1f5589f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cluster;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.lang.*;
/**
*
@@ -43,4 +45,13 @@ public class ClusterProcessor extends GridProcessorAdapter {
public IgniteClusterImpl get() {
return cluster;
}
+
+ /**
+ * @return Client reconnect future.
+ */
+ public IgniteFuture<?> clientReconnectFuture() {
+ IgniteFuture<?> fut = cluster.clientReconnectFuture();
+
+ return fut != null ? fut : new IgniteFinishedFutureImpl<>();
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index ce9b7c0..79020da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -33,7 +33,14 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
* Listener registration status.
*/
public enum RegisterStatus {
- REGISTERED, NOT_REGISTERED, DELAYED
+ /** */
+ REGISTERED,
+
+ /** */
+ NOT_REGISTERED,
+
+ /** */
+ DELAYED
}
/**