You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/01/11 12:54:27 UTC
ignite git commit: Must use finished exchange future to call validateCache.
Repository: ignite
Updated Branches:
refs/heads/ignite-3477 3f4a2ee58 -> 949e4c0ac
Must use finished exchange future to call validateCache.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/949e4c0a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/949e4c0a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/949e4c0a
Branch: refs/heads/ignite-3477
Commit: 949e4c0ac6a9ebea877bbc336e95f5a8a5cc41f9
Parents: 3f4a2ee
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jan 11 15:54:30 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jan 11 15:54:30 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 7 +++--
.../GridCachePartitionExchangeManager.java | 33 +++++++++++++++++---
.../dht/GridPartitionedGetFuture.java | 4 ++-
.../dht/GridPartitionedSingleGetFuture.java | 4 ++-
.../GridDhtPartitionsExchangeFuture.java | 30 ++++++++++--------
5 files changed, 56 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/949e4c0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 93270ea..9356f31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -86,7 +86,7 @@ import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
@@ -1967,8 +1967,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
try {
int keysSize = keys.size();
- Throwable ex = ctx.shared().exchange().lastTopologyFuture()
- .validateCache(ctx, recovery, /*read*/true, null, keys);
+ GridDhtTopologyFuture topFut = ctx.shared().exchange().lastFinishedFuture();
+
+ Throwable ex = topFut != null ? topFut.validateCache(ctx, recovery, /*read*/true, null, keys) : null;
if (ex != null)
return new GridFinishedFuture<>(ex);
http://git-wip-us.apache.org/repos/asf/ignite/blob/949e4c0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1e7689f..46a20ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -145,6 +145,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private volatile GridDhtPartitionsExchangeFuture lastInitializedFut;
/** */
+ private final AtomicReference<GridDhtTopologyFuture> lastFinishedFut = new AtomicReference<>();
+
+ /** */
private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>();
/** */
@@ -159,9 +162,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private GridFutureAdapter<?> reconnectExchangeFut;
/** */
- private final Queue<Callable<Boolean>> rebalanceQ = new ConcurrentLinkedDeque8<>();
-
- /** */
private final Object interruptLock = new Object();
/**
@@ -610,13 +610,38 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @return Last completed topology future.
+ * @return Last initialized topology future.
*/
public GridDhtTopologyFuture lastTopologyFuture() {
return lastInitializedFut;
}
/**
+ * @return Last finished topology future.
+ */
+ @Nullable public GridDhtTopologyFuture lastFinishedFuture() {
+ return lastFinishedFut.get();
+ }
+
+ /**
+ * @param fut Finished future.
+ */
+ public void lastFinishedFuture(GridDhtTopologyFuture fut) {
+ assert fut != null && fut.isDone() : fut;
+
+ while (true) {
+ GridDhtTopologyFuture cur = lastFinishedFut.get();
+
+ if (cur == null || fut.topologyVersion().compareTo(cur.topologyVersion()) > 0) {
+ if (lastFinishedFut.compareAndSet(cur, fut))
+ break;
+ }
+ else
+ break;
+ }
+ }
+
+ /**
* @param ver Topology version.
* @return Future or {@code null} if future is already completed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/949e4c0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index cf329ef..6c4da68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -246,7 +246,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
return;
}
- Throwable err = cctx.topology().topologyVersionFuture().validateCache(cctx, recovery, true, null, keys);
+ GridDhtTopologyFuture topFut = cctx.shared().exchange().lastFinishedFuture();
+
+ Throwable err = topFut != null ? topFut.validateCache(cctx, recovery, true, null, keys) : null;
if (err != null) {
onDone(err);
http://git-wip-us.apache.org/repos/asf/ignite/blob/949e4c0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 1744cbd..ea69743 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -198,7 +198,9 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer :
canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
- Throwable err = cctx.topology().topologyVersionFuture().validateCache(cctx, recovery, true, key, null);
+ GridDhtTopologyFuture topFut = cctx.shared().exchange().lastFinishedFuture();
+
+ Throwable err = topFut != null ? topFut.validateCache(cctx, recovery, true, key, null) : null;
if (err != null) {
onDone(err);
http://git-wip-us.apache.org/repos/asf/ignite/blob/949e4c0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index b7f6680..75fd3e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -205,7 +205,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
private boolean centralizedAff;
/** Change global state exception. */
- private Exception changeGlobalStateException;
+ private Exception changeGlobalStateE;
/** Change global state exceptions. */
private final Map<UUID, Exception> changeGlobalStateExceptions = new ConcurrentHashMap8<>();
@@ -488,9 +488,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert !dummy && !forcePreload : this;
try {
- AffinityTopologyVersion topVersion = topologyVersion();
+ AffinityTopologyVersion topVer = topologyVersion();
- srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topVersion));
+ srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topVer));
remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
@@ -522,7 +522,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
else {
if (discoEvt.type() == EVT_NODE_JOINED) {
- Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(topVersion);
+ Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(topVer);
if (!discoEvt.eventNode().isLocal())
cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
@@ -568,7 +568,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
case NONE: {
initTopologies();
- onDone(topVersion);
+ onDone(topVer);
break;
}
@@ -654,10 +654,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
GridClusterStateProcessor stateProc = cctx.kernalContext().state();
if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(reqs, topologyVersion())) {
- changeGlobalStateException = stateProc.onChangeGlobalState();
+ changeGlobalStateE = stateProc.onChangeGlobalState();
- if (crd && changeGlobalStateException != null)
- changeGlobalStateExceptions.put(cctx.localNodeId(), changeGlobalStateException);
+ if (crd && changeGlobalStateE != null)
+ changeGlobalStateExceptions.put(cctx.localNodeId(), changeGlobalStateE);
}
boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, reqs);
@@ -1044,8 +1044,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(
node, exchangeId(), clientOnlyExchange, true);
- if (exchangeOnChangeGlobalState && changeGlobalStateException != null)
- m.setException(changeGlobalStateException);
+ if (exchangeOnChangeGlobalState && changeGlobalStateE != null)
+ m.setException(changeGlobalStateE);
if (log.isDebugEnabled())
log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
@@ -1199,6 +1199,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (discoEvt instanceof DiscoveryCustomEvent)
((DiscoveryCustomEvent)discoEvt).customMessage(null);
+ cctx.exchange().lastFinishedFuture(this);
+
return true;
}
@@ -1213,6 +1215,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
@Nullable Object key,
@Nullable Collection<?> keys
) {
+ assert isDone() : this;
+
Throwable err = error();
if (err != null)
@@ -1318,7 +1322,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
changeGlobalStateExceptions.clear();
crd = null;
partReleaseFut = null;
- changeGlobalStateException = null;
+ changeGlobalStateE = null;
}
/**
@@ -1920,8 +1924,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
if (crd0.isLocal()) {
- if (exchangeOnChangeGlobalState && changeGlobalStateException!=null)
- changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateException);
+ if (exchangeOnChangeGlobalState && changeGlobalStateE !=null)
+ changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateE);
if (allReceived) {
onAllReceived();