You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/27 16:39:25 UTC

[12/15] ignite git commit: 5578

5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/18f4929a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/18f4929a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/18f4929a

Branch: refs/heads/ignite-5578
Commit: 18f4929a615ca20c3107a1a76be8e2de6b78ad73
Parents: da0c884
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 27 15:27:07 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 27 15:27:07 2017 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 125 ++++++++++++++-----
 1 file changed, 94 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/18f4929a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index be4aace..75d060f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -38,6 +38,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.pagemem.wal.StorageException;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -106,6 +108,7 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.thread.IgniteThread;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 
@@ -1683,7 +1686,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 return;
             }
 
-            updateAllAsyncInternal0(node, req, completionCb);
+            for (;;) {
+                if (updateAllAsyncInternal0(node, req, completionCb))
+                    break;
+            }
         }
         else {
             forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
@@ -1700,7 +1706,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         return;
                     }
 
-                    updateAllAsyncInternal0(node, req, completionCb);
+                    for (;;) {
+                        if (updateAllAsyncInternal0(node, req, completionCb))
+                            break;
+                    }
                 }
             });
         }
@@ -1735,12 +1744,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param node Node.
      * @param req Update request.
      * @param completionCb Completion callback.
+     * @return {@code True} if update was executed or handed off to a topology future listener, {@code false} if the update needs to be retried.
      */
-    private void updateAllAsyncInternal0(
+    private boolean updateAllAsyncInternal0(
         ClusterNode node,
         GridNearAtomicAbstractUpdateRequest req,
         UpdateReplyClosure completionCb
     ) {
+        if (waitForTopologyFuture(node, req, completionCb))
+            return true;
+
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
             node.id(),
             req.futureId(),
@@ -1750,9 +1763,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
 
-        GridDhtAtomicAbstractUpdateFuture dhtFut = null;
+        GridDhtTopologyFuture topFut = null;
 
-        boolean remap = false;
+        GridDhtAtomicAbstractUpdateFuture dhtFut = null;
 
         IgniteCacheExpiryPolicy expiry = null;
 
@@ -1778,23 +1791,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         completionCb.apply(req, res);
 
-                        return;
+                        return true;
                     }
 
-                    // Do not check topology version if topology was locked on near node by
-                    // external transaction or explicit lock.
-                    if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
-                        DhtAtomicUpdateResult updRes = update(node, locked, req, res);
+                    topFut = top.topologyVersionFuture();
 
-                        dhtFut = updRes.dhtFuture();
-                        deleted = updRes.deleted();
-                        expiry = updRes.expiryPolicy();
-                    }
-                    else {
-                        // Should remap all keys.
-                        remap = true;
+                    if (topFut.isDone()) {
+                        topFut = null;
+
+                        // Do not check topology version if topology was locked on near node by
+                        // external transaction or explicit lock.
+                        if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+                            DhtAtomicUpdateResult updRes = update(node, locked, req, res);
 
-                        res.remapTopologyVersion(top.topologyVersion());
+                            dhtFut = updRes.dhtFuture();
+                            deleted = updRes.deleted();
+                            expiry = updRes.expiryPolicy();
+                        }
+                        else
+                            // Should remap all keys.
+                            res.remapTopologyVersion(top.topologyVersion());
                     }
                 }
                 finally {
@@ -1829,8 +1845,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             if (log.isDebugEnabled())
                 log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
 
-            remap = true;
-
             res.remapTopologyVersion(ctx.topology().topologyVersion());
         }
         catch (Throwable e) {
@@ -1845,26 +1859,75 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             if (e instanceof Error)
                 throw (Error)e;
 
-            return;
+            return true;
         }
         finally {
             ctx.shared().database().checkpointReadUnlock();
         }
 
-        if (remap) {
-            assert dhtFut == null;
+        if (topFut == null) {
+            if (res.remapTopologyVersion() != null) {
+                assert dhtFut == null;
 
-            completionCb.apply(req, res);
-        }
-        else {
-            if (dhtFut != null)
-                dhtFut.map(node, res.returnValue(), res, completionCb);
+                completionCb.apply(req, res);
+            }
+            else {
+                if (dhtFut != null)
+                    dhtFut.map(node, res.returnValue(), res, completionCb);
+            }
+
+            if (req.writeSynchronizationMode() != FULL_ASYNC)
+                req.cleanup(!node.isLocal());
+
+            sendTtlUpdateRequest(expiry);
+
+            return true;
         }
+        else
+            return waitForTopologyFuture(node, req, completionCb);
+    }
+
+    /**
+     * @param node Sender node.
+     * @param req Request.
+     * @param completionCb Completion callback.
+     * @return {@code True} if update will be retried from the topology future listener, {@code false} if the topology future is done and the caller can proceed.
+     */
+    private boolean waitForTopologyFuture(final ClusterNode node,
+        final GridNearAtomicAbstractUpdateRequest req,
+        final UpdateReplyClosure completionCb) {
+        GridDhtTopologyFuture topFut = ctx.group().topology().topologyVersionFuture();
+
+        if (!topFut.isDone()) {
+            Thread curThread = Thread.currentThread();
+
+            if (curThread instanceof IgniteThread) {
+                IgniteThread thread = (IgniteThread)curThread;
+
+                if (thread.policy() == GridIoPolicy.SYSTEM_POOL) {
+                    topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                            ctx.closures().runLocalSafe(new Runnable() {
+                                @Override public void run() {
+                                    updateAllAsyncInternal0(node, req, completionCb);
+                                }
+                            });
+                        }
+                    });
+
+                    return true;
+                }
+            }
 
-        if (req.writeSynchronizationMode() != FULL_ASYNC)
-            req.cleanup(!node.isLocal());
+            try {
+                topFut.get();
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Topology future failed: " + e, e);
+            }
+        }
 
-        sendTtlUpdateRequest(expiry);
+        return false;
     }
 
     /**