You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/21 10:22:30 UTC
[07/10] ignite git commit: ignite-6124 Merge exchanges for multiple
discovery events
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 2ee80a6..d991c86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -38,9 +38,6 @@ import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
-import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
@@ -49,6 +46,9 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d67d81d..08eb1bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -89,6 +89,7 @@ import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchang
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridListSet;
import org.apache.ignite.internal.util.GridPartitionStateMap;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -102,6 +103,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
@@ -126,9 +128,16 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.preloa
*/
public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
/** Exchange history size. */
- private static final int EXCHANGE_HISTORY_SIZE =
+ private final int EXCHANGE_HISTORY_SIZE =
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE, 1_000);
+ /** */
+ private final long IGNITE_EXCHANGE_MERGE_DELAY =
+ IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY, 0);
+
+ /** */
+ private static final IgniteProductVersion EXCHANGE_PROTOCOL_2_SINCE = IgniteProductVersion.fromString("2.1.4");
+
/** Atomic reference for pending partition resend timeout object. */
private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
@@ -176,7 +185,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* discovery event. In case if remote node will retry partition exchange, completed future will indicate
* that full partition map should be sent to requesting node right away.
*/
- private ExchangeFutureSet exchFuts = new ExchangeFutureSet();
+ private ExchangeFutureSet exchFuts = new ExchangeFutureSet(EXCHANGE_HISTORY_SIZE);
/** */
private volatile IgniteCheckedException stopErr;
@@ -193,6 +202,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** Events received while cluster state transition was in progress. */
private final List<PendingDiscoveryEvent> pendingEvts = new ArrayList<>();
+ /** */
+ private final GridFutureAdapter<?> crdInitFut = new GridFutureAdapter();
+
+ /** For tests only. */
+ private volatile AffinityTopologyVersion exchMergeTestWaitVer;
+
/** Discovery listener. */
private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
@Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -288,7 +303,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
cctx.io().addCacheHandler(0, GridDhtPartitionsSingleMessage.class,
new MessageHandler<GridDhtPartitionsSingleMessage>() {
- @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+ @Override public void onMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
+ if (!crdInitFut.isDone() && !msg.restoreState()) {
+ GridDhtPartitionExchangeId exchId = msg.exchangeId();
+
+ log.info("Waiting for coordinator initialization [node=" + node.id() +
+ ", nodeOrder=" + node.order() +
+ ", ver=" + (exchId != null ? exchId.topologyVersion() : null) + ']');
+
+ crdInitFut.listen(new CI1<IgniteInternalFuture>() {
+ @Override public void apply(IgniteInternalFuture fut) {
+ processSinglePartitionUpdate(node, msg);
+ }
+ });
+
+ return;
+ }
+
processSinglePartitionUpdate(node, msg);
}
});
@@ -344,6 +375,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ *
+ */
+ public void onCoordinatorInitialized() {
+ crdInitFut.onDone();
+ }
+
+ /**
* Callback for local join event (needed since regular event for local join is not generated).
*
* @param evt Event.
@@ -375,7 +413,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);
- exchFut = exchangeFuture(exchId, evt, cache,null, null);
+ exchFut = exchangeFuture(exchId, evt, cache, null, null);
}
else {
DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
@@ -589,6 +627,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * @param ver Node version.
+ * @return Supported exchange protocol version.
+ */
+ public static int exchangeProtocolVersion(IgniteProductVersion ver) {
+ if (ver.compareToIgnoreTimestamp(EXCHANGE_PROTOCOL_2_SINCE) >= 0)
+ return 2;
+
+ return 1;
+ }
+
+ /**
* @param idx Index.
* @return Topic for index.
*/
@@ -598,6 +647,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
+ exchWorker.onKernalStop();
+
cctx.gridEvents().removeDiscoveryEventListener(discoLsnr);
cctx.io().removeHandler(false, 0, GridDhtPartitionsSingleMessage.class);
@@ -620,6 +671,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
// Finish all exchange futures.
ExchangeFutureSet exchFuts0 = exchFuts;
+ for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+ if (task instanceof GridDhtPartitionsExchangeFuture)
+ ((GridDhtPartitionsExchangeFuture)task).onDone(stopErr);
+ }
+
if (exchFuts0 != null) {
for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
f.onDone(stopErr);
@@ -659,10 +715,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param grpId Cache group ID.
- * @param exchFut Exchange future.
* @return Topology.
*/
- public GridDhtPartitionTopology clientTopology(int grpId, GridDhtPartitionsExchangeFuture exchFut) {
+ public GridDhtPartitionTopology clientTopology(int grpId) {
GridClientPartitionTopology top = clientTops.get(grpId);
if (top != null)
@@ -670,7 +725,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
Object affKey = null;
- CacheGroupDescriptor grpDesc = cctx.cache().cacheGroupDescriptors().get(grpId);
+ CacheGroupDescriptor grpDesc = cctx.affinity().cacheGroups().get(grpId);
if (grpDesc != null) {
CacheConfiguration<?, ?> ccfg = grpDesc.config();
@@ -684,7 +739,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
GridClientPartitionTopology old = clientTops.putIfAbsent(grpId,
- top = new GridClientPartitionTopology(cctx, grpId, exchFut, affKey));
+ top = new GridClientPartitionTopology(cctx, grpId, affKey));
return old != null ? old : top;
}
@@ -705,19 +760,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * Gets topology version of last partition exchange, it is possible that last partition exchange
- * is not completed yet.
- *
- * @return Topology version.
- */
- public AffinityTopologyVersion topologyVersion() {
- GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut;
-
- return lastInitializedFut0 != null
- ? lastInitializedFut0.exchangeId().topologyVersion() : AffinityTopologyVersion.NONE;
- }
-
- /**
* @return Topology version of latest completed partition exchange.
*/
public AffinityTopologyVersion readyAffinityVersion() {
@@ -727,7 +769,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @return Last initialized topology future.
*/
- public GridDhtTopologyFuture lastTopologyFuture() {
+ public GridDhtPartitionsExchangeFuture lastTopologyFuture() {
return lastInitializedFut;
}
@@ -763,7 +805,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
@Nullable public IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion ver) {
GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut;
- if (lastInitializedFut0 != null && lastInitializedFut0.topologyVersion().compareTo(ver) == 0) {
+ if (lastInitializedFut0 != null && lastInitializedFut0.initialVersion().compareTo(ver) == 0) {
if (log.isDebugEnabled())
log.debug("Return lastInitializedFut for topology ready future " +
"[ver=" + ver + ", fut=" + lastInitializedFut0 + ']');
@@ -909,7 +951,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
// No need to send to nodes which did not finish their first exchange.
AffinityTopologyVersion rmtTopVer =
- lastFut != null ? lastFut.topologyVersion() : AffinityTopologyVersion.NONE;
+ lastFut != null ? (lastFut.isDone() ? lastFut.topologyVersion() : lastFut.initialVersion()) : AffinityTopologyVersion.NONE;
Collection<ClusterNode> rmts = CU.remoteNodes(cctx, rmtTopVer);
@@ -1076,8 +1118,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param id Exchange ID.
*/
private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
- GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node,
- id,
+ GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id,
cctx.kernalContext().clientNode(),
false);
@@ -1098,14 +1139,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param targetNode Target node.
* @param exchangeId Exchange ID.
* @param clientOnlyExchange Client exchange flag.
* @param sndCounters {@code True} if need send partition update counters.
* @return Message.
*/
- public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(ClusterNode targetNode,
- @Nullable GridDhtPartitionExchangeId exchangeId,
+ public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(@Nullable GridDhtPartitionExchangeId exchangeId,
boolean clientOnlyExchange,
boolean sndCounters)
{
@@ -1231,14 +1270,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param exchFut Exchange.
+ * @param topVer Exchange result topology version.
+ * @param initTopVer Exchange initial version.
* @param err Error.
*/
- public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, @Nullable Throwable err) {
- AffinityTopologyVersion topVer = exchFut.topologyVersion();
+ public void onExchangeDone(AffinityTopologyVersion topVer, AffinityTopologyVersion initTopVer, @Nullable Throwable err) {
+ assert topVer != null || err != null;
+ assert initTopVer != null;
if (log.isDebugEnabled())
- log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']');
+ log.debug("Exchange done [topVer=" + topVer + ", err=" + err + ']');
if (err == null) {
while (true) {
@@ -1263,10 +1304,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
else {
for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
- if (entry.getKey().compareTo(topVer) <= 0) {
+ if (entry.getKey().compareTo(initTopVer) <= 0) {
if (log.isDebugEnabled())
log.debug("Completing created topology ready future with error " +
- "[ver=" + topVer + ", fut=" + entry.getValue() + ']');
+ "[ver=" + entry.getKey() + ", fut=" + entry.getValue() + ']');
entry.getValue().onDone(err);
}
@@ -1279,7 +1320,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
int skipped = 0;
for (GridDhtPartitionsExchangeFuture fut : exchFuts0.values()) {
- if (exchFut.exchangeId().topologyVersion().compareTo(fut.exchangeId().topologyVersion()) < 0)
+ if (initTopVer.compareTo(fut.exchangeId().topologyVersion()) < 0)
continue;
skipped++;
@@ -1357,7 +1398,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
cctx.database().releaseHistoryForPreloading();
}
else
- exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg);
+ exchangeFuture(msg.exchangeId(), null, null, null, null).onReceiveFullMessage(node, msg);
}
finally {
leaveBusy();
@@ -1397,7 +1438,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top = grp.topology();
if (top != null) {
- updated |= top.update(null, entry.getValue());
+ updated |= top.update(null, entry.getValue(), false);
cctx.affinity().checkRebalanceState(top, grpId);
}
@@ -1406,24 +1447,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (updated)
scheduleResendPartitions();
}
- else {
- if (msg.client()) {
- final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
- null,
- null,
- null,
- null);
-
- exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- // Finished future should reply only to sender client node.
- exchFut.onReceive(node, msg);
- }
- });
- }
- else
- exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg);
- }
+ else
+ exchangeFuture(msg.exchangeId(), null, null, null, null).onReceiveSingleMessage(node, msg);
}
finally {
leaveBusy();
@@ -1439,7 +1464,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return;
try {
- sendLocalPartitions(node, msg.exchangeId());
+ final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
+ null,
+ null,
+ null,
+ null);
+
+ exchFut.onReceivePartitionRequest(node, msg);
}
finally {
leaveBusy();
@@ -1451,7 +1482,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @throws Exception If failed.
*/
public void dumpDebugInfo(@Nullable GridDhtPartitionsExchangeFuture exchFut) throws Exception {
- AffinityTopologyVersion exchTopVer = exchFut != null ? exchFut.topologyVersion() : null;
+ AffinityTopologyVersion exchTopVer = exchFut != null ? exchFut.initialVersion() : null;
U.warn(diagnosticLog, "Ready affinity version: " + readyTopVer.get());
@@ -1734,6 +1765,181 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * For testing only.
+ *
+ * @param exchMergeTestWaitVer Version to wait for.
+ */
+ public void mergeExchangesTestWaitVersion(AffinityTopologyVersion exchMergeTestWaitVer) {
+ this.exchMergeTestWaitVer = exchMergeTestWaitVer;
+ }
+
+ /**
+ * @param curFut Current exchange future.
+ * @param msg Message.
+ * @return {@code True} if node is stopping.
+ * @throws IgniteInterruptedCheckedException If interrupted.
+ */
+ public boolean mergeExchanges(final GridDhtPartitionsExchangeFuture curFut, GridDhtPartitionsFullMessage msg)
+ throws IgniteInterruptedCheckedException {
+ AffinityTopologyVersion resVer = msg.resultTopologyVersion();
+
+ if (exchWorker.waitForExchangeFuture(resVer))
+ return true;
+
+ for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+ if (task instanceof GridDhtPartitionsExchangeFuture) {
+ GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture) task;
+
+ if (fut.initialVersion().compareTo(resVer) > 0) {
+ log.info("Merge exchange future on finish stop [curFut=" + curFut.initialVersion() +
+ ", resVer=" + resVer +
+ ", nextFutVer=" + fut.initialVersion() + ']');
+
+ break;
+ }
+
+ log.info("Merge exchange future on finish [curFut=" + curFut.initialVersion() +
+ ", mergedFut=" + fut.initialVersion() +
+ ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
+ ", evtNode=" + fut.firstEvent().eventNode().id()+
+ ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
+
+ DiscoveryEvent evt = fut.firstEvent();
+
+ curFut.context().events().addEvent(fut.initialVersion(),
+ fut.firstEvent(),
+ fut.firstEventCache());
+
+ if (evt.type() == EVT_NODE_JOINED) {
+ final GridDhtPartitionsSingleMessage pendingMsg = fut.mergeJoinExchangeOnDone(curFut);
+
+ if (pendingMsg != null)
+ curFut.waitAndReplyToNode(evt.eventNode().id(), pendingMsg);
+ }
+ }
+ }
+
+ ExchangeDiscoveryEvents evts = curFut.context().events();
+
+ assert evts.topologyVersion().equals(resVer) : "Invalid exchange merge result [ver=" + evts.topologyVersion()
+ + ", expVer=" + resVer + ']';
+
+ return false;
+ }
+
+ /**
+ * @param curFut Current active exchange future.
+ * @return {@code False} if need wait messages for merged exchanges.
+ */
+ public boolean mergeExchangesOnCoordinator(GridDhtPartitionsExchangeFuture curFut) {
+ if (IGNITE_EXCHANGE_MERGE_DELAY > 0) {
+ try {
+ U.sleep(IGNITE_EXCHANGE_MERGE_DELAY);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ U.warn(log, "Failed to wait for exchange merge, thread interrupted: " + e);
+
+ return true;
+ }
+ }
+
+ AffinityTopologyVersion exchMergeTestWaitVer = this.exchMergeTestWaitVer;
+
+ if (exchMergeTestWaitVer != null) {
+ log.info("Exchange merge test, waiting for version [exch=" + curFut.initialVersion() +
+ ", waitVer=" + exchMergeTestWaitVer + ']');
+
+ long end = U.currentTimeMillis() + 10_000;
+
+ while (U.currentTimeMillis() < end) {
+ boolean found = false;
+
+ for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+ if (task instanceof GridDhtPartitionsExchangeFuture) {
+ GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
+
+ if (exchMergeTestWaitVer.equals(fut.initialVersion())) {
+ log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer);
+
+ found = true;
+
+ break;
+ }
+ }
+ }
+
+ if (found)
+ break;
+ else {
+ try {
+ U.sleep(100);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ break;
+ }
+ }
+ }
+
+ this.exchMergeTestWaitVer = null;
+ }
+
+ synchronized (curFut.mutex()) {
+ int awaited = 0;
+
+ for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+ if (task instanceof GridDhtPartitionsExchangeFuture) {
+ GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
+
+ DiscoveryEvent evt = fut.firstEvent();
+
+ if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+ log.info("Stop merge, custom event found: " + evt);
+
+ break;
+ }
+
+ ClusterNode node = evt.eventNode();
+
+ if (!curFut.context().supportsMergeExchanges(node)) {
+ log.info("Stop merge, node does not support merge: " + node);
+
+ break;
+ }
+ if (evt.type() == EVT_NODE_JOINED && cctx.cache().hasCachesReceivedFromJoin(node)) {
+ log.info("Stop merge, received caches from node: " + node);
+
+ break;
+ }
+
+ log.info("Merge exchange future [curFut=" + curFut.initialVersion() +
+ ", mergedFut=" + fut.initialVersion() +
+ ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
+ ", evtNode=" + fut.firstEvent().eventNode().id() +
+ ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
+
+ curFut.context().events().addEvent(fut.initialVersion(),
+ fut.firstEvent(),
+ fut.firstEventCache());
+
+ if (evt.type() == EVT_NODE_JOINED) {
+ if (fut.mergeJoinExchange(curFut))
+ awaited++;
+ }
+ }
+ else {
+ if (!task.skipForExchangeMerge()) {
+ log.info("Stop merge, custom task found: " + task);
+
+ break;
+ }
+ }
+ }
+
+ return awaited == 0;
+ }
+ }
+
+ /**
* Exchange future thread. All exchanges happen only by one thread and next
* exchange will not start until previous one completes.
*/
@@ -1742,12 +1948,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private final LinkedBlockingDeque<CachePartitionExchangeWorkerTask> futQ =
new LinkedBlockingDeque<>();
+ /** */
+ private AffinityTopologyVersion lastFutVer;
+
/** Busy flag used as performance optimization to stop current preloading. */
private volatile boolean busy;
/** */
private boolean crd;
+ /** */
+ private boolean stop;
+
/**
* Constructor.
*/
@@ -1783,10 +1995,67 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
futQ.offer(exchFut);
+ synchronized (this) {
+ lastFutVer = exchFut.initialVersion();
+
+ notifyAll();
+ }
+
if (log.isDebugEnabled())
log.debug("Added exchange future to exchange worker: " + exchFut);
}
+ /**
+ *
+ */
+ private void onKernalStop() {
+ synchronized (this) {
+ stop = true;
+
+ notifyAll();
+ }
+ }
+
+ /**
+ * @param resVer Version to wait for.
+ * @return {@code True} if node is stopping.
+ * @throws IgniteInterruptedCheckedException If interrupted.
+ */
+ private boolean waitForExchangeFuture(AffinityTopologyVersion resVer) throws IgniteInterruptedCheckedException {
+ synchronized (this) {
+ while (!stop && lastFutVer.compareTo(resVer) < 0)
+ U.wait(this);
+
+ return stop;
+ }
+ }
+
+ /**
+ * @param resVer Exchange result version.
+ * @param exchFut Exchange future.
+ * @throws IgniteInterruptedCheckedException If interrupted.
+ */
+ private void removeMergedFutures(AffinityTopologyVersion resVer, GridDhtPartitionsExchangeFuture exchFut)
+ throws IgniteInterruptedCheckedException {
+ if (resVer.compareTo(exchFut.initialVersion()) != 0) {
+ waitForExchangeFuture(resVer);
+
+ for (CachePartitionExchangeWorkerTask task : futQ) {
+ if (task instanceof GridDhtPartitionsExchangeFuture) {
+ GridDhtPartitionsExchangeFuture fut0 = (GridDhtPartitionsExchangeFuture)task;
+
+ if (resVer.compareTo(fut0.initialVersion()) >= 0) {
+ fut0.finishMerged();
+
+ futQ.remove(fut0);
+ }
+ else
+ break;
+ }
+ }
+ }
+ }
+
/** {@inheritDoc} */
@Override public void cancel() {
synchronized (interruptLock) {
@@ -1920,6 +2189,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridDhtPartitionsExchangeFuture exchFut = null;
+ AffinityTopologyVersion resVer = null;
+
try {
if (isCancelled())
break;
@@ -1946,7 +2217,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean newCrd = false;
if (!crd) {
- List<ClusterNode> srvNodes = exchFut.discoCache().serverNodes();
+ List<ClusterNode> srvNodes = exchFut.firstEventCache().serverNodes();
crd = newCrd = !srvNodes.isEmpty() && srvNodes.get(0).isLocal();
}
@@ -1961,13 +2232,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
while (true) {
try {
- exchFut.get(futTimeout, TimeUnit.MILLISECONDS);
+ resVer = exchFut.get(futTimeout, TimeUnit.MILLISECONDS);
break;
}
catch (IgniteFutureTimeoutCheckedException ignored) {
U.warn(diagnosticLog, "Failed to wait for partition map exchange [" +
- "topVer=" + exchFut.topologyVersion() +
+ "topVer=" + exchFut.initialVersion() +
", node=" + cctx.localNodeId() + "]. " +
"Dumping pending objects that might be the cause: ");
@@ -1990,6 +2261,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
+ removeMergedFutures(resVer, exchFut);
if (log.isDebugEnabled())
log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
@@ -2098,7 +2370,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (assignsCancelled) { // Pending exchange.
U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
- "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+ "[top=" + resVer + ", evt=" + exchId.discoveryEventName() +
", node=" + exchId.nodeId() + ']');
}
else if (r != null) {
@@ -2108,19 +2380,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (!hasPendingExchange()) {
U.log(log, "Rebalancing started " +
- "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+ "[top=" + resVer + ", evt=" + exchId.discoveryEventName() +
", node=" + exchId.nodeId() + ']');
r.run(); // Starts rebalancing routine.
}
else
U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
- "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+ "[top=" + resVer + ", evt=" + exchId.discoveryEventName() +
", node=" + exchId.nodeId() + ']');
}
else
U.log(log, "Skipping rebalancing (nothing scheduled) " +
- "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+ "[top=" + resVer + ", evt=" + exchId.discoveryEventName() +
", node=" + exchId.nodeId() + ']');
}
}
@@ -2205,10 +2477,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ private final int histSize;
+
/**
* Creates ordered, not strict list set.
+ *
+ * @param histSize Max history size.
*/
- private ExchangeFutureSet() {
+ private ExchangeFutureSet(int histSize) {
super(new Comparator<GridDhtPartitionsExchangeFuture>() {
@Override public int compare(
GridDhtPartitionsExchangeFuture f1,
@@ -2224,6 +2501,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return t2.compareTo(t1);
}
}, /*not strict*/false);
+
+ this.histSize = histSize;
}
/**
@@ -2234,7 +2513,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridDhtPartitionsExchangeFuture fut) {
GridDhtPartitionsExchangeFuture cur = super.addx(fut);
- while (size() > EXCHANGE_HISTORY_SIZE) {
+ while (size() > histSize) {
GridDhtPartitionsExchangeFuture last = last();
if (!last.isDone())
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 2e543c7..bf3ee0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1681,6 +1681,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * @param node Joined node.
+ * @return {@code True} if there are new caches received from joined node.
+ */
+ boolean hasCachesReceivedFromJoin(ClusterNode node) {
+ return cachesInfo.hasCachesReceivedFromJoin(node.id());
+ }
+
+ /**
* Starts statically configured caches received from remote nodes during exchange.
*
* @param nodeId Joining node ID.
@@ -2016,17 +2024,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* Callback invoked when first exchange future for dynamic cache is completed.
*
- * @param topVer Completed topology version.
+ * @param cacheStartVer Started caches version to create proxy for.
* @param exchActions Change requests.
* @param err Error.
*/
@SuppressWarnings("unchecked")
public void onExchangeDone(
- AffinityTopologyVersion topVer,
+ AffinityTopologyVersion cacheStartVer,
@Nullable ExchangeActions exchActions,
@Nullable Throwable err
) {
- initCacheProxies(topVer, err);
+ initCacheProxies(cacheStartVer, err);
if (exchActions == null)
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
index 3166159..317037b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
@@ -67,12 +67,22 @@ public class ClientCacheDhtTopologyFuture extends GridDhtTopologyFutureAdapter
}
/** {@inheritDoc} */
+ @Override public AffinityTopologyVersion initialVersion() {
+ return topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean exchangeDone() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion() {
return topVer;
}
/** {@inheritDoc} */
@Override public String toString() {
- return "ClientCacheDhtTopologyFuture [topVer=" + topologyVersion() + ']';
+ return "ClientCacheDhtTopologyFuture [topVer=" + topVer + ']';
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 4b9826e..745e7d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -36,6 +36,8 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -115,33 +117,24 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/**
* @param cctx Context.
* @param grpId Group ID.
- * @param exchFut Exchange ID.
* @param similarAffKey Key to find caches with similar affinity.
*/
public GridClientPartitionTopology(
- GridCacheSharedContext cctx,
+ GridCacheSharedContext<?, ?> cctx,
int grpId,
- GridDhtPartitionsExchangeFuture exchFut,
Object similarAffKey
) {
this.cctx = cctx;
this.grpId = grpId;
this.similarAffKey = similarAffKey;
- topVer = exchFut.topologyVersion();
-
- discoCache = exchFut.discoCache();
+ topVer = AffinityTopologyVersion.NONE;
log = cctx.logger(getClass());
- lock.writeLock().lock();
-
- try {
- beforeExchange0(cctx.localNode(), exchFut);
- }
- finally {
- lock.writeLock().unlock();
- }
+ node2part = new GridDhtPartitionFullMap(cctx.localNode().id(),
+ cctx.localNode().order(),
+ updateSeq.get());
}
/**
@@ -194,9 +187,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
U.writeLock(lock);
try {
- AffinityTopologyVersion exchTopVer = exchFut.topologyVersion();
+ AffinityTopologyVersion exchTopVer = exchFut.initialVersion();
- assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
+ assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [grp=" + grpId +
+ ", topVer=" + topVer +
", exchVer=" + exchTopVer + ']';
this.stopping = stopping;
@@ -214,7 +208,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public AffinityTopologyVersion topologyVersion() {
+ @Override public AffinityTopologyVersion readyTopologyVersion() {
lock.readLock().lock();
try {
@@ -228,6 +222,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public AffinityTopologyVersion lastTopologyChangeVersion() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public GridDhtTopologyFuture topologyVersionFuture() {
assert topReadyFut != null;
@@ -240,13 +239,17 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public void initPartitions(GridDhtPartitionsExchangeFuture exchFut) {
+ @Override public void initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
+ GridDhtPartitionsExchangeFuture exchFut) {
// No-op.
}
/** {@inheritDoc} */
- @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean initParts)
- throws IgniteCheckedException {
+ @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut,
+ boolean initParts,
+ boolean updateMoving)
+ throws IgniteCheckedException
+ {
ClusterNode loc = cctx.localNode();
U.writeLock(lock);
@@ -255,7 +258,19 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
if (stopping)
return;
+ discoCache = exchFut.events().discoveryCache();
+
beforeExchange0(loc, exchFut);
+
+ if (updateMoving) {
+ ExchangeDiscoveryEvents evts = exchFut.context().events();
+
+ GridAffinityAssignmentCache aff = cctx.affinity().affinity(grpId);
+
+ assert aff.lastVersion().equals(evts.topologyVersion());
+
+ createMovingPartitions(aff.readyAffinity(evts.topologyVersion()));
+ }
}
finally {
lock.writeLock().unlock();
@@ -263,17 +278,50 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/**
+ * @param aff Affinity.
+ */
+ private void createMovingPartitions(AffinityAssignment aff) {
+ for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+ GridDhtPartitionMap map = e.getValue();
+
+ addMoving(map, aff.backupPartitions(e.getKey()));
+ addMoving(map, aff.primaryPartitions(e.getKey()));
+ }
+ }
+
+ /**
+ * @param map Node partition state map.
+ * @param parts Partitions assigned to node.
+ */
+ private void addMoving(GridDhtPartitionMap map, Set<Integer> parts) {
+ if (F.isEmpty(parts))
+ return;
+
+ for (Integer p : parts) {
+ GridDhtPartitionState state = map.get(p);
+
+ if (state == null || state == EVICTED)
+ map.put(p, MOVING);
+ }
+ }
+
+ /**
* @param loc Local node.
* @param exchFut Exchange future.
*/
private void beforeExchange0(ClusterNode loc, GridDhtPartitionsExchangeFuture exchFut) {
GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
- assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
- topVer + ", exchId=" + exchId + ']';
+ if (exchFut.context().events().hasServerLeft()) {
+ List<DiscoveryEvent> evts0 = exchFut.context().events().events();
- if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent())
- removeNode(exchId.nodeId());
+ for (int i = 0; i < evts0.size(); i++) {
+ DiscoveryEvent evt = evts0.get(i);
+
+ if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
+ removeNode(evt.eventNode().id());
+ }
+ }
// In case if node joins, get topology at the time of joining node.
ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
@@ -492,7 +540,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
List<ClusterNode> nodes = new ArrayList<>(size);
for (UUID id : nodeIds) {
- if (topVer.topologyVersion() > 0 && !allIds.contains(id))
+ if (topVer.topologyVersion() > 0 && !F.contains(allIds, id))
continue;
if (hasState(p, id, state, states)) {
@@ -747,7 +795,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Override public boolean update(
@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionMap parts
+ GridDhtPartitionMap parts,
+ boolean force
) {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
@@ -766,12 +815,14 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
if (stopping)
return false;
- if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) {
- if (log.isDebugEnabled())
- log.debug("Stale exchange id for single partition map update (will ignore) [lastExchVer=" +
- lastExchangeVer + ", exchId=" + exchId + ']');
+ if (!force) {
+ if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) {
+ if (log.isDebugEnabled())
+ log.debug("Stale exchange id for single partition map update (will ignore) [lastExchVer=" +
+ lastExchangeVer + ", exchId=" + exchId + ']');
- return false;
+ return false;
+ }
}
if (exchId != null)
@@ -783,7 +834,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
GridDhtPartitionMap cur = node2part.get(parts.nodeId());
- if (isStaleUpdate(cur, parts)) {
+ if (force) {
+ if (cur != null && cur.topologyVersion().initialized())
+ parts.updateSequence(cur.updateSequence(), cur.topologyVersion());
+ }
+ else if (isStaleUpdate(cur, parts)) {
if (log.isDebugEnabled())
log.debug("Stale update for single partition map update (will ignore) [exchId=" + exchId +
", curMap=" + cur + ", newMap=" + parts + ']');
@@ -845,19 +900,20 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) {
+ @Override public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, AffinityAssignment assignment,
+ boolean updateRebalanceVer) {
// No-op.
}
/** {@inheritDoc} */
- @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) {
+ @Override public boolean detectLostPartitions(AffinityTopologyVersion affVer, DiscoveryEvent discoEvt) {
assert false : "detectLostPartitions should never be called on client topology";
return false;
}
/** {@inheritDoc} */
- @Override public void resetLostPartitions() {
+ @Override public void resetLostPartitions(AffinityTopologyVersion affVer) {
assert false : "resetLostPartitions should never be called on client topology";
}
@@ -931,12 +987,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
assert nodeId != null;
assert lock.writeLock().isHeldByCurrentThread();
- ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
-
ClusterNode loc = cctx.localNode();
if (node2part != null) {
- if (oldest.equals(loc) && !node2part.nodeId().equals(loc.id())) {
+ if (!node2part.nodeId().equals(loc.id())) {
updateSeq.setIfGreater(node2part.updateSequence());
node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.incrementAndGet(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 960b91a..1f67c1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -433,7 +433,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
// While we are holding read lock, register lock future for partition release future.
IgniteUuid lockId = IgniteUuid.fromUuid(ctx.localNodeId());
- topVer = top.topologyVersion();
+ topVer = top.readyTopologyVersion();
MultiUpdateFuture fut = new MultiUpdateFuture(topVer);
@@ -562,11 +562,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
}
// Version for all loaded entries.
- final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topology().topologyVersion());
+ final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
- final boolean replicate = ctx.isDrEnabled();
+ final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topVer);
- final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+ final boolean replicate = ctx.isDrEnabled();
final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry();
@@ -587,13 +587,13 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
return;
}
+ final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+
// Version for all loaded entries.
- final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topology().topologyVersion());
+ final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topVer);
final boolean replicate = ctx.isDrEnabled();
- final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
-
CacheOperationContext opCtx = ctx.operationContextPerCall();
ExpiryPolicy plc0 = opCtx != null ? opCtx.expiry() : null;
@@ -938,7 +938,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
res.setContainsValue();
}
else {
- AffinityTopologyVersion topVer = ctx.shared().exchange().lastTopologyFuture().topologyVersion();
+ AffinityTopologyVersion topVer = ctx.shared().exchange().lastTopologyFuture().initialVersion();
assert topVer.compareTo(req.topologyVersion()) > 0 : "Wrong ready topology version for " +
"invalid partitions response [topVer=" + topVer + ", req=" + req + ']';
@@ -1028,7 +1028,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
}
if (!F.isEmpty(fut.invalidPartitions()))
- res.invalidPartitions(fut.invalidPartitions(), ctx.shared().exchange().lastTopologyFuture().topologyVersion());
+ res.invalidPartitions(fut.invalidPartitions(), ctx.shared().exchange().lastTopologyFuture().initialVersion());
try {
ctx.io().send(nodeId, res, ctx.ioPolicy());
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index dbfb426..4d1bb38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -23,8 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -47,9 +45,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridIterator;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 8688c4f..0dea5e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -69,11 +69,14 @@ public interface GridDhtPartitionTopology {
) throws IgniteInterruptedCheckedException;
/**
- * Topology version.
- *
- * @return Topology version.
+ * @return Result topology version of last finished exchange.
*/
- public AffinityTopologyVersion topologyVersion();
+ public AffinityTopologyVersion readyTopologyVersion();
+
+ /**
+ * @return Start topology version of last exchange.
+ */
+ public AffinityTopologyVersion lastTopologyChangeVersion();
/**
* Gets a future that will be completed when partition exchange map for this
@@ -98,16 +101,21 @@ public interface GridDhtPartitionTopology {
*
* @param exchFut Exchange future.
* @param affReady Affinity ready flag.
+ * @param updateMoving
* @throws IgniteCheckedException If failed.
*/
- public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady)
+ public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut,
+ boolean affReady,
+ boolean updateMoving)
throws IgniteCheckedException;
/**
+ * @param affVer Affinity version.
* @param exchFut Exchange future.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
- public void initPartitions(GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException;
+ public void initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut)
+ throws IgniteInterruptedCheckedException;
/**
* Post-initializes this topology.
@@ -245,7 +253,7 @@ public interface GridDhtPartitionTopology {
public void onRemoved(GridDhtCacheEntry e);
/**
- * @param exchangeVer Topology version from exchange. Value should be greater than previously passed. Null value
+ * @param exchangeResVer Result topology version for exchange. Value should be greater than previously passed. Null value
* means full map received is not related to exchange
* @param partMap Update partition map.
* @param cntrMap Partition update counters.
@@ -255,7 +263,7 @@ public interface GridDhtPartitionTopology {
* @return {@code True} if local state was changed.
*/
public boolean update(
- @Nullable AffinityTopologyVersion exchangeVer,
+ @Nullable AffinityTopologyVersion exchangeResVer,
GridDhtPartitionFullMap partMap,
@Nullable Map<Integer, T2<Long, Long>> cntrMap,
Set<Integer> partsToReload,
@@ -264,10 +272,12 @@ public interface GridDhtPartitionTopology {
/**
* @param exchId Exchange ID.
* @param parts Partitions.
+ * @param force {@code True} to skip stale update check.
* @return {@code True} if local state was changed.
*/
public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionMap parts);
+ GridDhtPartitionMap parts,
+ boolean force);
/**
* @param cntrMap Counters map.
@@ -280,15 +290,18 @@ public interface GridDhtPartitionTopology {
* <p>
* This method should be called on topology coordinator after all partition messages are received.
*
+ * @param resTopVer Exchange result version.
* @param discoEvt Discovery event for which we detect lost partitions.
* @return {@code True} if partitions state got updated.
*/
- public boolean detectLostPartitions(DiscoveryEvent discoEvt);
+ public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, DiscoveryEvent discoEvt);
/**
* Resets the state of all LOST partitions to OWNING.
+ *
+ * @param resTopVer Exchange result version.
*/
- public void resetLostPartitions();
+ public void resetLostPartitions(AffinityTopologyVersion resTopVer);
/**
* @return Collection of lost partitions, if any.
@@ -349,5 +362,5 @@ public interface GridDhtPartitionTopology {
* @param assignment New affinity assignment.
* @param updateRebalanceVer {@code True} if need check rebalance state.
*/
- public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer);
+ public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, AffinityAssignment assignment, boolean updateRebalanceVer);
}