You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/02 08:22:58 UTC

[1/3] ignite git commit: ignite-4705

Repository: ignite
Updated Branches:
  refs/heads/ignite-4705-1 11d0b8423 -> 5215ed4ca


ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19c340ce
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19c340ce
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19c340ce

Branch: refs/heads/ignite-4705-1
Commit: 19c340ce21f013dce0155e93a6b7fe89adbd1def
Parents: 9e93f19
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 2 11:20:15 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 2 11:20:15 2017 +0300

----------------------------------------------------------------------
 .../GridNearAtomicAbstractUpdateFuture.java     | 54 +++--------
 .../GridNearAtomicSingleUpdateFuture.java       | 91 +++++++++---------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 98 ++++++++++----------
 3 files changed, 103 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/19c340ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 9f7512c..204e510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -212,18 +212,14 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             // Cannot remap.
             remapCnt = 1;
 
-            Long futId = addAtomicFuture(topVer);
-
-            if (futId != null)
-                map(topVer, futId);
+            map(topVer);
         }
     }
 
     /**
      * @param topVer Topology version.
-     * @param futId Future ID.
      */
-    protected abstract void map(AffinityTopologyVersion topVer, Long futId);
+    protected abstract void map(AffinityTopologyVersion topVer);
 
     /**
      * Maps future on ready topology.
@@ -248,7 +244,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     /**
      * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
      */
-    protected boolean storeFuture() {
+    final boolean storeFuture() {
         return syncMode != FULL_ASYNC;
     }
 
@@ -258,7 +254,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @param nodeId Node ID.
      * @param req Request.
      */
-    protected void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
+    final void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
         if (cctx.localNodeId().equals(nodeId)) {
             cache.updateAllAsyncInternal(nodeId, req,
                 new GridDhtAtomicCache.UpdateReplyClosure() {
@@ -318,43 +314,15 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @param req Request.
      * @param e Error.
      */
-    protected final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) {
-        synchronized (mux) {
-            GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
-                req.nodeId(),
-                req.futureId(),
-                cctx.deploymentEnabled());
-
-            res.addFailedKeys(req.keys(), e);
-
-            onPrimaryResponse(req.nodeId(), res, true);
-        }
-    }
+    final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) {
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+            req.nodeId(),
+            req.futureId(),
+            cctx.deploymentEnabled());
 
-    /**
-     * Adds future prevents topology change before operation complete.
-     * Should be invoked before topology lock released.
-     *
-     * @param topVer Topology version.
-     * @return Future ID in case future added.
-     */
-    final Long addAtomicFuture(AffinityTopologyVersion topVer) {
-        // TODO IGNITE-4705: it seems no need to add future inside read lock.
-
-        Long futId = cctx.mvcc().atomicFutureId();
-
-        synchronized (mux) {
-            assert this.futId == null : this;
-            assert this.topVer == AffinityTopologyVersion.ZERO : this;
-
-            this.topVer = topVer;
-            this.futId = futId;
-        }
-
-        if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this))
-            return null;
+        res.addFailedKeys(req.keys(), e);
 
-        return futId;
+        onPrimaryResponse(req.nodeId(), res, true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/19c340ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 7a18328..b1b951f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -454,65 +454,55 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
     /** {@inheritDoc} */
     @Override protected void mapOnTopology() {
-        // TODO IGNITE-4705: primary should block topology change, so it seems read lock is not needed.
-        cache.topology().readLock();
-
         AffinityTopologyVersion topVer;
-        Long futId;
-
-        try {
-            if (cache.topology().stopping()) {
-                onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
-                    cache.name()));
-
-                return;
-            }
 
-            GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+        if (cache.topology().stopping()) {
+            onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+                cache.name()));
 
-            if (fut.isDone()) {
-                Throwable err = fut.validateCache(cctx);
-
-                if (err != null) {
-                    onDone(err);
+            return;
+        }
 
-                    return;
-                }
+        GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
 
-                topVer = fut.topologyVersion();
+        if (fut.isDone()) {
+            Throwable err = fut.validateCache(cctx);
 
-                futId = addAtomicFuture(topVer);
-            }
-            else {
-                if (waitTopFut) {
-                    assert !topLocked : this;
-
-                    fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                                @Override public void run() {
-                                    mapOnTopology();
-                                }
-                            });
-                        }
-                    });
-                }
-                else
-                    onDone(new GridCacheTryPutFailedException());
+            if (err != null) {
+                onDone(err);
 
                 return;
             }
+
+            topVer = fut.topologyVersion();
         }
-        finally {
-            cache.topology().readUnlock();
+        else {
+            if (waitTopFut) {
+                assert !topLocked : this;
+
+                fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                            @Override public void run() {
+                                mapOnTopology();
+                            }
+                        });
+                    }
+                });
+            }
+            else
+                onDone(new GridCacheTryPutFailedException());
+
+            return;
         }
 
-        if (futId != null)
-            map(topVer, futId);
+        map(topVer);
     }
 
     /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer, Long futId) {
+    @Override protected void map(AffinityTopologyVersion topVer) {
+        Long futId = cctx.mvcc().atomicFutureId();
+
         Exception err = null;
         GridNearAtomicAbstractUpdateRequest singleReq0 = null;
 
@@ -520,11 +510,20 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             singleReq0 = mapSingleUpdate(topVer, futId);
 
             synchronized (mux) {
-                assert this.futId.equals(futId) || (this.isDone() && this.error() != null);
-                assert this.topVer == topVer;
+                assert this.futId == null : this;
+                assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+                this.topVer = topVer;
+                this.futId = futId;
 
                 reqState = new PrimaryRequestState(singleReq0);
             }
+
+            if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) {
+                assert isDone();
+
+                return;
+            }
         }
         catch (Exception e) {
             err = e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/19c340ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 89b2573..573cb40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -687,59 +687,47 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     @Override protected void mapOnTopology() {
         AffinityTopologyVersion topVer;
 
-        Long futId;
+        if (cache.topology().stopping()) {
+            onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+                cache.name()));
 
-        cache.topology().readLock();
-
-        try {
-            if (cache.topology().stopping()) {
-                onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
-                    cache.name()));
-
-                return;
-            }
-
-            GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
-
-            if (fut.isDone()) {
-                Throwable err = fut.validateCache(cctx);
-
-                if (err != null) {
-                    onDone(err);
+            return;
+        }
 
-                    return;
-                }
+        GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
 
-                topVer = fut.topologyVersion();
+        if (fut.isDone()) {
+            Throwable err = fut.validateCache(cctx);
 
-                futId = addAtomicFuture(topVer);
-            }
-            else {
-                if (waitTopFut) {
-                    assert !topLocked : this;
-
-                    fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                                @Override public void run() {
-                                    mapOnTopology();
-                                }
-                            });
-                        }
-                    });
-                }
-                else
-                    onDone(new GridCacheTryPutFailedException());
+            if (err != null) {
+                onDone(err);
 
                 return;
             }
+
+            topVer = fut.topologyVersion();
         }
-        finally {
-            cache.topology().readUnlock();
+        else {
+            if (waitTopFut) {
+                assert !topLocked : this;
+
+                fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                            @Override public void run() {
+                                mapOnTopology();
+                            }
+                        });
+                    }
+                });
+            }
+            else
+                onDone(new GridCacheTryPutFailedException());
+
+            return;
         }
 
-        if (futId != null)
-            map(topVer, futId, remapKeys);
+        map(topVer, remapKeys);
     }
 
     /**
@@ -799,18 +787,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer, Long futId) {
-        map(topVer, futId, null);
+    @Override protected void map(AffinityTopologyVersion topVer) {
+        map(topVer, null);
     }
 
     /**
      * @param topVer Topology version.
-     * @param futId Future ID.
      * @param remapKeys Keys to remap.
      */
-    void map(AffinityTopologyVersion topVer,
-        Long futId,
-        @Nullable Collection<KeyCacheObject> remapKeys) {
+    void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {
@@ -820,6 +805,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             return;
         }
 
+        Long futId = cctx.mvcc().atomicFutureId();
+
         Exception err = null;
         PrimaryRequestState singleReq0 = null;
         Map<UUID, PrimaryRequestState> mappings0 = null;
@@ -848,8 +835,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
 
             synchronized (mux) {
-                assert this.futId.equals(futId) || (this.isDone() && this.error() != null);
-                assert this.topVer == topVer;
+                assert this.futId == null : this;
+                assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+                this.topVer = topVer;
+                this.futId = futId;
 
                 resCnt = 0;
 
@@ -858,6 +848,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
                 this.remapKeys = null;
             }
+
+            if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) {
+                assert isDone();
+
+                return;
+            }
         }
         catch (Exception e) {
             err = e;


[2/3] ignite git commit: Merge branch 'ignite-4705' into ignite-4705-1

Posted by sb...@apache.org.
Merge branch 'ignite-4705' into ignite-4705-1


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/68eae795
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/68eae795
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/68eae795

Branch: refs/heads/ignite-4705-1
Commit: 68eae7955b766f72e1668b143d21932f6d1c4181
Parents: 11d0b84 19c340c
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 2 11:22:07 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 2 11:22:07 2017 +0300

----------------------------------------------------------------------
 .../GridNearAtomicAbstractUpdateFuture.java     | 54 +++--------
 .../GridNearAtomicSingleUpdateFuture.java       | 91 +++++++++---------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 98 ++++++++++----------
 3 files changed, 103 insertions(+), 140 deletions(-)
----------------------------------------------------------------------



[3/3] ignite git commit: ignite-4705

Posted by sb...@apache.org.
ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5215ed4c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5215ed4c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5215ed4c

Branch: refs/heads/ignite-4705-1
Commit: 5215ed4ca9e95a8afc9c5829f4d3b73d028368c6
Parents: 68eae79
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 2 11:22:51 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 2 11:22:51 2017 +0300

----------------------------------------------------------------------
 .../GridNearAtomicSingleUpdateFuture.java       | 40 +++++-----
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 82 ++++++++++----------
 2 files changed, 61 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5215ed4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index b1b951f..6152faf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -203,26 +203,26 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
     /** {@inheritDoc} */
     @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
-        GridCacheReturn opRes0;
-        CachePartialUpdateCheckedException err0;
-        AffinityTopologyVersion remapTopVer0;
-
-        synchronized (mux) {
-            if (futId == null || futId != res.futureId())
-                return;
-
-            assert reqState != null;
-
-            if (reqState.onMappingReceived(cctx, res)) {
-                opRes0 = opRes;
-                err0 = err;
-                remapTopVer0 = onAllReceived();
-            }
-            else
-                return;
-        }
-
-        finishUpdateFuture(opRes0, err0, remapTopVer0);
+//        GridCacheReturn opRes0;
+//        CachePartialUpdateCheckedException err0;
+//        AffinityTopologyVersion remapTopVer0;
+//
+//        synchronized (mux) {
+//            if (futId == null || futId != res.futureId())
+//                return;
+//
+//            assert reqState != null;
+//
+//            if (reqState.onMappingReceived(cctx, res)) {
+//                opRes0 = opRes;
+//                err0 = err;
+//                remapTopVer0 = onAllReceived();
+//            }
+//            else
+//                return;
+//        }
+//
+//        finishUpdateFuture(opRes0, err0, remapTopVer0);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5215ed4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 573cb40..44d3238 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -285,47 +285,47 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
     /** {@inheritDoc} */
     @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
-        GridCacheReturn opRes0;
-        CachePartialUpdateCheckedException err0;
-        AffinityTopologyVersion remapTopVer0;
-
-        synchronized (mux) {
-            if (futId == null || futId != res.futureId())
-                return;
-
-            PrimaryRequestState reqState;
-
-            if (singleReq != null) {
-                if (singleReq.onMappingReceived(cctx, res)) {
-                    opRes0 = opRes;
-                    err0 = err;
-                    remapTopVer0 = onAllReceived();
-                }
-                else
-                    return;
-            }
-            else {
-                reqState = mappings != null ? mappings.get(nodeId) : null;
-
-                if (reqState != null && reqState.onMappingReceived(cctx, res)) {
-                    assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
-
-                    resCnt++;
-
-                    if (mappings.size() == resCnt) {
-                        opRes0 = opRes;
-                        err0 = err;
-                        remapTopVer0 = onAllReceived();
-                    }
-                    else
-                        return;
-                }
-                else
-                    return;
-            }
-        }
-
-        finishUpdateFuture(opRes0, err0, remapTopVer0);
+//        GridCacheReturn opRes0;
+//        CachePartialUpdateCheckedException err0;
+//        AffinityTopologyVersion remapTopVer0;
+//
+//        synchronized (mux) {
+//            if (futId == null || futId != res.futureId())
+//                return;
+//
+//            PrimaryRequestState reqState;
+//
+//            if (singleReq != null) {
+//                if (singleReq.onMappingReceived(cctx, res)) {
+//                    opRes0 = opRes;
+//                    err0 = err;
+//                    remapTopVer0 = onAllReceived();
+//                }
+//                else
+//                    return;
+//            }
+//            else {
+//                reqState = mappings != null ? mappings.get(nodeId) : null;
+//
+//                if (reqState != null && reqState.onMappingReceived(cctx, res)) {
+//                    assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+//
+//                    resCnt++;
+//
+//                    if (mappings.size() == resCnt) {
+//                        opRes0 = opRes;
+//                        err0 = err;
+//                        remapTopVer0 = onAllReceived();
+//                    }
+//                    else
+//                        return;
+//                }
+//                else
+//                    return;
+//            }
+//        }
+//
+//        finishUpdateFuture(opRes0, err0, remapTopVer0);
     }
 
     /** {@inheritDoc} */