You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/02 08:22:58 UTC
[1/3] ignite git commit: ignite-4705
Repository: ignite
Updated Branches:
refs/heads/ignite-4705-1 11d0b8423 -> 5215ed4ca
ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19c340ce
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19c340ce
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19c340ce
Branch: refs/heads/ignite-4705-1
Commit: 19c340ce21f013dce0155e93a6b7fe89adbd1def
Parents: 9e93f19
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 2 11:20:15 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 2 11:20:15 2017 +0300
----------------------------------------------------------------------
.../GridNearAtomicAbstractUpdateFuture.java | 54 +++--------
.../GridNearAtomicSingleUpdateFuture.java | 91 +++++++++---------
.../dht/atomic/GridNearAtomicUpdateFuture.java | 98 ++++++++++----------
3 files changed, 103 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/19c340ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 9f7512c..204e510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -212,18 +212,14 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
// Cannot remap.
remapCnt = 1;
- Long futId = addAtomicFuture(topVer);
-
- if (futId != null)
- map(topVer, futId);
+ map(topVer);
}
}
/**
* @param topVer Topology version.
- * @param futId Future ID.
*/
- protected abstract void map(AffinityTopologyVersion topVer, Long futId);
+ protected abstract void map(AffinityTopologyVersion topVer);
/**
* Maps future on ready topology.
@@ -248,7 +244,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
/**
* @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
*/
- protected boolean storeFuture() {
+ final boolean storeFuture() {
return syncMode != FULL_ASYNC;
}
@@ -258,7 +254,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* @param nodeId Node ID.
* @param req Request.
*/
- protected void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
+ final void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
if (cctx.localNodeId().equals(nodeId)) {
cache.updateAllAsyncInternal(nodeId, req,
new GridDhtAtomicCache.UpdateReplyClosure() {
@@ -318,43 +314,15 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* @param req Request.
* @param e Error.
*/
- protected final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) {
- synchronized (mux) {
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- req.nodeId(),
- req.futureId(),
- cctx.deploymentEnabled());
-
- res.addFailedKeys(req.keys(), e);
-
- onPrimaryResponse(req.nodeId(), res, true);
- }
- }
+ final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) {
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ req.nodeId(),
+ req.futureId(),
+ cctx.deploymentEnabled());
- /**
- * Adds future prevents topology change before operation complete.
- * Should be invoked before topology lock released.
- *
- * @param topVer Topology version.
- * @return Future ID in case future added.
- */
- final Long addAtomicFuture(AffinityTopologyVersion topVer) {
- // TODO IGNITE-4705: it seems no need to add future inside read lock.
-
- Long futId = cctx.mvcc().atomicFutureId();
-
- synchronized (mux) {
- assert this.futId == null : this;
- assert this.topVer == AffinityTopologyVersion.ZERO : this;
-
- this.topVer = topVer;
- this.futId = futId;
- }
-
- if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this))
- return null;
+ res.addFailedKeys(req.keys(), e);
- return futId;
+ onPrimaryResponse(req.nodeId(), res, true);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/19c340ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 7a18328..b1b951f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -454,65 +454,55 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
/** {@inheritDoc} */
@Override protected void mapOnTopology() {
- // TODO IGNITE-4705: primary should block topology change, so it seems read lock is not needed.
- cache.topology().readLock();
-
AffinityTopologyVersion topVer;
- Long futId;
-
- try {
- if (cache.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- cache.name()));
-
- return;
- }
- GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+ if (cache.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ cache.name()));
- if (fut.isDone()) {
- Throwable err = fut.validateCache(cctx);
-
- if (err != null) {
- onDone(err);
+ return;
+ }
- return;
- }
+ GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
- topVer = fut.topologyVersion();
+ if (fut.isDone()) {
+ Throwable err = fut.validateCache(cctx);
- futId = addAtomicFuture(topVer);
- }
- else {
- if (waitTopFut) {
- assert !topLocked : this;
-
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- mapOnTopology();
- }
- });
- }
- });
- }
- else
- onDone(new GridCacheTryPutFailedException());
+ if (err != null) {
+ onDone(err);
return;
}
+
+ topVer = fut.topologyVersion();
}
- finally {
- cache.topology().readUnlock();
+ else {
+ if (waitTopFut) {
+ assert !topLocked : this;
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ mapOnTopology();
+ }
+ });
+ }
+ });
+ }
+ else
+ onDone(new GridCacheTryPutFailedException());
+
+ return;
}
- if (futId != null)
- map(topVer, futId);
+ map(topVer);
}
/** {@inheritDoc} */
- @Override protected void map(AffinityTopologyVersion topVer, Long futId) {
+ @Override protected void map(AffinityTopologyVersion topVer) {
+ Long futId = cctx.mvcc().atomicFutureId();
+
Exception err = null;
GridNearAtomicAbstractUpdateRequest singleReq0 = null;
@@ -520,11 +510,20 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
singleReq0 = mapSingleUpdate(topVer, futId);
synchronized (mux) {
- assert this.futId.equals(futId) || (this.isDone() && this.error() != null);
- assert this.topVer == topVer;
+ assert this.futId == null : this;
+ assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+ this.topVer = topVer;
+ this.futId = futId;
reqState = new PrimaryRequestState(singleReq0);
}
+
+ if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) {
+ assert isDone();
+
+ return;
+ }
}
catch (Exception e) {
err = e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/19c340ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 89b2573..573cb40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -687,59 +687,47 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
@Override protected void mapOnTopology() {
AffinityTopologyVersion topVer;
- Long futId;
+ if (cache.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ cache.name()));
- cache.topology().readLock();
-
- try {
- if (cache.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- cache.name()));
-
- return;
- }
-
- GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
-
- if (fut.isDone()) {
- Throwable err = fut.validateCache(cctx);
-
- if (err != null) {
- onDone(err);
+ return;
+ }
- return;
- }
+ GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
- topVer = fut.topologyVersion();
+ if (fut.isDone()) {
+ Throwable err = fut.validateCache(cctx);
- futId = addAtomicFuture(topVer);
- }
- else {
- if (waitTopFut) {
- assert !topLocked : this;
-
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- mapOnTopology();
- }
- });
- }
- });
- }
- else
- onDone(new GridCacheTryPutFailedException());
+ if (err != null) {
+ onDone(err);
return;
}
+
+ topVer = fut.topologyVersion();
}
- finally {
- cache.topology().readUnlock();
+ else {
+ if (waitTopFut) {
+ assert !topLocked : this;
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ mapOnTopology();
+ }
+ });
+ }
+ });
+ }
+ else
+ onDone(new GridCacheTryPutFailedException());
+
+ return;
}
- if (futId != null)
- map(topVer, futId, remapKeys);
+ map(topVer, remapKeys);
}
/**
@@ -799,18 +787,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
- @Override protected void map(AffinityTopologyVersion topVer, Long futId) {
- map(topVer, futId, null);
+ @Override protected void map(AffinityTopologyVersion topVer) {
+ map(topVer, null);
}
/**
* @param topVer Topology version.
- * @param futId Future ID.
* @param remapKeys Keys to remap.
*/
- void map(AffinityTopologyVersion topVer,
- Long futId,
- @Nullable Collection<KeyCacheObject> remapKeys) {
+ void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
if (F.isEmpty(topNodes)) {
@@ -820,6 +805,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
return;
}
+ Long futId = cctx.mvcc().atomicFutureId();
+
Exception err = null;
PrimaryRequestState singleReq0 = null;
Map<UUID, PrimaryRequestState> mappings0 = null;
@@ -848,8 +835,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
synchronized (mux) {
- assert this.futId.equals(futId) || (this.isDone() && this.error() != null);
- assert this.topVer == topVer;
+ assert this.futId == null : this;
+ assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+ this.topVer = topVer;
+ this.futId = futId;
resCnt = 0;
@@ -858,6 +848,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
this.remapKeys = null;
}
+
+ if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) {
+ assert isDone();
+
+ return;
+ }
}
catch (Exception e) {
err = e;
[2/3] ignite git commit: Merge branch 'ignite-4705' into ignite-4705-1
Posted by sb...@apache.org.
Merge branch 'ignite-4705' into ignite-4705-1
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/68eae795
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/68eae795
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/68eae795
Branch: refs/heads/ignite-4705-1
Commit: 68eae7955b766f72e1668b143d21932f6d1c4181
Parents: 11d0b84 19c340c
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 2 11:22:07 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 2 11:22:07 2017 +0300
----------------------------------------------------------------------
.../GridNearAtomicAbstractUpdateFuture.java | 54 +++--------
.../GridNearAtomicSingleUpdateFuture.java | 91 +++++++++---------
.../dht/atomic/GridNearAtomicUpdateFuture.java | 98 ++++++++++----------
3 files changed, 103 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
[3/3] ignite git commit: ignite-4705
Posted by sb...@apache.org.
ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5215ed4c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5215ed4c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5215ed4c
Branch: refs/heads/ignite-4705-1
Commit: 5215ed4ca9e95a8afc9c5829f4d3b73d028368c6
Parents: 68eae79
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 2 11:22:51 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 2 11:22:51 2017 +0300
----------------------------------------------------------------------
.../GridNearAtomicSingleUpdateFuture.java | 40 +++++-----
.../dht/atomic/GridNearAtomicUpdateFuture.java | 82 ++++++++++----------
2 files changed, 61 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5215ed4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index b1b951f..6152faf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -203,26 +203,26 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
/** {@inheritDoc} */
@Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
- GridCacheReturn opRes0;
- CachePartialUpdateCheckedException err0;
- AffinityTopologyVersion remapTopVer0;
-
- synchronized (mux) {
- if (futId == null || futId != res.futureId())
- return;
-
- assert reqState != null;
-
- if (reqState.onMappingReceived(cctx, res)) {
- opRes0 = opRes;
- err0 = err;
- remapTopVer0 = onAllReceived();
- }
- else
- return;
- }
-
- finishUpdateFuture(opRes0, err0, remapTopVer0);
+// GridCacheReturn opRes0;
+// CachePartialUpdateCheckedException err0;
+// AffinityTopologyVersion remapTopVer0;
+//
+// synchronized (mux) {
+// if (futId == null || futId != res.futureId())
+// return;
+//
+// assert reqState != null;
+//
+// if (reqState.onMappingReceived(cctx, res)) {
+// opRes0 = opRes;
+// err0 = err;
+// remapTopVer0 = onAllReceived();
+// }
+// else
+// return;
+// }
+//
+// finishUpdateFuture(opRes0, err0, remapTopVer0);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/5215ed4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 573cb40..44d3238 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -285,47 +285,47 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
/** {@inheritDoc} */
@Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
- GridCacheReturn opRes0;
- CachePartialUpdateCheckedException err0;
- AffinityTopologyVersion remapTopVer0;
-
- synchronized (mux) {
- if (futId == null || futId != res.futureId())
- return;
-
- PrimaryRequestState reqState;
-
- if (singleReq != null) {
- if (singleReq.onMappingReceived(cctx, res)) {
- opRes0 = opRes;
- err0 = err;
- remapTopVer0 = onAllReceived();
- }
- else
- return;
- }
- else {
- reqState = mappings != null ? mappings.get(nodeId) : null;
-
- if (reqState != null && reqState.onMappingReceived(cctx, res)) {
- assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
-
- resCnt++;
-
- if (mappings.size() == resCnt) {
- opRes0 = opRes;
- err0 = err;
- remapTopVer0 = onAllReceived();
- }
- else
- return;
- }
- else
- return;
- }
- }
-
- finishUpdateFuture(opRes0, err0, remapTopVer0);
+// GridCacheReturn opRes0;
+// CachePartialUpdateCheckedException err0;
+// AffinityTopologyVersion remapTopVer0;
+//
+// synchronized (mux) {
+// if (futId == null || futId != res.futureId())
+// return;
+//
+// PrimaryRequestState reqState;
+//
+// if (singleReq != null) {
+// if (singleReq.onMappingReceived(cctx, res)) {
+// opRes0 = opRes;
+// err0 = err;
+// remapTopVer0 = onAllReceived();
+// }
+// else
+// return;
+// }
+// else {
+// reqState = mappings != null ? mappings.get(nodeId) : null;
+//
+// if (reqState != null && reqState.onMappingReceived(cctx, res)) {
+// assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+//
+// resCnt++;
+//
+// if (mappings.size() == resCnt) {
+// opRes0 = opRes;
+// err0 = err;
+// remapTopVer0 = onAllReceived();
+// }
+// else
+// return;
+// }
+// else
+// return;
+// }
+// }
+//
+// finishUpdateFuture(opRes0, err0, remapTopVer0);
}
/** {@inheritDoc} */