You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/27 16:39:25 UTC
[12/15] ignite git commit: 5578
5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/18f4929a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/18f4929a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/18f4929a
Branch: refs/heads/ignite-5578
Commit: 18f4929a615ca20c3107a1a76be8e2de6b78ad73
Parents: da0c884
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 27 15:27:07 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 27 15:27:07 2017 +0300
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 125 ++++++++++++++-----
1 file changed, 94 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/18f4929a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index be4aace..75d060f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -38,6 +38,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.pagemem.wal.StorageException;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -106,6 +108,7 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
@@ -1683,7 +1686,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- updateAllAsyncInternal0(node, req, completionCb);
+ for (;;) {
+ if (updateAllAsyncInternal0(node, req, completionCb))
+ break;
+ }
}
else {
forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
@@ -1700,7 +1706,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- updateAllAsyncInternal0(node, req, completionCb);
+ for (;;) {
+ if (updateAllAsyncInternal0(node, req, completionCb))
+ break;
+ }
}
});
}
@@ -1735,12 +1744,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param node Node.
* @param req Update request.
* @param completionCb Completion callback.
+ * @return {@code True} if the update was executed or will be retried from a topology future listener, {@code false} if the caller needs to retry the update.
*/
- private void updateAllAsyncInternal0(
+ private boolean updateAllAsyncInternal0(
ClusterNode node,
GridNearAtomicAbstractUpdateRequest req,
UpdateReplyClosure completionCb
) {
+ if (waitForTopologyFuture(node, req, completionCb))
+ return true;
+
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
node.id(),
req.futureId(),
@@ -1750,9 +1763,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
- GridDhtAtomicAbstractUpdateFuture dhtFut = null;
+ GridDhtTopologyFuture topFut = null;
- boolean remap = false;
+ GridDhtAtomicAbstractUpdateFuture dhtFut = null;
IgniteCacheExpiryPolicy expiry = null;
@@ -1778,23 +1791,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb.apply(req, res);
- return;
+ return true;
}
- // Do not check topology version if topology was locked on near node by
- // external transaction or explicit lock.
- if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
- DhtAtomicUpdateResult updRes = update(node, locked, req, res);
+ topFut = top.topologyVersionFuture();
- dhtFut = updRes.dhtFuture();
- deleted = updRes.deleted();
- expiry = updRes.expiryPolicy();
- }
- else {
- // Should remap all keys.
- remap = true;
+ if (topFut.isDone()) {
+ topFut = null;
+
+ // Do not check topology version if topology was locked on near node by
+ // external transaction or explicit lock.
+ if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+ DhtAtomicUpdateResult updRes = update(node, locked, req, res);
- res.remapTopologyVersion(top.topologyVersion());
+ dhtFut = updRes.dhtFuture();
+ deleted = updRes.deleted();
+ expiry = updRes.expiryPolicy();
+ }
+ else
+ // Should remap all keys.
+ res.remapTopologyVersion(top.topologyVersion());
}
}
finally {
@@ -1829,8 +1845,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (log.isDebugEnabled())
log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
- remap = true;
-
res.remapTopologyVersion(ctx.topology().topologyVersion());
}
catch (Throwable e) {
@@ -1845,26 +1859,75 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (e instanceof Error)
throw (Error)e;
- return;
+ return true;
}
finally {
ctx.shared().database().checkpointReadUnlock();
}
- if (remap) {
- assert dhtFut == null;
+ if (topFut == null) {
+ if (res.remapTopologyVersion() != null) {
+ assert dhtFut == null;
- completionCb.apply(req, res);
- }
- else {
- if (dhtFut != null)
- dhtFut.map(node, res.returnValue(), res, completionCb);
+ completionCb.apply(req, res);
+ }
+ else {
+ if (dhtFut != null)
+ dhtFut.map(node, res.returnValue(), res, completionCb);
+ }
+
+ if (req.writeSynchronizationMode() != FULL_ASYNC)
+ req.cleanup(!node.isLocal());
+
+ sendTtlUpdateRequest(expiry);
+
+ return true;
}
+ else
+ return waitForTopologyFuture(node, req, completionCb);
+ }
+
+ /**
+ * @param node Sender node.
+ * @param req Request.
+ * @param completionCb Completion callback.
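+ * @return {@code True} if the update will be retried from the topology future listener, {@code false} if the topology future is already done or was waited for in the current thread.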
+ */
+ private boolean waitForTopologyFuture(final ClusterNode node,
+ final GridNearAtomicAbstractUpdateRequest req,
+ final UpdateReplyClosure completionCb) {
+ GridDhtTopologyFuture topFut = ctx.group().topology().topologyVersionFuture();
+
+ if (!topFut.isDone()) {
+ Thread curThread = Thread.currentThread();
+
+ if (curThread instanceof IgniteThread) {
+ IgniteThread thread = (IgniteThread)curThread;
+
+ if (thread.policy() == GridIoPolicy.SYSTEM_POOL) {
+ topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ ctx.closures().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ updateAllAsyncInternal0(node, req, completionCb);
+ }
+ });
+ }
+ });
+
+ return true;
+ }
+ }
- if (req.writeSynchronizationMode() != FULL_ASYNC)
- req.cleanup(!node.isLocal());
+ try {
+ topFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Topology future failed: " + e, e);
+ }
+ }
- sendTtlUpdateRequest(expiry);
+ return false;
}
/**