You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/22 13:13:30 UTC
[03/16] ignite git commit: ignite-1452 Cancel cache operations on node stop
ignite-1452 Cancel cache operations on node stop
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/585761f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/585761f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/585761f2
Branch: refs/heads/ignite-1093-2
Commit: 585761f28e8b70487eaf2198d6ea39f7232b088d
Parents: b8c0b30
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 17 16:26:02 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 17 16:26:02 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 7 ---
.../processors/cache/GridCacheContext.java | 6 +--
.../cache/GridCacheEvictionManager.java | 6 +--
.../cache/GridCacheEvictionResponse.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 47 +++++++++++++-------
.../processors/cache/GridCacheMessage.java | 7 +++
.../processors/cache/GridCacheMvccManager.java | 34 +++++++++++---
.../GridCachePartitionExchangeManager.java | 41 +++++++++++++----
.../processors/cache/GridCacheProcessor.java | 28 ++++++++----
.../GridDistributedLockResponse.java | 6 +--
.../GridDistributedTxPrepareResponse.java | 6 +--
.../distributed/dht/GridDhtTopologyFuture.java | 6 ++-
.../dht/GridDhtTransactionalCacheAdapter.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 12 +++--
.../dht/atomic/GridNearAtomicUpdateFuture.java | 16 ++++---
.../dht/atomic/GridNearAtomicUpdateRequest.java | 2 +
.../atomic/GridNearAtomicUpdateResponse.java | 11 ++---
.../colocated/GridDhtColocatedLockFuture.java | 44 ++++++++++++++----
.../dht/preloader/GridDhtForceKeysFuture.java | 2 +-
.../dht/preloader/GridDhtForceKeysResponse.java | 6 +--
.../GridDhtPartitionsExchangeFuture.java | 19 ++++++--
.../distributed/near/GridNearGetResponse.java | 6 +--
.../distributed/near/GridNearLockFuture.java | 26 ++++++++---
.../near/GridNearOptimisticTxPrepareFuture.java | 20 +++++++--
.../near/GridNearTxFinishResponse.java | 6 +--
.../cache/query/GridCacheQueryResponse.java | 6 +--
.../continuous/CacheContinuousQueryHandler.java | 12 +++--
.../transactions/IgniteTxLocalAdapter.java | 4 +-
.../ignite/internal/util/GridSpinBusyLock.java | 10 +++++
.../IgniteCacheEntryProcessorNodeJoinTest.java | 24 +++++++---
.../loadtests/hashmap/GridCacheTestContext.java | 4 +-
.../IgniteCacheQueryNodeRestartSelfTest2.java | 2 -
32 files changed, 292 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index daf7d23..82db059 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1806,8 +1806,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
notifyLifecycleBeansEx(LifecycleEventType.BEFORE_NODE_STOP);
}
- GridCacheProcessor cacheProcessor = ctx.cache();
-
List<GridComponent> comps = ctx.components();
ctx.marshallerContext().onKernalStop();
@@ -1856,11 +1854,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
// Note that interrupted flag is cleared.
interrupted = true;
}
- finally {
- // Cleanup even on successful acquire.
- if (cacheProcessor != null)
- cacheProcessor.cancelUserOperations();
- }
}
if (interrupted)
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 86ba3e6..5385dec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -283,12 +283,12 @@ public class GridCacheContext<K, V> implements Externalizable {
GridCacheEvictionManager evictMgr,
GridCacheQueryManager<K, V> qryMgr,
CacheContinuousQueryManager contQryMgr,
- GridCacheAffinityManager affMgr,
CacheDataStructuresManager dataStructuresMgr,
GridCacheTtlManager ttlMgr,
GridCacheDrManager drMgr,
CacheConflictResolutionManager<K, V> rslvrMgr,
- CachePluginManager pluginMgr
+ CachePluginManager pluginMgr,
+ GridCacheAffinityManager affMgr
) {
assert ctx != null;
assert sharedCtx != null;
@@ -323,12 +323,12 @@ public class GridCacheContext<K, V> implements Externalizable {
this.evictMgr = add(evictMgr);
this.qryMgr = add(qryMgr);
this.contQryMgr = add(contQryMgr);
- this.affMgr = add(affMgr);
this.dataStructuresMgr = add(dataStructuresMgr);
this.ttlMgr = add(ttlMgr);
this.drMgr = add(drMgr);
this.rslvrMgr = add(rslvrMgr);
this.pluginMgr = add(pluginMgr);
+ this.affMgr = add(affMgr);
log = ctx.log(getClass());
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 3e0e2f9..1c34c76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -1943,7 +1943,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
lock.readLock().unlock();
}
- if (res.error())
+ if (res.evictError())
// Complete future, since there was a class loading error on at least one node.
complete(false);
else
@@ -1985,14 +1985,14 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
boolean err = F.forAny(resMap.values(), new P1<GridCacheEvictionResponse>() {
@Override public boolean apply(GridCacheEvictionResponse res) {
- return res.error();
+ return res.evictError();
}
});
if (err) {
Collection<UUID> ids = F.view(resMap.keySet(), new P1<UUID>() {
@Override public boolean apply(UUID e) {
- return resMap.get(e).error();
+ return resMap.get(e).evictError();
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
index 4d40c8d..aa3911b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
@@ -116,7 +116,7 @@ public class GridCacheEvictionResponse extends GridCacheMessage {
/**
* @return {@code True} if request processing has finished with error.
*/
- boolean error() {
+ boolean evictError() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index b55c84d..421ec82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -182,8 +182,15 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass()));
if (c == null) {
- U.warn(log, "Received message without registered handler (will ignore) [msg=" + cacheMsg +
- ", nodeId=" + nodeId + ']');
+ if (cctx.kernalContext().isStopping()) {
+ if (log.isDebugEnabled())
+ log.debug("Received message without registered handler (will ignore) [msg=" + cacheMsg +
+ ", nodeId=" + nodeId + ']');
+ }
+ else {
+ U.warn(log, "Received message without registered handler (will ignore) [msg=" + cacheMsg +
+ ", nodeId=" + nodeId + ']');
+ }
return;
}
@@ -596,9 +603,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
*
* @param msg Message to send.
* @param destNodeId Destination node ID.
+ * @return {@code True} if should send message.
* @throws IgniteCheckedException If failed.
*/
- private void onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws IgniteCheckedException {
+ private boolean onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws IgniteCheckedException {
+ if (msg.error() != null && cctx.kernalContext().isStopping())
+ return false;
+
if (msg.messageId() < 0)
// Generate and set message ID.
msg.messageId(idGen.incrementAndGet());
@@ -609,6 +620,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (depEnabled && msg instanceof GridCacheDeployable)
cctx.deploy().prepare((GridCacheDeployable)msg);
}
+
+ return true;
}
/**
@@ -624,7 +637,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
public void send(ClusterNode node, GridCacheMessage msg, byte plc) throws IgniteCheckedException {
assert !node.isLocal();
- onSend(msg, node.id());
+ if (!onSend(msg, node.id()))
+ return;
if (log.isDebugEnabled())
log.debug("Sending cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
@@ -663,12 +677,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @param msg Message to send.
* @param plc IO policy.
* @param fallback Callback for failed nodes.
- * @return {@code True} if nodes are empty or message was sent, {@code false} if
- * all nodes have left topology while sending this message.
* @throws IgniteCheckedException If send failed.
*/
@SuppressWarnings({"BusyWait", "unchecked"})
- public boolean safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage msg, byte plc,
+ public void safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage msg, byte plc,
@Nullable IgnitePredicate<ClusterNode> fallback) throws IgniteCheckedException {
assert nodes != null;
assert msg != null;
@@ -677,10 +689,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (log.isDebugEnabled())
log.debug("Message will not be sent as collection of nodes is empty: " + msg);
- return true;
+ return;
}
- onSend(msg, null);
+ if (!onSend(msg, null))
+ return;
if (log.isDebugEnabled())
log.debug("Sending cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
@@ -709,7 +722,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (fallback != null && !fallback.apply(n))
// If fallback signalled to stop.
- return false;
+ return;
added = true;
}
@@ -721,7 +734,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
log.debug("Message will not be sent because all nodes left topology [msg=" + msg +
", nodes=" + U.toShortString(nodes) + ']');
- return false;
+ return;
}
}
@@ -737,7 +750,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (fallback != null && !fallback.apply(n))
// If fallback signalled to stop.
- return false;
+ return;
added = true;
}
@@ -757,7 +770,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
log.debug("Message will not be sent because all nodes left topology [msg=" + msg + ", nodes=" +
U.toShortString(nodes) + ']');
- return false;
+ return;
}
if (log.isDebugEnabled())
@@ -768,8 +781,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (log.isDebugEnabled())
log.debug("Sent cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
-
- return true;
}
/**
@@ -800,7 +811,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
*/
public void sendOrderedMessage(ClusterNode node, Object topic, GridCacheMessage msg, byte plc,
long timeout) throws IgniteCheckedException {
- onSend(msg, node.id());
+ if (!onSend(msg, node.id()))
+ return;
int cnt = 0;
@@ -854,7 +866,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
assert node != null;
assert msg != null;
- onSend(msg, null);
+ if (!onSend(msg, null))
+ return;
try {
cctx.gridIO().send(node, TOPIC_CACHE, msg, plc);
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 4e737a0..55688e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -77,6 +77,13 @@ public abstract class GridCacheMessage implements Message {
protected int cacheId;
/**
+ * @return Error, if any.
+ */
+ @Nullable public Throwable error() {
+ return null;
+ }
+
+ /**
* Gets next ID for indexed message ID.
*
* @return Message ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 555bbda..e2d0302 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -120,6 +120,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
@SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"})
private IgniteLogger exchLog;
+ /** */
+ private volatile boolean stopping;
+
/** Lock callback. */
@GridToStringExclude
private final GridCacheMvccCallback cb = new GridCacheMvccCallback() {
@@ -325,8 +328,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
/**
* Cancels all client futures.
*/
- public void cancelClientFutures() {
- cancelClientFutures(new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
+ public void onStop() {
+ stopping = true;
+
+ cancelClientFutures(stopError());
}
/** {@inheritDoc} */
@@ -362,6 +367,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @return Node stop exception.
+ */
+ private IgniteCheckedException stopError() {
+ return new IgniteCheckedException("Operation has been cancelled (node is stopping).");
+ }
+
+ /**
* @param from From version.
* @return To version.
*/
@@ -385,8 +397,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']';
- if (cctx.kernalContext().clientDisconnected())
- ((GridFutureAdapter)fut).onDone(disconnectedError(null));
+ onFutureAdded(fut);
}
/**
@@ -507,17 +518,26 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
fut.onNodeLeft(n.id());
}
- if (cctx.kernalContext().clientDisconnected())
- ((GridFutureAdapter)fut).onDone(disconnectedError(null));
-
// Just in case if future was completed before it was added.
if (fut.isDone())
removeFuture(fut);
+ else
+ onFutureAdded(fut);
return true;
}
/**
+ * @param fut Future.
+ */
+ private void onFutureAdded(IgniteInternalFuture<?> fut) {
+ if (stopping)
+ ((GridFutureAdapter)fut).onDone(stopError());
+ else if (cctx.kernalContext().clientDisconnected())
+ ((GridFutureAdapter)fut).onDone(disconnectedError(null));
+ }
+
+ /**
* @param fut Future to remove.
* @return {@code True} if removed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 20340d1..34c571c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -147,6 +147,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
private ExchangeFutureSet exchFuts = new ExchangeFutureSet();
+ /** */
+ private volatile IgniteCheckedException stopErr;
+
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
@@ -381,7 +384,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class);
cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class);
- IgniteCheckedException err = cctx.kernalContext().clientDisconnected() ?
+ stopErr = cctx.kernalContext().clientDisconnected() ?
new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(),
"Client node disconnected: " + cctx.gridName()) :
new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName());
@@ -391,11 +394,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (exchFuts0 != null) {
for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
- f.onDone(err);
+ f.onDone(stopErr);
}
for (AffinityReadyFuture f : readyFuts.values())
- f.onDone(err);
+ f.onDone(stopErr);
+
+ for (GridDhtPartitionsExchangeFuture f : pendingExchangeFuts)
+ f.onDone(stopErr);
+
+ if (locExchFut != null)
+ locExchFut.onDone(stopErr);
U.cancel(exchWorker);
@@ -519,6 +528,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
fut.onDone(topVer);
}
+ else if (stopErr != null)
+ fut.onDone(stopErr);
return fut;
}
@@ -791,6 +802,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (discoEvt != null)
fut.onEvent(exchId, discoEvt);
+ if (stopErr != null)
+ fut.onDone(stopErr);
+
return fut;
}
@@ -799,12 +813,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param err Error.
*/
public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, @Nullable Throwable err) {
- if (err == null) {
- AffinityTopologyVersion topVer = exchFut.topologyVersion();
+ AffinityTopologyVersion topVer = exchFut.topologyVersion();
- if (log.isDebugEnabled())
- log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ']');
+ if (log.isDebugEnabled())
+ log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']');
+ if (err == null) {
while (true) {
AffinityTopologyVersion readyVer = readyTopVer.get();
@@ -825,8 +839,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
}
- else if (log.isDebugEnabled())
- log.debug("Exchange done with error [fut=" + exchFut + ", err=" + err + ']');
+ else {
+ for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
+ if (entry.getKey().compareTo(topVer) <= 0) {
+ if (log.isDebugEnabled())
+ log.debug("Completing created topology ready future with error " +
+ "[ver=" + topVer + ", fut=" + entry.getValue() + ']');
+
+ entry.getValue().onDone(err);
+ }
+ }
+ }
ExchangeFutureSet exchFuts0 = exchFuts;
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 4ae0baa..c92de7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -960,6 +960,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
+ cancelFutures();
+
List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers();
for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = sharedMgrs.listIterator(sharedMgrs.size());
@@ -1323,12 +1325,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
evictMgr,
qryMgr,
contQryMgr,
- affMgr,
dataStructuresMgr,
ttlMgr,
drMgr,
rslvrMgr,
- pluginMgr
+ pluginMgr,
+ affMgr
);
cacheCtx.cacheObjectContext(cacheObjCtx);
@@ -1452,12 +1454,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
evictMgr,
qryMgr,
contQryMgr,
- affMgr,
dataStructuresMgr,
ttlMgr,
drMgr,
rslvrMgr,
- pluginMgr
+ pluginMgr,
+ affMgr
);
cacheCtx.cacheObjectContext(cacheObjCtx);
@@ -2325,9 +2327,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
try {
ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs));
- if (ctx.clientDisconnected())
+ if (ctx.isStopping()) {
+ err = new IgniteCheckedException("Failed to execute dynamic cache change request, " +
+ "node is stopping.");
+ }
+ else if (ctx.clientDisconnected()) {
err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
"Failed to execute dynamic cache change request, client node disconnected.");
+ }
}
catch (IgniteCheckedException e) {
err = e;
@@ -3036,9 +3043,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
try {
ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(Collections.singleton(req)));
- if (ctx.clientDisconnected())
+ if (ctx.isStopping()) {
+ err = new IgniteCheckedException("Failed to execute dynamic cache change request, " +
+ "node is stopping.");
+ }
+ else if (ctx.clientDisconnected()) {
err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
"Failed to execute dynamic cache change request, client node disconnected.");
+ }
}
catch (IgniteCheckedException e) {
err = e;
@@ -3104,8 +3116,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* Cancel all user operations.
*/
- public void cancelUserOperations() {
- sharedCtx.mvcc().cancelClientFutures();
+ private void cancelFutures() {
+ sharedCtx.mvcc().onStop();
Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index cdb878d..8a95b14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -137,10 +137,8 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage {
return futId;
}
- /**
- * @return Error.
- */
- public Throwable error() {
+ /** {@inheritDoc} */
+ @Override public Throwable error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index 4264830..e798458 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -67,10 +67,8 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
this.err = err;
}
- /**
- * @return Error.
- */
- public Throwable error() {
+ /** {@inheritDoc} */
+ @Override public Throwable error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
index c11a3d7..6ade26f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.jetbrains.annotations.Nullable;
/**
* Future that implements a barrier after which dht topology is safe to use. Topology is considered to be
@@ -38,9 +39,10 @@ public interface GridDhtTopologyFuture extends IgniteInternalFuture<AffinityTopo
public AffinityTopologyVersion topologyVersion();
/**
- * Returns is cache topology valid.
+ * Returns error if cache topology is not valid.
+ *
* @param cctx Cache context.
* @return valid ot not.
*/
- public boolean isCacheTopologyValid(GridCacheContext cctx);
+ @Nullable public Throwable validateCache(GridCacheContext cctx);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index b9514a9..1a869e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -1217,7 +1217,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
Throwable err = res.error();
// Log error before sending reply.
- if (err != null && !(err instanceof GridCacheLockTimeoutException))
+ if (err != null && !(err instanceof GridCacheLockTimeoutException) && !ctx.kernalContext().isStopping())
U.error(log, "Failed to acquire lock for request: " + req, err);
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index 33651bc..04d36e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -97,16 +97,15 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
/**
* Sets update error.
- * @param err
+ *
+ * @param err Error.
*/
public void onError(IgniteCheckedException err){
this.err = err;
}
- /**
- * @return Gets update error.
- */
- public IgniteCheckedException error() {
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
return err;
}
@@ -154,8 +153,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
nearEvicted.add(key);
}
- /** {@inheritDoc}
- * @param ctx*/
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index d93f68f..fb2c5ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -385,9 +385,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
if (fut.isDone()) {
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
return;
}
@@ -811,6 +812,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
Exception err = null;
+ GridNearAtomicUpdateRequest singleReq0 = null;
Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
int size = keys.size();
@@ -837,13 +839,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (size == 1 && !fastMap) {
assert remapKeys == null || remapKeys.size() == 1;
- singleReq = mapSingleUpdate();
+ singleReq0 = singleReq = mapSingleUpdate();
}
else {
pendingMappings = mapUpdate(topNodes);
if (pendingMappings.size() == 1)
- singleReq = F.firstValue(pendingMappings);
+ singleReq0 = singleReq = F.firstValue(pendingMappings);
else {
if (syncMode == PRIMARY_SYNC) {
mappings = U.newHashMap(pendingMappings.size());
@@ -874,8 +876,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
// Optimize mapping for single key.
- if (singleReq != null)
- mapSingle(singleReq.nodeId(), singleReq);
+ if (singleReq0 != null)
+ mapSingle(singleReq0.nodeId(), singleReq0);
else {
assert pendingMappings != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 5f5fbb5..ccb67d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -198,6 +198,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
boolean skipStore,
boolean clientReq
) {
+ assert futVer != null;
+
this.cacheId = cacheId;
this.nodeId = nodeId;
this.futVer = futVer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 8bc145c..376f4ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -116,6 +116,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
* @param futVer Future version.
*/
public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) {
+ assert futVer != null;
+
this.cacheId = cacheId;
this.nodeId = nodeId;
this.futVer = futVer;
@@ -149,16 +151,15 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/**
* Sets update error.
- * @param err
+ *
+ * @param err Error.
*/
public void error(IgniteCheckedException err){
this.err = err;
}
- /**
- * @return Update error, if any.
- */
- public IgniteCheckedException error() {
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 596ec77..1a08265 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
@@ -524,7 +525,22 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDhtColocatedLockFuture.class, this, "inTx", inTx(), "super", super.toString());
+ Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
+ @Override public String apply(IgniteInternalFuture<?> f) {
+ if (isMini(f)) {
+ MiniFuture m = (MiniFuture)f;
+
+ return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
+ }
+ else
+ return "[loc=true, done=" + f.isDone() + "]";
+ }
+ });
+
+ return S.toString(GridDhtColocatedLockFuture.class, this,
+ "innerFuts", futs,
+ "inTx", inTx(),
+ "super", super.toString());
}
/**
@@ -565,9 +581,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
if (topVer != null) {
for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()){
if (fut.topologyVersion().equals(topVer)){
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
return;
}
@@ -612,9 +629,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
if (fut.isDone()) {
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
return;
}
@@ -643,10 +661,15 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
}
else {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
try {
+ fut.get();
+
mapOnTopology(remap, c);
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
finally {
cctx.shared().txContextReset();
}
@@ -1327,8 +1350,13 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
affFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
try {
+ fut.get();
+
remap();
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
finally {
cctx.shared().txContextReset();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 36a2da1..eaed424 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -283,7 +283,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
// Fail the whole thing.
if (e instanceof ClusterTopologyCheckedException)
fut.onResult((ClusterTopologyCheckedException)e);
- else
+ else if (!cctx.kernalContext().isStopping())
fut.onResult(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
index d31f096..93e39ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
@@ -98,10 +98,8 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
this.err = err;
}
- /**
- * @return Error, if any.
- */
- public IgniteCheckedException error() {
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 865bbdc..a1b03c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1081,9 +1081,22 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/** {@inheritDoc} */
- @Override public boolean isCacheTopologyValid(GridCacheContext cctx) {
- return cctx.config().getTopologyValidator() != null && cacheValidRes.containsKey(cctx.cacheId()) ?
- cacheValidRes.get(cctx.cacheId()) : true;
+ @Override public Throwable validateCache(GridCacheContext cctx) {
+ Throwable err = error();
+
+ if (err != null)
+ return err;
+
+ if (cctx.config().getTopologyValidator() != null) {
+ Boolean res = cacheValidRes.get(cctx.cacheId());
+
+ if (res != null && !res) {
+ return new IgniteCheckedException("Failed to perform cache operation " +
+ "(cache topology is not valid): " + cctx.name());
+ }
+ }
+
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index 3276377..d4493a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -163,10 +163,8 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
return topVer != null ? topVer : super.topologyVersion();
}
- /**
- * @return Error.
- */
- public IgniteCheckedException error() {
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index f3e5ca3..dcc8da6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -703,9 +703,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
if (topVer != null) {
for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()){
if (fut.topologyVersion().equals(topVer)){
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
return;
}
@@ -749,9 +750,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
if (fut.isDone()) {
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
return;
}
@@ -777,10 +779,15 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
}
else {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
try {
+ fut.get();
+
mapOnTopology(remap);
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
finally {
cctx.shared().txContextReset();
}
@@ -1435,8 +1442,13 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
affFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
try {
+ fut.get();
+
remap();
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
finally {
cctx.shared().txContextReset();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 2048fdf..25028c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -319,7 +319,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
assert ctx != null : cacheId;
- if (!topFut.isCacheTopologyValid(ctx)) {
+ Throwable err = topFut.validateCache(ctx);
+
+ if (err != null) {
if (invalidCaches != null)
invalidCaches.append(", ");
else
@@ -343,12 +345,17 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
}
else {
topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
try {
+ fut.get();
+
prepareOnTopology(remap, c);
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
finally {
cctx.txContextReset();
}
@@ -841,7 +848,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
if (affFut != null && !affFut.isDone()) {
affFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
- remap();
+ try {
+ fut.get();
+
+ remap();
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index cec7d73..c860baa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -75,10 +75,8 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
this.err = err;
}
- /**
- * @return Error.
- */
- @Nullable public Throwable error() {
+ /** {@inheritDoc} */
+ @Nullable @Override public Throwable error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
index 3e4cdeb..78e2ac7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
@@ -193,10 +193,8 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
return reqId;
}
- /**
- * @return Error.
- */
- public Throwable error() {
+ /** {@inheritDoc} */
+ @Override public Throwable error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index df6b4b7..c99e07f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -97,6 +98,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/** Whether to skip primary check for REPLICATED cache. */
private transient boolean skipPrimaryCheck;
+ /** */
+ private transient int cacheId;
+
/**
* Required by {@link Externalizable}.
*/
@@ -145,6 +149,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
this.ignoreExpired = ignoreExpired;
this.taskHash = taskHash;
this.skipPrimaryCheck = skipPrimaryCheck;
+
+ cacheId = CU.cacheId(cacheName);
}
/** {@inheritDoc} */
@@ -457,6 +463,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
sync = in.readBoolean();
ignoreExpired = in.readBoolean();
taskHash = in.readInt();
+
+ cacheId = CU.cacheId(cacheName);
}
/**
@@ -466,9 +474,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) {
assert ctx != null;
- GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName);
-
- return cache == null ? null : cache.context();
+ return ctx.cache().<K, V>context().cacheContext(cacheId);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 00b91dd..6ca1f72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1105,6 +1105,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/**
* Commits transaction to transaction manager. Used for one-phase commit transactions only.
+ *
+ * @param commit If {@code true} commits transaction, otherwise rollbacks.
*/
public void tmFinish(boolean commit) {
assert onePhaseCommit();
@@ -1118,7 +1120,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
state(commit ? COMMITTED : ROLLED_BACK);
- boolean needsCompletedVersions = needsCompletedVersions();
+ boolean needsCompletedVersions = commit && needsCompletedVersions();
assert !needsCompletedVersions || completedBase != null;
assert !needsCompletedVersions || committedVers != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
index 2aae6ef..6bfd4fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.util;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
/**
@@ -76,6 +77,15 @@ public class GridSpinBusyLock {
}
/**
+ * @param millis Timeout.
+ * @return {@code True} if lock was acquired.
+ * @throws InterruptedException If interrupted.
+ */
+ public boolean tryBlock(long millis) throws InterruptedException {
+ return lock.tryWriteLock(millis, TimeUnit.MILLISECONDS);
+ }
+
+ /**
* Makes possible for activities entering busy state again.
*/
public void unblock() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
index 6b4d473..151167a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -184,20 +185,29 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
String val = "value-" + k;
- cache.invoke(key, new Processor(val));
+ procs.put(key, new Processor(val));
}
- cache.invokeAll(procs);
+ Map<String, EntryProcessorResult<Integer>> resMap = cache.invokeAll(procs);
+
+ for (String key : procs.keySet()) {
+ EntryProcessorResult<Integer> res = resMap.get(key);
+
+ assertNotNull(res);
+ assertEquals(k + 1, (Object) res.get());
+ }
}
else {
+ IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+
for (int i = 0; i < NUM_SETS; i++) {
String key = "set-" + i;
String val = "value-" + k;
- IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+ Integer valsCnt = cache.invoke(key, new Processor(val));
- cache.invoke(key, new Processor(val));
+ assertEquals(k + 1, (Object)valsCnt);
}
}
}
@@ -275,7 +285,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
}
/** */
- private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable {
+ private static class Processor implements EntryProcessor<String, Set<String>, Integer>, Serializable {
/** */
private String val;
@@ -287,7 +297,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
}
/** {@inheritDoc} */
- @Override public Void process(MutableEntry<String, Set<String>> e, Object... args) {
+ @Override public Integer process(MutableEntry<String, Set<String>> e, Object... args) {
Set<String> vals = e.getValue();
if (vals == null)
@@ -297,7 +307,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
e.setValue(vals);
- return null;
+ return vals.size();
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 7aae48c..88605b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -79,12 +79,12 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
new GridCacheEvictionManager(),
new GridCacheLocalQueryManager<K, V>(),
new CacheContinuousQueryManager(),
- new GridCacheAffinityManager(),
new CacheDataStructuresManager(),
new GridCacheTtlManager(),
new GridOsCacheDrManager(),
new CacheOsConflictResolutionManager<K, V>(),
- new CachePluginManager(ctx, new CacheConfiguration())
+ new CachePluginManager(ctx, new CacheConfiguration()),
+ new GridCacheAffinityManager()
);
store().initialize(null, new IdentityHashMap<CacheStore, ThreadLocal>());
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
index 1276405..e00611b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
@@ -185,8 +185,6 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest
* @throws Exception If failed.
*/
public void testRestarts() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-1452");
-
int duration = 90 * 1000;
int qryThreadNum = 4;
int restartThreadsNum = 2; // 4 + 2 = 6 nodes