You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/21 10:22:26 UTC
[03/10] ignite git commit: ignite-6124 Merge exchanges for multiple
discovery events
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index acc4dbe..a164e85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -99,6 +100,17 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@GridDirectTransient
private transient boolean compress;
+ /** Result topology version. */
+ private AffinityTopologyVersion resTopVer;
+
+ /** Caches affinity for joining nodes. */
+ @GridDirectMap(keyType = Integer.class, valueType = CacheGroupAffinityMessage.class)
+ private Map<Integer, CacheGroupAffinityMessage> joinedNodeAff;
+
+ /** Difference with ideal affinity. */
+ @GridDirectMap(keyType = Integer.class, valueType = CacheGroupAffinityMessage.class)
+ private Map<Integer, CacheGroupAffinityMessage> idealAffDiff;
+
/**
* Required by {@link Externalizable}.
*/
@@ -128,6 +140,83 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
/** {@inheritDoc} */
+ @Override void copyStateTo(GridDhtPartitionsAbstractMessage msg) {
+ super.copyStateTo(msg);
+
+ GridDhtPartitionsFullMessage cp = (GridDhtPartitionsFullMessage)msg;
+
+ cp.parts = parts;
+ cp.dupPartsData = dupPartsData;
+ cp.partsBytes = partsBytes;
+ cp.partCntrs = partCntrs;
+ cp.partCntrsBytes = partCntrsBytes;
+ cp.partHistSuppliers = partHistSuppliers;
+ cp.partHistSuppliersBytes = partHistSuppliersBytes;
+ cp.partsToReload = partsToReload;
+ cp.partsToReloadBytes = partsToReloadBytes;
+ cp.topVer = topVer;
+ cp.errs = errs;
+ cp.errsBytes = errsBytes;
+ cp.compress = compress;
+ cp.resTopVer = resTopVer;
+ cp.joinedNodeAff = joinedNodeAff;
+ cp.idealAffDiff = idealAffDiff;
+ }
+
+ /**
+ * @return Message copy.
+ */
+ GridDhtPartitionsFullMessage copy() {
+ GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage();
+
+ copyStateTo(cp);
+
+ return cp;
+ }
+
+ /**
+ * @param resTopVer Result topology version.
+ */
+ public void resultTopologyVersion(AffinityTopologyVersion resTopVer) {
+ this.resTopVer = resTopVer;
+ }
+
+ /**
+ * @return Result topology version.
+ */
+ public AffinityTopologyVersion resultTopologyVersion() {
+ return resTopVer;
+ }
+
+ /**
+ * @return Caches affinity for joining nodes.
+ */
+ @Nullable public Map<Integer, CacheGroupAffinityMessage> joinedNodeAffinity() {
+ return joinedNodeAff;
+ }
+
+ /**
+ * @param joinedNodeAff Caches affinity for joining nodes.
+ */
+ void joinedNodeAffinity(Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) {
+ this.joinedNodeAff = joinedNodeAff;
+ }
+
+ /**
+ * @return Difference with ideal affinity.
+ */
+ @Nullable public Map<Integer, CacheGroupAffinityMessage> idealAffinityDiff() {
+ return idealAffDiff;
+ }
+
+ /**
+ * @param idealAffDiff Difference with ideal affinity.
+ */
+ void idealAffinityDiff(Map<Integer, CacheGroupAffinityMessage> idealAffDiff) {
+ this.idealAffDiff = idealAffDiff;
+ }
+
+ /** {@inheritDoc} */
@Override public int handlerId() {
return 0;
}
@@ -243,11 +332,11 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- boolean marshal = (parts != null && partsBytes == null) ||
- (partCntrs != null && partCntrsBytes == null) ||
+ boolean marshal = (!F.isEmpty(parts) && partsBytes == null) ||
+ (partCntrs != null && !partCntrs.empty() && partCntrsBytes == null) ||
(partHistSuppliers != null && partHistSuppliersBytes == null) ||
(partsToReload != null && partsToReloadBytes == null) ||
- (errs != null && errsBytes == null);
+ (!F.isEmpty(errs) && errsBytes == null);
if (marshal) {
byte[] partsBytes0 = null;
@@ -256,10 +345,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
byte[] partsToReloadBytes0 = null;
byte[] errsBytes0 = null;
- if (parts != null && partsBytes == null)
+ if (!F.isEmpty(parts) && partsBytes == null)
partsBytes0 = U.marshal(ctx, parts);
- if (partCntrs != null && partCntrsBytes == null)
+ if (partCntrs != null && !partCntrs.empty() && partCntrsBytes == null)
partCntrsBytes0 = U.marshal(ctx, partCntrs);
if (partHistSuppliers != null && partHistSuppliersBytes == null)
@@ -268,7 +357,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partsToReload != null && partsToReloadBytes == null)
partsToReloadBytes0 = U.marshal(ctx, partsToReload);
- if (errs != null && errsBytes == null)
+ if (!F.isEmpty(errs) && errsBytes == null)
errsBytes0 = U.marshal(ctx, errs);
if (compress) {
@@ -420,30 +509,48 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
writer.incrementState();
case 7:
- if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+ if (!writer.writeMap("idealAffDiff", idealAffDiff, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 8:
- if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
+ if (!writer.writeMap("joinedNodeAff", joinedNodeAff, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 9:
- if (!writer.writeByteArray("partsBytes", partsBytes))
+ if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
return false;
writer.incrementState();
case 10:
- if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
+ if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
return false;
writer.incrementState();
case 11:
+ if (!writer.writeByteArray("partsBytes", partsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
+ if (!writer.writeMessage("resTopVer", resTopVer))
+ return false;
+
+ writer.incrementState();
+
+ case 14:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -482,7 +589,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 7:
- partCntrsBytes = reader.readByteArray("partCntrsBytes");
+ idealAffDiff = reader.readMap("idealAffDiff", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
if (!reader.isLastRead())
return false;
@@ -490,7 +597,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 8:
- partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
+ joinedNodeAff = reader.readMap("joinedNodeAff", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
if (!reader.isLastRead())
return false;
@@ -498,7 +605,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 9:
- partsBytes = reader.readByteArray("partsBytes");
+ partCntrsBytes = reader.readByteArray("partCntrsBytes");
if (!reader.isLastRead())
return false;
@@ -506,7 +613,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 10:
- partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
+ partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
if (!reader.isLastRead())
return false;
@@ -514,6 +621,30 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 11:
+ partsBytes = reader.readByteArray("partsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
+ partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
+ resTopVer = reader.readMessage("resTopVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 14:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -533,7 +664,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 12;
+ return 15;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index b4d25c4..bc7d314 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -17,12 +17,14 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+import java.util.Collection;
import java.util.Map;
import java.util.HashMap;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.io.Externalizable;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -88,6 +90,16 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
@GridDirectTransient
private transient boolean compress;
+ /** Cache groups to get affinity for (affinity is requested when node joins cluster). */
+ @GridDirectCollection(Integer.class)
+ private Collection<Integer> grpsAffRequest;
+
+ /**
+ * Exchange finish message, sent to new coordinator when it tries to
+ * restore state after previous coordinator failed during exchange.
+ */
+ private GridDhtPartitionsFullMessage finishMsg;
+
/**
* Required by {@link Externalizable}.
*/
@@ -111,6 +123,34 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
this.compress = compress;
}
+ /**
+ * @param finishMsg Exchange finish message (used to restore exchange state on new coordinator).
+ */
+ void finishMessage(GridDhtPartitionsFullMessage finishMsg) {
+ this.finishMsg = finishMsg;
+ }
+
+ /**
+ * @return Exchange finish message (used to restore exchange state on new coordinator).
+ */
+ GridDhtPartitionsFullMessage finishMessage() {
+ return finishMsg;
+ }
+
+ /**
+ * @param grpsAffRequest Cache groups to get affinity for (affinity is requested when node joins cluster).
+ */
+ void cacheGroupsAffinityRequest(Collection<Integer> grpsAffRequest) {
+ this.grpsAffRequest = grpsAffRequest;
+ }
+
+ /**
+ * @return Cache groups to get affinity for (affinity is requested when node joins cluster).
+ */
+ @Nullable public Collection<Integer> cacheGroupsAffinityRequest() {
+ return grpsAffRequest;
+ }
+
/** {@inheritDoc} */
@Override public int handlerId() {
return 0;
@@ -374,18 +414,30 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
writer.incrementState();
case 8:
- if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+ if (!writer.writeMessage("finishMsg", finishMsg))
return false;
writer.incrementState();
case 9:
- if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes))
+ if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, MessageCollectionItemType.INT))
return false;
writer.incrementState();
case 10:
+ if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
if (!writer.writeByteArray("partsBytes", partsBytes))
return false;
@@ -432,7 +484,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
case 8:
- partCntrsBytes = reader.readByteArray("partCntrsBytes");
+ finishMsg = reader.readMessage("finishMsg");
if (!reader.isLastRead())
return false;
@@ -440,7 +492,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
case 9:
- partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes");
+ grpsAffRequest = reader.readCollection("grpsAffRequest", MessageCollectionItemType.INT);
if (!reader.isLastRead())
return false;
@@ -448,6 +500,22 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
case 10:
+ partCntrsBytes = reader.readByteArray("partCntrsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
partsBytes = reader.readByteArray("partsBytes");
if (!reader.isLastRead())
@@ -467,7 +535,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 11;
+ return 13;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
index 4b80ee0..6317fbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
@@ -33,6 +33,9 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
/** */
private static final long serialVersionUID = 0L;
+ /** ID of current exchange on new coordinator (set only by {@link #restoreStateRequest}). */
+ private GridDhtPartitionExchangeId restoreExchId;
+
/**
* Required by {@link Externalizable}.
*/
@@ -47,6 +50,28 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
super(id, null);
}
+ /**
+ * @param msgExchId Exchange ID for message.
+ * @param restoreExchId Initial exchange ID for current exchange.
+ * @return Message.
+ */
+ static GridDhtPartitionsSingleRequest restoreStateRequest(GridDhtPartitionExchangeId msgExchId, GridDhtPartitionExchangeId restoreExchId) {
+ GridDhtPartitionsSingleRequest msg = new GridDhtPartitionsSingleRequest(msgExchId);
+
+ msg.restoreState(true);
+
+ msg.restoreExchId = restoreExchId;
+
+ return msg;
+ }
+
+ /**
+ * @return ID of current exchange on new coordinator.
+ */
+ GridDhtPartitionExchangeId restoreExchangeId() {
+ return restoreExchId;
+ }
+
/** {@inheritDoc} */
@Override public int handlerId() {
return 0;
@@ -71,6 +96,15 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
writer.onHeaderWritten();
}
+ switch (writer.state()) {
+ case 5:
+ if (!writer.writeMessage("restoreExchId", restoreExchId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
return true;
}
@@ -84,6 +118,17 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
if (!super.readFrom(buf, reader))
return false;
+ switch (reader.state()) {
+ case 5:
+ restoreExchId = reader.readMessage("restoreExchId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
return reader.afterMessageRead(GridDhtPartitionsSingleRequest.class);
}
@@ -94,7 +139,7 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 5;
+ return 6;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 305da92..c8d1041 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -29,7 +29,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
@@ -173,29 +172,31 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) {
- supplier.onTopologyChanged(lastFut.topologyVersion());
+ supplier.onTopologyChanged(lastFut.initialVersion());
demander.onTopologyChanged(lastFut);
}
/** {@inheritDoc} */
@Override public GridDhtPreloaderAssignments assign(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut) {
+ assert exchFut == null || exchFut.isDone();
+
// No assignments for disabled preloader.
GridDhtPartitionTopology top = grp.topology();
if (!grp.rebalanceEnabled())
- return new GridDhtPreloaderAssignments(exchId, top.topologyVersion());
+ return new GridDhtPreloaderAssignments(exchId, top.readyTopologyVersion());
int partCnt = grp.affinity().partitions();
- assert exchFut == null || exchFut.topologyVersion().equals(top.topologyVersion()) :
- "Topology version mismatch [exchId=" + exchId +
- ", grp=" + grp.name() +
- ", topVer=" + top.topologyVersion() + ']';
+ AffinityTopologyVersion topVer = top.readyTopologyVersion();
- GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchId, top.topologyVersion());
+ assert exchFut == null || exchFut.context().events().topologyVersion().equals(top.readyTopologyVersion()) :
+ "Topology version mismatch [exchId=" + exchId +
+ ", grp=" + grp.name() +
+ ", topVer=" + top.readyTopologyVersion() + ']';
- AffinityTopologyVersion topVer = assigns.topologyVersion();
+ GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchId, topVer);
AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
@@ -242,7 +243,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
if (msg == null) {
assigns.put(histSupplier, msg = new GridDhtPartitionDemandMessage(
top.updateSequence(),
- exchId.topologyVersion(),
+ assigns.topologyVersion(),
grp.groupId()));
}
@@ -306,12 +307,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
GridDhtPartitionDemandMessage msg = assigns.get(n);
- if (msg == null) {
- assigns.put(n, msg = new GridDhtPartitionDemandMessage(
- top.updateSequence(),
- exchId.topologyVersion(),
- grp.groupId()));
- }
+ if (msg == null) {
+ assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+ top.updateSequence(),
+ assigns.topologyVersion(),
+ grp.groupId()));
+ }
msg.addPartition(p, false);
}
@@ -612,8 +613,15 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
}
finally {
- if (!partsToEvict.isEmpty())
- locked = true;
+ if (!partsToEvict.isEmpty()) {
+ if (ctx.kernalContext().isStopping()) {
+ partsToEvict.clear();
+
+ locked = false;
+ }
+ else
+ locked = true;
+ }
else {
boolean res = partsEvictOwning.compareAndSet(1, 0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
index 9db80ae..dc2fbf8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
@@ -35,6 +35,13 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
private Map<Integer, Map<Integer, T2<Long, Long>>> map;
/**
+ * @return {@code True} if map is empty.
+ */
+ public synchronized boolean empty() {
+ return map == null || map.isEmpty();
+ }
+
+ /**
* @param cacheId Cache ID.
* @param cntrMap Counters map.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
new file mode 100644
index 0000000..42ce9b9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.*;
+
+/**
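+ * Future that sends restore-state requests to other nodes and collects their single messages when a new exchange coordinator is initialized.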
+ */
+public class InitNewCoordinatorFuture extends GridCompoundFuture {
+ /** Exchange finish (full) message taken from a received single message, if any response carried one. */
+ private GridDhtPartitionsFullMessage fullMsg;
+
+ /** IDs of nodes whose responses are still awaited. */
+ private Set<UUID> awaited = new HashSet<>();
+
+ /** Received single messages that did not carry a finish message, by sender node. */
+ private Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs = new HashMap<>();
+
+ /** Messages of joined nodes by node ID; filled in {@link #onAllReceived()} when no full message was received. */
+ private Map<UUID, GridDhtPartitionsSingleMessage> joinExchMsgs;
+
+ /** Future completed when no awaited nodes remain (all responded or left). */
+ private GridFutureAdapter restoreStateFut;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Initial version of the exchange future passed to {@link #init}. */
+ private AffinityTopologyVersion initTopVer;
+
+ /** Join exchange IDs, by node ID, for nodes found in current discovery cache but not in the exchange discovery cache. */
+ private Map<UUID, GridDhtPartitionExchangeId> joinedNodes;
+
+ /** {@code True} if exchange protocol version computed for the minimum node version is greater than 1. */
+ private boolean restoreState;
+
+ /**
+ * @param cctx Context.
+ */
+ InitNewCoordinatorFuture(GridCacheSharedContext cctx) {
+ this.log = cctx.logger(getClass());
+ }
+
+ /**
+ * @param exchFut Current future.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void init(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
+ initTopVer = exchFut.initialVersion();
+
+ GridCacheSharedContext cctx = exchFut.sharedContext();
+
+ restoreState = exchangeProtocolVersion(exchFut.context().events().discoveryCache().minimumNodeVersion()) > 1;
+
+ boolean newAff = exchFut.localJoinExchange();
+
+ IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(exchFut, newAff);
+
+ if (fut != null)
+ add(fut);
+
+ if (restoreState) {
+ DiscoCache curDiscoCache = cctx.discovery().discoCache();
+
+ DiscoCache discoCache = exchFut.events().discoveryCache();
+
+ List<ClusterNode> nodes = new ArrayList<>();
+
+ synchronized (this) {
+ for (ClusterNode node : discoCache.allNodes()) {
+ if (!node.isLocal() && cctx.discovery().alive(node)) {
+ awaited.add(node.id());
+
+ nodes.add(node);
+ }
+ }
+
+ if (exchFut.context().mergeExchanges() && !curDiscoCache.version().equals(discoCache.version())) {
+ for (ClusterNode node : curDiscoCache.allNodes()) {
+ if (discoCache.node(node.id()) == null) {
+ if (exchangeProtocolVersion(node.version()) == 1)
+ break;
+
+ awaited.add(node.id());
+
+ nodes.add(node);
+
+ if (joinedNodes == null)
+ joinedNodes = new HashMap<>();
+
+ GridDhtPartitionExchangeId exchId = new GridDhtPartitionExchangeId(node.id(),
+ EVT_NODE_JOINED,
+ new AffinityTopologyVersion(node.order()));
+
+ joinedNodes.put(node.id(), exchId);
+ }
+ }
+ }
+
+ if (joinedNodes == null)
+ joinedNodes = Collections.emptyMap();
+
+ if (!awaited.isEmpty()) {
+ restoreStateFut = new GridFutureAdapter();
+
+ add(restoreStateFut);
+ }
+ }
+
+ log.info("Try restore exchange result [allNodes=" + awaited +
+ ", joined=" + joinedNodes.keySet() + ']');
+
+ if (!nodes.isEmpty()) {
+ GridDhtPartitionsSingleRequest req = GridDhtPartitionsSingleRequest.restoreStateRequest(exchFut.exchangeId(),
+ exchFut.exchangeId());
+
+ for (ClusterNode node : nodes) {
+ try {
+ GridDhtPartitionsSingleRequest sndReq = req;
+
+ if (joinedNodes.containsKey(node.id())) {
+ sndReq = GridDhtPartitionsSingleRequest.restoreStateRequest(
+ joinedNodes.get(node.id()),
+ exchFut.exchangeId());
+ }
+
+ cctx.io().send(node, sndReq, GridIoPolicy.SYSTEM_POOL);
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send partitions request, node failed: " + node);
+
+ onNodeLeft(node.id());
+ }
+ }
+ }
+ }
+
+ markInitialized();
+ }
+
+ boolean restoreState() {
+ return restoreState;
+ }
+
+ /**
+ * @return Received messages.
+ */
+ Map<ClusterNode, GridDhtPartitionsSingleMessage> messages() {
+ return msgs;
+ }
+
+ /**
+ * @return Full message if some of the nodes received it from the previous coordinator.
+ */
+ GridDhtPartitionsFullMessage fullMessage() {
+ return fullMsg;
+ }
+
+ /**
+ * @param node Node.
+ * @param msg Message.
+ */
+ public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+ log.info("Init new coordinator, received response [node=" + node.id() +
+ ", fullMsg=" + (msg.finishMessage() != null) +
+ ", affReq=" + !F.isEmpty(msg.cacheGroupsAffinityRequest()) + ']');
+
+ assert msg.restoreState() : msg;
+
+ boolean done = false;
+
+ synchronized (this) {
+ if (awaited.remove(node.id())) {
+ GridDhtPartitionsFullMessage fullMsg0 = msg.finishMessage();
+
+ if (fullMsg0 != null) {
+ assert fullMsg == null || fullMsg.resultTopologyVersion().equals(fullMsg0.resultTopologyVersion());
+
+ fullMsg = fullMsg0;
+ }
+ else
+ msgs.put(node, msg);
+
+ done = awaited.isEmpty();
+ }
+
+ if (done)
+ onAllReceived();
+ }
+
+ if (done)
+ restoreStateFut.onDone();
+ }
+
+ /**
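+ * Post-processes messages of joined nodes once the last awaited response has arrived (called from {@link #onMessage}).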
+ */
+ private void onAllReceived() {
+ if (fullMsg != null) {
+ AffinityTopologyVersion resVer = fullMsg.resultTopologyVersion();
+
+ for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e = it.next();
+
+ GridDhtPartitionExchangeId msgVer = joinedNodes.get(e.getKey().id());
+
+ if (msgVer != null) {
+ assert msgVer.topologyVersion().compareTo(initTopVer) > 0 : msgVer;
+
+ log.info("Process joined node message [resVer=" + resVer +
+ ", initTopVer=" + initTopVer +
+ ", msgVer=" + msgVer.topologyVersion() + ']');
+
+ if (msgVer.topologyVersion().compareTo(resVer) > 0)
+ it.remove();
+ else
+ e.getValue().exchangeId(msgVer);
+ }
+ }
+ }
+ else {
+ for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e = it.next();
+
+ GridDhtPartitionExchangeId msgVer = joinedNodes.get(e.getKey().id());
+
+ if (msgVer != null) {
+ it.remove();
+
+ assert msgVer.topologyVersion().compareTo(initTopVer) > 0 : msgVer;
+
+ log.info("Process joined node message [initTopVer=" + initTopVer +
+ ", msgVer=" + msgVer.topologyVersion() + ']');
+
+ if (joinExchMsgs == null)
+ joinExchMsgs = new HashMap<>();
+
+ e.getValue().exchangeId(msgVer);
+
+ joinExchMsgs.put(e.getKey().id(), e.getValue());
+ }
+ }
+
+ }
+ }
+
+ @Nullable GridDhtPartitionsSingleMessage joinExchangeMessage(UUID nodeId) {
+ return joinExchMsgs != null ? joinExchMsgs.get(nodeId) : null;
+ }
+
+ /**
+ * @param nodeId Failed node ID.
+ */
+ public void onNodeLeft(UUID nodeId) {
+ log.info("Init new coordinator, node left [node=" + nodeId + ']');
+
+ boolean done;
+
+ synchronized (this) {
+ done = awaited.remove(nodeId) && awaited.isEmpty();
+ }
+
+ if (done)
+ restoreStateFut.onDone();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
index 9a76a8e..7e473be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
@@ -35,6 +35,11 @@ public class RebalanceReassignExchangeTask implements CachePartitionExchangeWork
this.exchId = exchId;
}
+ /** {@inheritDoc} */
+ @Override public boolean skipForExchangeMerge() {
+ return true;
+ }
+
/**
* @return Exchange ID.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index b27591e..428355c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -323,7 +323,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
* @return Near entries.
*/
public Set<Cache.Entry<K, V>> nearEntries() {
- final AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+ final AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion();
return super.entrySet(new CacheEntryPredicateAdapter() {
@Override public boolean apply(GridCacheEntryEx entry) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index a49812e..9d9c682 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -333,7 +333,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
remapKeys.add(key);
}
- AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx();
+ AffinityTopologyVersion updTopVer = cctx.shared().exchange().readyAffinityVersion();
assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " +
"not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
@@ -626,7 +626,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
return true;
}
else {
- boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
+ boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().lastTopologyChangeVersion());
// Entry not found, do not continue search if topology did not change and there is no store.
return !cctx.readThroughConfigured() && (topStable || partitionOwned(part));
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index db030b0..bb71337 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -787,7 +787,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
if (topVer != null) {
for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()) {
- if (fut.topologyVersion().equals(topVer)){
+ if (fut.exchangeDone() && fut.topologyVersion().equals(topVer)){
Throwable err = fut.validateCache(cctx, recovery, read, null, keys);
if (err != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 1995e2e..69d0940 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -554,6 +554,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
tx.subjectId(),
tx.taskNameHash(),
m.clientFirst(),
+ txNodes.size() == 1,
tx.activeCachesDeploymentEnabled());
for (IgniteTxEntry txEntry : writes) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index b763f02..2c23a7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -540,6 +540,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
tx.subjectId(),
tx.taskNameHash(),
m.clientFirst(),
+ true,
tx.activeCachesDeploymentEnabled());
for (IgniteTxEntry txEntry : m.entries()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index db065a7..d017d7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -211,6 +211,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
tx.subjectId(),
tx.taskNameHash(),
false,
+ true,
tx.activeCachesDeploymentEnabled());
for (IgniteTxEntry txEntry : writes) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 875f397..7deceb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -54,6 +54,9 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** */
private static final int EXPLICIT_LOCK_FLAG_MASK = 0x08;
+ /** */
+ private static final int ALLOW_WAIT_TOP_FUT_FLAG_MASK = 0x10;
+
/** Future ID. */
private IgniteUuid futId;
@@ -97,6 +100,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
* @param subjId Subject ID.
* @param taskNameHash Task name hash.
* @param firstClientReq {@code True} if first optimistic tx prepare request sent from client node.
+ * @param allowWaitTopFut {@code True} if it is safe for first client request to wait for topology future.
* @param addDepInfo Deployment info flag.
*/
public GridNearTxPrepareRequest(
@@ -116,6 +120,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
@Nullable UUID subjId,
int taskNameHash,
boolean firstClientReq,
+ boolean allowWaitTopFut,
boolean addDepInfo
) {
super(tx,
@@ -140,6 +145,15 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
setFlag(implicitSingle, IMPLICIT_SINGLE_FLAG_MASK);
setFlag(explicitLock, EXPLICIT_LOCK_FLAG_MASK);
setFlag(firstClientReq, FIRST_CLIENT_REQ_FLAG_MASK);
+ setFlag(allowWaitTopFut, ALLOW_WAIT_TOP_FUT_FLAG_MASK);
+ }
+
+ /**
+ * @return {@code True} if it is safe for first client request to wait for topology future
+ * completion.
+ */
+ public boolean allowWaitTopologyFuture() {
+ return isFlag(ALLOW_WAIT_TOP_FUT_FLAG_MASK);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 3ef9e61..e32ecc6 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -721,7 +721,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** {@inheritDoc} */
@Override public void beforeExchange(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
- DiscoveryEvent discoEvt = fut.discoveryEvent();
+ DiscoveryEvent discoEvt = fut.firstEvent();
boolean joinEvt = discoEvt.type() == EventType.EVT_NODE_JOINED;
@@ -737,7 +737,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (cctx.kernalContext().query().moduleEnabled()) {
for (GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) {
- if (cacheCtx.startTopologyVersion().equals(fut.topologyVersion()) &&
+ if (cacheCtx.startTopologyVersion().equals(fut.initialVersion()) &&
!cctx.pageStore().hasIndexStore(cacheCtx.groupId()) && cacheCtx.affinityNode()) {
final int cacheId = cacheCtx.cacheId();
@@ -1558,6 +1558,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
long partMetaId = pageMem.partitionMetaPageId(grpId, i);
long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
+
try {
long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
@@ -1622,6 +1623,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
* @param cacheCtx Cache context to apply an update.
* @param dataEntry Data entry to apply.
+ * @throws IgniteCheckedException If failed to restore.
*/
private void applyUpdate(GridCacheContext cacheCtx, DataEntry dataEntry) throws IgniteCheckedException {
int partId = dataEntry.partitionId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 87eed82..023c03c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -668,7 +668,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
this.cctx = cctx;
this.part = part;
- nodes = fallbacks(cctx.discovery().topologyVersionEx());
+ nodes = fallbacks(cctx.shared().exchange().readyAffinityVersion());
if (F.isEmpty(nodes))
throw new ClusterTopologyException("Failed to execute the query " +
@@ -826,7 +826,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
if (retryFut != null)
retryFut.get();
- nodes = fallbacks(unreservedTopVer == null ? cctx.discovery().topologyVersionEx() : unreservedTopVer);
+ nodes = fallbacks(unreservedTopVer == null ? cctx.shared().exchange().readyAffinityVersion() : unreservedTopVer);
unreservedTopVer = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index eccb9c1..4d85db5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -478,7 +478,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
return topVer;
}
- return cctx.exchange().topologyVersion();
+ return cctx.exchange().readyAffinityVersion();
}
return res;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index bb85a1e..beeb184 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
@@ -74,6 +75,7 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFutureCancelledException;
+import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
@@ -109,10 +111,10 @@ public class IgniteTxHandler {
private GridCacheSharedContext<?, ?> ctx;
/**
- * @param nearNodeId Node ID.
+ * @param nearNodeId Sender node ID.
* @param req Request.
*/
- private void processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
+ private void processNearTxPrepareRequest(UUID nearNodeId, GridNearTxPrepareRequest req) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() +
", node=" + nearNodeId + ']');
@@ -130,7 +132,29 @@ public class IgniteTxHandler {
return;
}
- IgniteInternalFuture<GridNearTxPrepareResponse> fut = prepareNearTx(nearNode, req);
+ processNearTxPrepareRequest0(nearNode, req);
+ }
+
+ /**
+ * @param nearNode Sender node.
+ * @param req Request.
+ */
+ private void processNearTxPrepareRequest0(ClusterNode nearNode, GridNearTxPrepareRequest req) {
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut;
+
+ if (req.firstClientRequest() && req.allowWaitTopologyFuture()) {
+ for (;;) {
+ if (waitForExchangeFuture(nearNode, req))
+ return;
+
+ fut = prepareNearTx(nearNode, req);
+
+ if (fut != null)
+ break;
+ }
+ }
+ else
+ fut = prepareNearTx(nearNode, req);
assert req.txState() != null || fut == null || fut.error() != null ||
(ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null);
@@ -303,9 +327,9 @@ public class IgniteTxHandler {
/**
* @param nearNode Node that initiated transaction.
* @param req Near prepare request.
- * @return Prepare future.
+ * @return Prepare future or {@code null} if the operation needs to be retried.
*/
- private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
+ @Nullable private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
final ClusterNode nearNode,
final GridNearTxPrepareRequest req
) {
@@ -348,55 +372,90 @@ public class IgniteTxHandler {
top = firstEntry.context().topology();
top.readLock();
+
+ if (req.allowWaitTopologyFuture()) {
+ GridDhtTopologyFuture topFut = top.topologyVersionFuture();
+
+ if (!topFut.isDone()) {
+ top.readUnlock();
+
+ return null;
+ }
+ }
}
try {
- if (top != null && needRemap(req.topologyVersion(), top.topologyVersion(), req)) {
- if (txPrepareMsgLog.isDebugEnabled()) {
- txPrepareMsgLog.debug("Topology version mismatch for near prepare, need remap transaction [" +
- "txId=" + req.version() +
- ", node=" + nearNode.id() +
- ", reqTopVer=" + req.topologyVersion() +
- ", locTopVer=" + top.topologyVersion() +
- ", req=" + req + ']');
- }
+ if (top != null ) {
+ boolean retry = false;
- GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
- req.partition(),
- req.version(),
- req.futureId(),
- req.miniId(),
- req.version(),
- req.version(),
- null,
- null,
- top.topologyVersion(),
- req.onePhaseCommit(),
- req.deployInfo() != null);
+ GridDhtTopologyFuture topFut = top.topologyVersionFuture();
- try {
- ctx.io().send(nearNode, res, req.policy());
+ if (!req.allowWaitTopologyFuture() && !topFut.isDone()) {
+ retry = true;
if (txPrepareMsgLog.isDebugEnabled()) {
- txPrepareMsgLog.debug("Sent remap response for near prepare [txId=" + req.version() +
- ", node=" + nearNode.id() + ']');
+ txPrepareMsgLog.debug("Topology change is in progress, need remap transaction [" +
+ "txId=" + req.version() +
+ ", node=" + nearNode.id() +
+ ", reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.readyTopologyVersion() +
+ ", req=" + req + ']');
}
}
- catch (ClusterTopologyCheckedException ignored) {
+
+ if (!retry && needRemap(req.topologyVersion(), top.readyTopologyVersion(), req)) {
+ retry = true;
+
if (txPrepareMsgLog.isDebugEnabled()) {
- txPrepareMsgLog.debug("Failed to send remap response for near prepare, node failed [" +
+ txPrepareMsgLog.debug("Topology version mismatch for near prepare, need remap transaction [" +
"txId=" + req.version() +
- ", node=" + nearNode.id() + ']');
+ ", node=" + nearNode.id() +
+ ", reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.readyTopologyVersion() +
+ ", req=" + req + ']');
}
}
- catch (IgniteCheckedException e) {
- U.error(txPrepareMsgLog, "Failed to send remap response for near prepare " +
- "[txId=" + req.version() +
- ", node=" + nearNode.id() +
- ", req=" + req + ']', e);
+
+ if (retry) {
+ GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.version(),
+ req.version(),
+ null,
+ null,
+ top.lastTopologyChangeVersion(),
+ req.onePhaseCommit(),
+ req.deployInfo() != null);
+
+ try {
+ ctx.io().send(nearNode, res, req.policy());
+
+ if (txPrepareMsgLog.isDebugEnabled()) {
+ txPrepareMsgLog.debug("Sent remap response for near prepare [txId=" + req.version() +
+ ", node=" + nearNode.id() + ']');
+ }
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ if (txPrepareMsgLog.isDebugEnabled()) {
+ txPrepareMsgLog.debug("Failed to send remap response for near prepare, node failed [" +
+ "txId=" + req.version() +
+ ", node=" + nearNode.id() + ']');
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(txPrepareMsgLog, "Failed to send remap response for near prepare " +
+ "[txId=" + req.version() +
+ ", node=" + nearNode.id() +
+ ", req=" + req + ']', e);
+ }
+
+ return new GridFinishedFuture<>(res);
}
- return new GridFinishedFuture<>(res);
+ assert topFut.isDone();
}
tx = new GridDhtTxLocal(
@@ -499,6 +558,53 @@ public class IgniteTxHandler {
}
/**
+ * @param node Sender node.
+ * @param req Request.
+ * @return {@code True} if the prepare request will be reprocessed from the topology future listener.
+ */
+ private boolean waitForExchangeFuture(final ClusterNode node, final GridNearTxPrepareRequest req) {
+ assert req.firstClientRequest() : req;
+
+ GridDhtTopologyFuture topFut = ctx.exchange().lastTopologyFuture();
+
+ if (!topFut.isDone()) {
+ Thread curThread = Thread.currentThread();
+
+ if (curThread instanceof IgniteThread) {
+ final IgniteThread thread = (IgniteThread)curThread;
+
+ if (thread.cachePoolThread()) {
+ topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ ctx.kernalContext().closure().runLocalWithThreadPolicy(thread, new Runnable() {
+ @Override public void run() {
+ try {
+ processNearTxPrepareRequest0(node, req);
+ }
+ finally {
+ ctx.io().onMessageProcessed(req);
+ }
+ }
+ });
+ }
+ });
+
+ return true;
+ }
+ }
+
+ try {
+ topFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Topology future failed: " + e, e);
+ }
+ }
+
+ return false;
+ }
+
+ /**
* @param expVer Expected topology version.
* @param curVer Current topology version.
* @param req Request.
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 01207e3..2ecea07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -69,6 +69,7 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.resources.LoadBalancerResource;
+import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -502,7 +503,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
final String cacheName = F.first(cacheNames);
- final AffinityTopologyVersion mapTopVer = ctx.discovery().topologyVersionEx();
+ final AffinityTopologyVersion mapTopVer = ctx.cache().context().exchange().readyAffinityVersion();
final ClusterNode node = ctx.affinity().mapPartitionToNode(cacheName, partId, mapTopVer);
if (node == null)
@@ -542,7 +543,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
final String cacheName = F.first(cacheNames);
- final AffinityTopologyVersion mapTopVer = ctx.discovery().topologyVersionEx();
+ final AffinityTopologyVersion mapTopVer = ctx.cache().context().exchange().readyAffinityVersion();
final ClusterNode node = ctx.affinity().mapPartitionToNode(cacheName, partId, mapTopVer);
if (node == null)
@@ -779,13 +780,22 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- * @param c Closure to execute.
- * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
- * @return Future.
- * @throws IgniteCheckedException Thrown in case of any errors.
+ * @param thread Thread whose stripe index or IO policy selects the executor.
+ * @param c Closure to execute on the selected executor.
*/
- private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, boolean sys) throws IgniteCheckedException {
- return runLocal(c, sys ? GridIoPolicy.SYSTEM_POOL : GridIoPolicy.PUBLIC_POOL);
+ public void runLocalWithThreadPolicy(IgniteThread thread, Runnable c) {
+ assert thread.stripe() >= 0 || thread.policy() != GridIoPolicy.UNDEFINED : thread;
+
+ if (thread.stripe() >= 0)
+ ctx.getStripedExecutorService().execute(thread.stripe(), c);
+ else {
+ try {
+ ctx.pools().poolForPolicy(thread.policy()).execute(c);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to get pool for policy: " + thread.policy(), e);
+ }
+ }
}
/**
@@ -852,8 +862,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- * Executes closure on system pool. Companion to {@link #runLocal(Runnable, boolean)} but
- * in case of rejected execution re-runs the closure in the current thread (blocking).
+ * Executes closure on system pool. In case of rejected execution re-runs the closure in the current
+ * thread (blocking).
*
* @param c Closure to execute.
* @return Future.
@@ -863,8 +873,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- * Companion to {@link #runLocal(Runnable, boolean)} but in case of rejected execution re-runs
- * the closure in the current thread (blocking).
+ * In case of rejected execution re-runs the closure in the current thread (blocking).
*
* @param c Closure to execute.
* @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
@@ -875,8 +884,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- * Companion to {@link #runLocal(Runnable, boolean)} but in case of rejected execution re-runs
- * the closure in the current thread (blocking).
+ * In case of rejected execution re-runs the closure in the current thread (blocking).
*
* @param c Closure to execute.
* @param plc Policy to choose executor pool.
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 13a889c..3ad75cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -501,7 +501,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
log.info("Sending " + prettyStr(activate) + " request from node [id=" + ctx.localNodeId() +
", topVer=" + topVer +
", client=" + ctx.clientNode() +
- ", daemon" + ctx.isDaemon() + "]");
+ ", daemon=" + ctx.isDaemon() + "]");
}
IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 84d536f..8b984c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -321,8 +321,10 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
try {
GridCacheAdapter cache = ctx.cache().internalCache(req.cacheName());
- if (cache == null)
- throw new IgniteCheckedException("Cache not created or already destroyed.");
+ if (cache == null) {
+ throw new IgniteCheckedException("Cache not created or already destroyed: " +
+ req.cacheName());
+ }
GridCacheContext cctx = cache.context();
@@ -336,18 +338,33 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
GridDhtTopologyFuture topWaitFut = null;
try {
- GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
+ Exception remapErr = null;
+
+ AffinityTopologyVersion streamerFutTopVer = null;
+
+ if (!allowOverwrite) {
+ GridDhtTopologyFuture topFut = cctx.topologyVersionFuture();
- AffinityTopologyVersion topVer = fut.topologyVersion();
+ AffinityTopologyVersion topVer = topFut.isDone() ? topFut.topologyVersion() :
+ topFut.initialVersion();
+
+ if (topVer.compareTo(req.topologyVersion()) > 0) {
+ remapErr = new ClusterTopologyCheckedException("DataStreamer will retry " +
+ "data transfer at stable topology [reqTop=" + req.topologyVersion() +
+ ", topVer=" + topFut.initialVersion() + ", node=remote]");
+ }
+ else if (!topFut.isDone())
+ topWaitFut = topFut;
+ else
+ streamerFutTopVer = topFut.topologyVersion();
+ }
- if (!allowOverwrite && !topVer.equals(req.topologyVersion())) {
- Exception err = new ClusterTopologyCheckedException(
- "DataStreamer will retry data transfer at stable topology " +
- "[reqTop=" + req.topologyVersion() + ", topVer=" + topVer + ", node=remote]");
+ if (remapErr != null) {
+ sendResponse(nodeId, topic, req.requestId(), remapErr, req.forceLocalDeployment());
- sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
+ return;
}
- else if (allowOverwrite || fut.isDone()) {
+ else if (topWaitFut == null) {
job = new DataStreamerUpdateJob(ctx,
log,
req.cacheName(),
@@ -357,10 +374,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
req.keepBinary(),
updater);
- waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(topVer);
+ waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(streamerFutTopVer);
}
- else
- topWaitFut = fut;
}
finally {
if (!allowOverwrite)
@@ -378,16 +393,14 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
return;
}
- if (job != null) {
- try {
- job.call();
+ try {
+ job.call();
- sendResponse(nodeId, topic, req.requestId(), null, req.forceLocalDeployment());
- }
- finally {
- if (waitFut != null)
- waitFut.onDone();
- }
+ sendResponse(nodeId, topic, req.requestId(), null, req.forceLocalDeployment());
+ }
+ finally {
+ if (waitFut != null)
+ waitFut.onDone();
}
}
catch (Throwable e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index eaced66..c11288c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -771,9 +771,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
try {
- AffinityTopologyVersion topVer = allowOverwrite() || cctx.isLocal() ?
- ctx.cache().context().exchange().readyAffinityVersion() :
- cctx.topology().topologyVersion();
+ AffinityTopologyVersion topVer;
+
+ if (!cctx.isLocal())
+ topVer = ctx.cache().context().exchange().lastTopologyFuture().get();
+ else
+ topVer = ctx.cache().context().exchange().readyAffinityVersion();
for (DataStreamerEntry entry : entries) {
List<ClusterNode> nodes;
@@ -1536,23 +1539,36 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
try {
GridCacheContext cctx = ctx.cache().internalCache(cacheName).context();
- final boolean allowOverwrite = allowOverwrite();
- final boolean loc = cctx.isLocal();
+ final boolean lockTop = !cctx.isLocal() && !allowOverwrite();
+
+ GridDhtTopologyFuture topWaitFut = null;
- if (!loc && !allowOverwrite)
+ if (lockTop)
cctx.topology().readLock();
try {
- GridDhtTopologyFuture fut = loc ? null : cctx.topologyVersionFuture();
+ AffinityTopologyVersion streamerFutTopVer = null;
- AffinityTopologyVersion topVer = loc ? reqTopVer : fut.topologyVersion();
+ if (lockTop) {
+ GridDhtTopologyFuture topFut = cctx.topologyVersionFuture();
- if (!allowOverwrite && !topVer.equals(reqTopVer)) {
- curFut.onDone(new IgniteCheckedException(
- "DataStreamer will retry data transfer at stable topology. " +
- "[reqTop=" + reqTopVer + " ,topVer=" + topVer + ", node=local]"));
+ AffinityTopologyVersion topVer = topFut.isDone() ? topFut.topologyVersion() :
+ topFut.initialVersion();
+
+ if (topVer.compareTo(reqTopVer) > 0) {
+ // Local topology version is newer than the requested one: complete the future with an error naming both.
+ curFut.onDone(new IgniteCheckedException("DataStreamer will retry data transfer at stable " +
+ "topology. [reqTop=" + reqTopVer + ", topVer=" + topVer + ", node=local]"));
+
+ return;
+ }
+ else if (!topFut.isDone())
+ topWaitFut = topFut;
+ else
+ streamerFutTopVer = topFut.topologyVersion();
}
- else if (loc || allowOverwrite || fut.isDone()) {
+
+ if (topWaitFut == null) {
IgniteInternalFuture<Object> callFut = ctx.closure().callLocalSafe(
new DataStreamerUpdateJob(
ctx,
@@ -1567,9 +1583,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
locFuts.add(callFut);
- final GridFutureAdapter waitFut = (loc || allowOverwrite) ?
- null :
- cctx.mvcc().addDataStreamerFuture(topVer);
+ final GridFutureAdapter waitFut =
+ lockTop ? cctx.mvcc().addDataStreamerFuture(streamerFutTopVer) : null;
callFut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() {
@Override public void apply(IgniteInternalFuture<Object> t) {
@@ -1590,18 +1605,20 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
});
}
- else {
- fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
- localUpdate(entries, reqTopVer, curFut, plc);
- }
- });
- }
}
finally {
- if (!loc && !allowOverwrite)
+ if (lockTop)
cctx.topology().readUnlock();
}
+
+ if (topWaitFut != null) {
+ // Need call 'listen' after topology read lock is released.
+ topWaitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
+ localUpdate(entries, reqTopVer, curFut, plc);
+ }
+ });
+ }
}
catch (Throwable ex) {
curFut.onDone(new IgniteCheckedException("DataStreamer data handling failed.", ex));
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
index 7d71a9e..fba0a4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -209,7 +209,8 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
GridDiscoveryManager discoMgr = platformCtx.kernalContext().discovery();
- AffinityTopologyVersion topVer = discoMgr.topologyVersionEx();
+ AffinityTopologyVersion topVer =
+ platformCtx.kernalContext().cache().context().exchange().lastTopologyFuture().get();
int topSize = discoMgr.cacheNodes(cacheName, topVer).size();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java
index f97f931..c16a7ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java
@@ -39,6 +39,11 @@ public class SchemaExchangeWorkerTask implements CachePartitionExchangeWorkerTas
this.msg = msg;
}
+ /** {@inheritDoc} This implementation always returns {@code false}. */
+ @Override public boolean skipForExchangeMerge() {
+ return false;
+ }
+
/**
* @return Message.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java
index 79fbfcd..668a2c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java
@@ -39,6 +39,11 @@ public class SchemaNodeLeaveExchangeWorkerTask implements CachePartitionExchange
this.node = node;
}
+ /** {@inheritDoc} This implementation always returns {@code true}. */
+ @Override public boolean skipForExchangeMerge() {
+ return true;
+ }
+
/**
* @return Node.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index ec95001..74fe57d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -863,8 +863,8 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
assert affCacheIds != null;
retry = true;
- mapTopVer = U.max(res.getRetryTopologyVersion(), ctx.discovery().topologyVersionEx());
- affFut = ctx.cache().context().exchange().affinityReadyFuture(mapTopVer);
+ mapTopVer = U.max(res.getRetryTopologyVersion(), ctx.cache().context().exchange().readyAffinityVersion());
+ affFut = ctx.cache().context().exchange().lastTopologyFuture();
if (affFut != null && !affFut.isDone()) {
waitForAffTop = true;
@@ -900,9 +900,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
case FAILOVER: {
if (affCacheIds != null) {
- mapTopVer = ctx.discovery().topologyVersionEx();
+ mapTopVer = ctx.cache().context().exchange().readyAffinityVersion();
- affFut = ctx.cache().context().exchange().affinityReadyFuture(mapTopVer);
+ affFut = ctx.cache().context().exchange().lastTopologyFuture();
}
if (affFut != null && !affFut.isDone()) {