You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 18:27:47 UTC
[24/31] incubator-ignite git commit: ignite-471-2: huge merge from
sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 78966d0..1d57ef7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -80,7 +80,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
private IgniteUuid futId = IgniteUuid.randomUuid();
/** Preloader. */
- private GridDhtPreloader<K, V> preloader;
+ private GridDhtPreloader preloader;
/** Trackable flag. */
private boolean trackable;
@@ -95,7 +95,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
GridCacheContext<K, V> cctx,
AffinityTopologyVersion topVer,
Collection<KeyCacheObject> keys,
- GridDhtPreloader<K, V> preloader
+ GridDhtPreloader preloader
) {
assert topVer.topologyVersion() != 0 : topVer;
assert !F.isEmpty(keys) : keys;
@@ -208,21 +208,21 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
* @return {@code True} if some mapping was added.
*/
private boolean map(Iterable<KeyCacheObject> keys, Collection<ClusterNode> exc) {
- Map<ClusterNode, Set<KeyCacheObject>> mappings = new HashMap<>();
-
- ClusterNode loc = cctx.localNode();
-
- int curTopVer = topCntr.get();
+ Map<ClusterNode, Set<KeyCacheObject>> mappings = null;
for (KeyCacheObject key : keys)
- map(key, mappings, exc);
+ mappings = map(key, mappings, exc);
if (isDone())
return false;
boolean ret = false;
- if (!mappings.isEmpty()) {
+ if (mappings != null) {
+ ClusterNode loc = cctx.localNode();
+
+ int curTopVer = topCntr.get();
+
preloader.addFuture(this);
trackable = true;
@@ -275,22 +275,27 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
* @param key Key.
* @param exc Exclude nodes.
* @param mappings Mappings.
+ * @return Mappings.
*/
- private void map(KeyCacheObject key, Map<ClusterNode, Set<KeyCacheObject>> mappings, Collection<ClusterNode> exc) {
+ private Map<ClusterNode, Set<KeyCacheObject>> map(KeyCacheObject key,
+ @Nullable Map<ClusterNode, Set<KeyCacheObject>> mappings,
+ Collection<ClusterNode> exc)
+ {
ClusterNode loc = cctx.localNode();
- int part = cctx.affinity().partition(key);
-
GridCacheEntryEx e = cctx.dht().peekEx(key);
try {
if (e != null && !e.isNewLocked()) {
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
+ int part = cctx.affinity().partition(key);
+
log.debug("Will not rebalance key (entry is not new) [cacheName=" + cctx.name() +
", key=" + key + ", part=" + part + ", locId=" + cctx.nodeId() + ']');
+ }
// Key has been rebalanced or retrieved already.
- return;
+ return mappings;
}
}
catch (GridCacheEntryRemovedException ignore) {
@@ -299,6 +304,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
", locId=" + cctx.nodeId() + ']');
}
+ int part = cctx.affinity().partition(key);
+
List<ClusterNode> owners = F.isEmpty(exc) ? top.owners(part, topVer) :
new ArrayList<>(F.view(top.owners(part, topVer), F.notIn(exc)));
@@ -308,7 +315,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
"topVer=" + topVer + ", locId=" + cctx.nodeId() + ']');
// Key is already rebalanced.
- return;
+ return mappings;
}
// Create partition.
@@ -337,9 +344,12 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
log.debug("Will not rebalance key (no nodes to request from with rebalancing disabled) [key=" +
key + ", part=" + part + ", locId=" + cctx.nodeId() + ']');
- return;
+ return mappings;
}
+ if (mappings == null)
+ mappings = U.newHashMap(keys.size());
+
Collection<KeyCacheObject> mappedKeys = F.addIfAbsent(mappings, pick, F.<KeyCacheObject>newSet());
assert mappedKeys != null;
@@ -357,6 +367,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
log.debug("Will not rebalance key (local partition is not MOVING) [cacheName=" + cctx.name() +
", key=" + key + ", part=" + locPart + ", locId=" + cctx.nodeId() + ']');
}
+
+ return mappings;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 633f237..a6e6c4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -53,12 +53,12 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
* and populating local cache.
*/
@SuppressWarnings("NonConstantFieldWithUpperCaseName")
-public class GridDhtPartitionDemandPool<K, V> {
+public class GridDhtPartitionDemandPool {
/** Dummy message to wake up a blocking queue if a node leaves. */
private final SupplyMessage DUMMY_TOP = new SupplyMessage();
/** */
- private final GridCacheContext<K, V> cctx;
+ private final GridCacheContext<?, ?> cctx;
/** */
private final IgniteLogger log;
@@ -99,7 +99,7 @@ public class GridDhtPartitionDemandPool<K, V> {
* @param cctx Cache context.
* @param busyLock Shutdown lock.
*/
- public GridDhtPartitionDemandPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) {
+ public GridDhtPartitionDemandPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
assert cctx != null;
assert busyLock != null;
@@ -108,9 +108,11 @@ public class GridDhtPartitionDemandPool<K, V> {
log = cctx.logger(getClass());
- poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
+ boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
- if (poolSize > 0) {
+ poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0;
+
+ if (enabled) {
barrier = new CyclicBarrier(poolSize);
dmdWorkers = new ArrayList<>(poolSize);
@@ -327,7 +329,7 @@ public class GridDhtPartitionDemandPool<K, V> {
* @param assigns Assignments.
* @param force {@code True} if dummy reassign.
*/
- void addAssignments(final GridDhtPreloaderAssignments<K, V> assigns, boolean force) {
+ void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
if (log.isDebugEnabled())
log.debug("Adding partition assignments: " + assigns);
@@ -399,7 +401,7 @@ public class GridDhtPartitionDemandPool<K, V> {
private int id;
/** Partition-to-node assignments. */
- private final LinkedBlockingDeque<GridDhtPreloaderAssignments<K, V>> assignQ = new LinkedBlockingDeque<>();
+ private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
/** Message queue. */
private final LinkedBlockingDeque<SupplyMessage> msgQ =
@@ -425,7 +427,7 @@ public class GridDhtPartitionDemandPool<K, V> {
/**
* @param assigns Assignments.
*/
- void addAssignments(GridDhtPreloaderAssignments<K, V> assigns) {
+ void addAssignments(GridDhtPreloaderAssignments assigns) {
assert assigns != null;
assignQ.offer(assigns);
@@ -885,7 +887,7 @@ public class GridDhtPartitionDemandPool<K, V> {
}
// Sync up all demand threads at this step.
- GridDhtPreloaderAssignments<K, V> assigns = null;
+ GridDhtPreloaderAssignments assigns = null;
while (assigns == null)
assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
@@ -995,12 +997,12 @@ public class GridDhtPartitionDemandPool<K, V> {
* @param exchFut Exchange future.
* @return Assignments of partitions to nodes.
*/
- GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) {
+ GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
// No assignments for disabled preloader.
GridDhtPartitionTopology top = cctx.dht().topology();
if (!cctx.rebalanceEnabled())
- return new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion());
+ return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
int partCnt = cctx.affinity().partitions();
@@ -1009,7 +1011,7 @@ public class GridDhtPartitionDemandPool<K, V> {
"Topology version mismatch [exchId=" + exchFut.exchangeId() +
", topVer=" + top.topologyVersion() + ']';
- GridDhtPreloaderAssignments<K, V> assigns = new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion());
+ GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
AffinityTopologyVersion topVer = assigns.topologyVersion();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
index facf7e3..faa6cf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
@@ -237,7 +237,7 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext
* @return Full string representation.
*/
public String toFullString() {
- return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", super.toString());
+ return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", map.toString());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 5d9677d..13cfef3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -43,9 +43,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
/**
* Thread pool for supplying partitions to demanding nodes.
*/
-class GridDhtPartitionSupplyPool<K, V> {
+class GridDhtPartitionSupplyPool {
/** */
- private final GridCacheContext<K, V> cctx;
+ private final GridCacheContext<?, ?> cctx;
/** */
private final IgniteLogger log;
@@ -72,7 +72,7 @@ class GridDhtPartitionSupplyPool<K, V> {
* @param cctx Cache context.
* @param busyLock Shutdown lock.
*/
- GridDhtPartitionSupplyPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) {
+ GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
assert cctx != null;
assert busyLock != null;
@@ -83,16 +83,18 @@ class GridDhtPartitionSupplyPool<K, V> {
top = cctx.dht().topology();
- int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
+ if (!cctx.kernalContext().clientNode()) {
+ int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
- for (int i = 0; i < poolSize; i++)
- workers.add(new SupplyWorker());
+ for (int i = 0; i < poolSize; i++)
+ workers.add(new SupplyWorker());
- cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
- @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
- processDemandMessage(id, m);
- }
- });
+ cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
+ @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+ processDemandMessage(id, m);
+ }
+ });
+ }
depEnabled = cctx.gridDeploy().enabled();
}
@@ -248,11 +250,6 @@ class GridDhtPartitionSupplyPool<K, V> {
boolean ack = false;
try {
- // Partition map exchange is finished which means that all near transactions with given
- // topology version are committed. We can wait for local locks here as it will not take
- // much time.
- cctx.mvcc().finishLocks(d.topologyVersion()).get();
-
for (int part : d.partitions()) {
GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4b8db00..9f18c98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -44,6 +44,8 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
/**
@@ -117,8 +119,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
private GridFutureAdapter<Boolean> initFut;
/** Topology snapshot. */
- private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot =
- new AtomicReference<>();
+ private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>();
/** Last committed cache version before next topology version use. */
private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
@@ -146,8 +147,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** Dynamic cache change requests. */
private Collection<DynamicCacheChangeRequest> reqs;
+ /** Cache validation results. */
private volatile Map<Integer, Boolean> cacheValidRes;
+ /** Skip preload flag. */
+ private boolean skipPreload;
+
/**
* Dummy future created to trigger reassignments if partition
* topology changed while preloading.
@@ -200,6 +205,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @param cctx Cache context.
* @param busyLock Busy lock.
* @param exchId Exchange ID.
+ * @param reqs Cache change requests.
*/
public GridDhtPartitionsExchangeFuture(
GridCacheSharedContext cctx,
@@ -221,16 +227,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
log = cctx.logger(getClass());
- // Grab all nodes with order of equal or less than last joined node.
- oldestNode.set(CU.oldest(cctx, exchId.topologyVersion()));
-
- assert oldestNode.get() != null;
-
initFut = new GridFutureAdapter<>();
if (log.isDebugEnabled())
- log.debug("Creating exchange future [localNode=" + cctx.localNodeId() +
- ", fut=" + this + ']');
+ log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']');
+ }
+
+ /**
+ * @param reqs Cache change requests.
+ */
+ public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) {
+ this.reqs = reqs;
}
/** {@inheritDoc} */
@@ -250,6 +257,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
+ * @return Skip preload flag.
+ */
+ public boolean skipPreload() {
+ return skipPreload;
+ }
+
+ /**
* @return Dummy flag.
*/
public boolean dummy() {
@@ -279,9 +293,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/**
* @param cacheId Cache ID to check.
+ * @param topVer Topology version.
* @return {@code True} if cache was added during this exchange.
*/
- public boolean isCacheAdded(int cacheId) {
+ public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
if (!F.isEmpty(reqs)) {
for (DynamicCacheChangeRequest req : reqs) {
if (req.start() && !req.clientStartOnly()) {
@@ -291,7 +306,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
}
- return false;
+ GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+ return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
}
/**
@@ -312,7 +329,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
- * Rechecks topology.
+ * @param cacheCtx Cache context.
+ * @throws IgniteCheckedException If failed.
*/
private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException {
if (stopping(cacheCtx.cacheId()))
@@ -330,8 +348,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
exchId + ']');
// Fetch affinity assignment from remote node.
- GridDhtAssignmentFetchFuture fetchFut =
- new GridDhtAssignmentFetchFuture(cacheCtx, exchId.topologyVersion(), CU.affinityNodes(cacheCtx));
+ GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cacheCtx,
+ exchId.topologyVersion(),
+ CU.affinityNodes(cacheCtx, exchId.topologyVersion()));
fetchFut.init();
@@ -341,11 +360,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
log.debug("Fetched affinity from remote node, initializing affinity assignment [locNodeId=" +
cctx.localNodeId() + ", topVer=" + exchId.topologyVersion() + ']');
+ if (affAssignment == null) {
+ affAssignment = new ArrayList<>(cacheCtx.affinity().partitions());
+
+ List<ClusterNode> empty = Collections.emptyList();
+
+ for (int i = 0; i < cacheCtx.affinity().partitions(); i++)
+ affAssignment.add(empty);
+ }
+
cacheCtx.affinity().initializeAffinity(exchId.topologyVersion(), affAssignment);
}
}
/**
+ * @param cacheCtx Cache context.
* @return {@code True} if local node can calculate affinity on it's own for this partition map exchange.
*/
private boolean canCalculateAffinity(GridCacheContext cacheCtx) {
@@ -391,20 +420,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
- * @return Exchange id.
- */
- GridDhtPartitionExchangeId key() {
- return exchId;
- }
-
- /**
- * @return Oldest node.
- */
- ClusterNode oldestNode() {
- return oldestNode.get();
- }
-
- /**
* @return Exchange ID.
*/
public GridDhtPartitionExchangeId exchangeId() {
@@ -412,13 +427,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
- * @return Init future.
- */
- IgniteInternalFuture<?> initFuture() {
- return initFut;
- }
-
- /**
* @return {@code true} if entered to busy state.
*/
private boolean enterBusy() {
@@ -444,7 +452,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @throws IgniteInterruptedCheckedException If interrupted.
*/
public void init() throws IgniteInterruptedCheckedException {
- assert oldestNode.get() != null;
+ if (isDone())
+ return;
if (init.compareAndSet(false, true)) {
if (isDone())
@@ -455,10 +464,118 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
// will return corresponding nodes.
U.await(evtLatch);
+ assert discoEvt != null : this;
+ assert !dummy && !forcePreload : this;
+
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
+
+ oldestNode.set(oldest);
+
startCaches();
+ // True if client node joined or failed.
+ boolean clientNodeEvt;
+
+ if (F.isEmpty(reqs)) {
+ int type = discoEvt.type();
+
+ assert type == EVT_NODE_JOINED || type == EVT_NODE_LEFT || type == EVT_NODE_FAILED : discoEvt;
+
+ clientNodeEvt = CU.clientNode(discoEvt.eventNode());
+ }
+ else {
+ assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt;
+
+ boolean clientOnlyStart = true;
+
+ for (DynamicCacheChangeRequest req : reqs) {
+ if (!req.clientStartOnly()) {
+ clientOnlyStart = false;
+
+ break;
+ }
+ }
+
+ clientNodeEvt = clientOnlyStart;
+ }
+
+ if (clientNodeEvt) {
+ ClusterNode node = discoEvt.eventNode();
+
+ // Client needs to initialize affinity for local join event or for started client caches.
+ if (!node.isLocal()) {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
+
+ GridDhtPartitionTopology top = cacheCtx.topology();
+
+ top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
+
+ if (cacheCtx.affinity().affinityTopologyVersion() == AffinityTopologyVersion.NONE) {
+ initTopology(cacheCtx);
+
+ top.beforeExchange(this);
+ }
+ else
+ cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion());
+ }
+
+ if (exchId.isLeft())
+ cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
+
+ onDone(exchId.topologyVersion());
+
+ skipPreload = cctx.kernalContext().clientNode();
+
+ return;
+ }
+ }
+
+ if (cctx.kernalContext().clientNode()) {
+ skipPreload = true;
+
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
+
+ GridDhtPartitionTopology top = cacheCtx.topology();
+
+ top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
+ }
+
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
+
+ initTopology(cacheCtx);
+ }
+
+ if (oldestNode.get() != null) {
+ rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx,
+ exchId.topologyVersion()));
+
+ rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
+
+ ready.set(true);
+
+ initFut.onDone(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Initialized future: " + this);
+
+ sendPartitions();
+ }
+ else
+ onDone(exchId.topologyVersion());
+
+ return;
+ }
+
+ assert oldestNode.get() != null;
+
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (isCacheAdded(cacheCtx.cacheId())) {
+ if (isCacheAdded(cacheCtx.cacheId(), exchId.topologyVersion())) {
if (cacheCtx.discovery().cacheAffinityNodes(cacheCtx.name(), topologyVersion()).isEmpty())
U.quietAndWarn(log, "No server nodes found for cache client: " + cacheCtx.namex());
}
@@ -468,8 +585,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
List<String> cachesWithoutNodes = null;
- for (String name : cctx.cache().cacheNames()) {
- if (exchId.isLeft()) {
+ if (exchId.isLeft()) {
+ for (String name : cctx.cache().cacheNames()) {
if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) {
if (cachesWithoutNodes == null)
cachesWithoutNodes = new ArrayList<>();
@@ -505,7 +622,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
if (cachesWithoutNodes != null) {
- StringBuilder sb = new StringBuilder("All server nodes for the following caches have left the cluster: ");
+ StringBuilder sb =
+ new StringBuilder("All server nodes for the following caches have left the cluster: ");
for (int i = 0; i < cachesWithoutNodes.size(); i++) {
String cache = cachesWithoutNodes.get(i);
@@ -537,7 +655,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
// Grab all alive remote nodes with order of equal or less than last joined node.
- rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx,
+ rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx,
exchId.topologyVersion()));
rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
@@ -591,6 +709,28 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (exchId.isLeft())
cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
+ IgniteInternalFuture<?> locksFut = cctx.mvcc().finishLocks(exchId.topologyVersion());
+
+ while (true) {
+ try {
+ locksFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
+
+ break;
+ }
+ catch (IgniteFutureTimeoutCheckedException ignored) {
+ U.warn(log, "Failed to wait for locks release future. " +
+ "Dumping pending objects that might be the cause: " + cctx.localNodeId());
+
+ U.warn(log, "Locked entries:");
+
+ Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks =
+ cctx.mvcc().unfinishedLocks(exchId.topologyVersion());
+
+ for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet())
+ U.warn(log, "Locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']');
+ }
+ }
+
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
continue;
@@ -650,36 +790,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (log.isDebugEnabled())
log.debug("Initialized future: " + this);
- if (canSkipExchange())
- onDone(exchId.topologyVersion());
+ // If this node is not oldest.
+ if (!oldestNode.get().id().equals(cctx.localNodeId()))
+ sendPartitions();
else {
- // If this node is not oldest.
- if (!oldestNode.get().id().equals(cctx.localNodeId()))
- sendPartitions();
- else {
- boolean allReceived = allReceived();
+ boolean allReceived = allReceived();
- if (allReceived && replied.compareAndSet(false, true)) {
- if (spreadPartitions())
- onDone(exchId.topologyVersion());
- }
+ if (allReceived && replied.compareAndSet(false, true)) {
+ if (spreadPartitions())
+ onDone(exchId.topologyVersion());
}
-
- scheduleRecheck();
}
+
+ scheduleRecheck();
}
else
assert false : "Skipped init future: " + this;
}
/**
- * @return {@code True} if no distributed exchange is needed.
- */
- private boolean canSkipExchange() {
- return false; // TODO ignite-23;
- }
-
- /**
*
*/
private void dumpPendingObjects() {
@@ -755,7 +884,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @throws IgniteCheckedException If failed.
*/
private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException {
- GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last());
+ GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
+ cctx.kernalContext().clientNode(),
+ cctx.versions().last());
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (!cacheCtx.isLocal())
@@ -780,8 +911,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
id.topologyVersion());
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal())
- m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+ if (!cacheCtx.isLocal()) {
+ AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
+
+ boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0;
+
+ if (ready)
+ m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+ }
}
// It is important that client topologies be added after contexts.
@@ -839,14 +976,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** {@inheritDoc} */
@Override public boolean onDone(AffinityTopologyVersion res, Throwable err) {
- Map<Integer, Boolean> m = new HashMap<>();
+ Map<Integer, Boolean> m = null;
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name()))
+ if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name())) {
+ if (m == null)
+ m = new HashMap<>();
+
m.put(cacheCtx.cacheId(), cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes()));
+ }
}
- cacheValidRes = m;
+ cacheValidRes = m != null ? m : Collections.<Integer, Boolean>emptyMap();
cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err);
@@ -864,8 +1005,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (timeoutObj != null)
cctx.kernalContext().timeout().removeTimeoutObject(timeoutObj);
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (exchId.event() == EventType.EVT_NODE_FAILED || exchId.event() == EventType.EVT_NODE_LEFT)
+ if (exchId.isLeft()) {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts())
cacheCtx.config().getAffinity().removeNode(exchId.nodeId());
}
@@ -1018,39 +1159,39 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
return;
}
- ClusterNode curOldest = oldestNode.get();
+ if (log.isDebugEnabled())
+ log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']');
- if (!nodeId.equals(curOldest.id())) {
- if (log.isDebugEnabled())
- log.debug("Received full partition map from unexpected node [oldest=" + curOldest.id() +
- ", unexpectedNodeId=" + nodeId + ']');
+ assert exchId.topologyVersion().equals(msg.topologyVersion());
- ClusterNode sender = cctx.discovery().node(nodeId);
+ initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> t) {
+ ClusterNode curOldest = oldestNode.get();
- if (sender == null) {
- if (log.isDebugEnabled())
- log.debug("Sender node left grid, will ignore message from unexpected node [nodeId=" + nodeId +
- ", exchId=" + msg.exchangeId() + ']');
+ if (!nodeId.equals(curOldest.id())) {
+ if (log.isDebugEnabled())
+ log.debug("Received full partition map from unexpected node [oldest=" + curOldest.id() +
+ ", unexpectedNodeId=" + nodeId + ']');
- return;
- }
+ ClusterNode snd = cctx.discovery().node(nodeId);
- // Will process message later if sender node becomes oldest node.
- if (sender.order() > curOldest.order())
- fullMsgs.put(nodeId, msg);
+ if (snd == null) {
+ if (log.isDebugEnabled())
+ log.debug("Sender node left grid, will ignore message from unexpected node [nodeId=" + nodeId +
+ ", exchId=" + msg.exchangeId() + ']');
- return;
- }
+ return;
+ }
- assert msg.exchangeId().equals(exchId);
+ // Will process message later if sender node becomes oldest node.
+ if (snd.order() > curOldest.order())
+ fullMsgs.put(nodeId, msg);
- if (log.isDebugEnabled())
- log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']');
+ return;
+ }
- assert exchId.topologyVersion().equals(msg.topologyVersion());
+ assert msg.exchangeId().equals(exchId);
- initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
- @Override public void apply(IgniteInternalFuture<Boolean> t) {
assert msg.lastVersion() != null;
cctx.versions().onReceived(nodeId, msg.lastVersion());
@@ -1075,8 +1216,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (cacheCtx != null)
cacheCtx.topology().update(exchId, entry.getValue());
- else if (CU.oldest(cctx).isLocal())
- cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue());
+ else {
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+
+ if (oldest != null && oldest.isLocal())
+ cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue());
+ }
}
}
@@ -1135,40 +1280,42 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
boolean set = false;
- ClusterNode newOldest = CU.oldest(cctx, exchId.topologyVersion());
-
- // If local node is now oldest.
- if (newOldest.id().equals(cctx.localNodeId())) {
- synchronized (mux) {
- if (oldestNode.compareAndSet(oldest, newOldest)) {
- // If local node is just joining.
- if (exchId.nodeId().equals(cctx.localNodeId())) {
- try {
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal())
- cacheCtx.topology().beforeExchange(
- GridDhtPartitionsExchangeFuture.this);
+ ClusterNode newOldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
+
+ if (newOldest != null) {
+ // If local node is now oldest.
+ if (newOldest.id().equals(cctx.localNodeId())) {
+ synchronized (mux) {
+ if (oldestNode.compareAndSet(oldest, newOldest)) {
+ // If local node is just joining.
+ if (exchId.nodeId().equals(cctx.localNodeId())) {
+ try {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (!cacheCtx.isLocal())
+ cacheCtx.topology().beforeExchange(
+ GridDhtPartitionsExchangeFuture.this);
+ }
}
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ catch (IgniteCheckedException e) {
+ onDone(e);
- return;
+ return;
+ }
}
- }
- set = true;
+ set = true;
+ }
}
}
- }
- else {
- synchronized (mux) {
- set = oldestNode.compareAndSet(oldest, newOldest);
- }
+ else {
+ synchronized (mux) {
+ set = oldestNode.compareAndSet(oldest, newOldest);
+ }
- if (set && log.isDebugEnabled())
- log.debug("Reassigned oldest node [this=" + cctx.localNodeId() +
- ", old=" + oldest.id() + ", new=" + newOldest.id() + ']');
+ if (set && log.isDebugEnabled())
+ log.debug("Reassigned oldest node [this=" + cctx.localNodeId() +
+ ", old=" + oldest.id() + ", new=" + newOldest.id() + ']');
+ }
}
if (set) {
@@ -1190,9 +1337,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert rmtNodes != null;
- for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); )
+ for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) {
if (it.next().id().equals(nodeId))
it.remove();
+ }
if (allReceived() && ready.get() && replied.compareAndSet(false, true))
if (spreadPartitions())
@@ -1254,30 +1402,34 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
GridTimeoutObject timeoutObj = new GridTimeoutObjectAdapter(
cctx.gridConfig().getNetworkTimeout() * Math.max(1, cctx.gridConfig().getCacheConfiguration().length)) {
@Override public void onTimeout() {
- if (isDone())
- return;
-
- if (!enterBusy())
- return;
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ if (isDone())
+ return;
+
+ if (!enterBusy())
+ return;
+
+ try {
+ U.warn(log,
+ "Retrying preload partition exchange due to timeout [done=" + isDone() +
+ ", dummy=" + dummy + ", exchId=" + exchId + ", rcvdIds=" + F.id8s(rcvdIds) +
+ ", rmtIds=" + F.id8s(rmtIds) + ", remaining=" + F.id8s(remaining()) +
+ ", init=" + init + ", initFut=" + initFut.isDone() +
+ ", ready=" + ready + ", replied=" + replied + ", added=" + added +
+ ", oldest=" + U.id8(oldestNode.get().id()) + ", oldestOrder=" +
+ oldestNode.get().order() + ", evtLatch=" + evtLatch.getCount() +
+ ", locNodeOrder=" + cctx.localNode().order() +
+ ", locNodeId=" + cctx.localNode().id() + ']',
+ "Retrying preload partition exchange due to timeout.");
- try {
- U.warn(log,
- "Retrying preload partition exchange due to timeout [done=" + isDone() +
- ", dummy=" + dummy + ", exchId=" + exchId + ", rcvdIds=" + F.id8s(rcvdIds) +
- ", rmtIds=" + F.id8s(rmtIds) + ", remaining=" + F.id8s(remaining()) +
- ", init=" + init + ", initFut=" + initFut.isDone() +
- ", ready=" + ready + ", replied=" + replied + ", added=" + added +
- ", oldest=" + U.id8(oldestNode.get().id()) + ", oldestOrder=" +
- oldestNode.get().order() + ", evtLatch=" + evtLatch.getCount() +
- ", locNodeOrder=" + cctx.localNode().order() +
- ", locNodeId=" + cctx.localNode().id() + ']',
- "Retrying preload partition exchange due to timeout.");
-
- recheck();
- }
- finally {
- leaveBusy();
- }
+ recheck();
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+ });
}
};
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 8256274..73794ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -59,8 +59,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/**
* @param id Exchange ID.
* @param lastVer Last version.
+ * @param topVer Topology version.
*/
- public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer,
+ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id,
+ @Nullable GridCacheVersion lastVer,
@NotNull AffinityTopologyVersion topVer) {
super(id, lastVer);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 66140cd..713a80b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -45,6 +45,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** Serialized partitions. */
private byte[] partsBytes;
+ /** */
+ private boolean client;
+
/**
* Required by {@link Externalizable}.
*/
@@ -54,10 +57,22 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/**
* @param exchId Exchange ID.
+ * @param client Client message flag.
* @param lastVer Last version.
*/
- public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer) {
+ public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
+ boolean client,
+ @Nullable GridCacheVersion lastVer) {
super(exchId, lastVer);
+
+ this.client = client;
+ }
+
+ /**
+ * @return {@code True} if sent from client node.
+ */
+ public boolean client() {
+ return client;
}
/**
@@ -110,6 +125,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
switch (writer.state()) {
case 5:
+ if (!writer.writeBoolean("client", client))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
if (!writer.writeByteArray("partsBytes", partsBytes))
return false;
@@ -132,6 +153,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
switch (reader.state()) {
case 5:
+ client = reader.readBoolean("client");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
partsBytes = reader.readByteArray("partsBytes");
if (!reader.isLastRead())
@@ -151,7 +180,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 6;
+ return 7;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index d6373f0..51010ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
@@ -46,7 +47,7 @@ import static org.apache.ignite.internal.util.GridConcurrentFactory.*;
/**
* DHT cache preloader.
*/
-public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
+public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** Default preload resend timeout. */
public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500;
@@ -57,13 +58,13 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
private final GridAtomicLong topVer = new GridAtomicLong();
/** Force key futures. */
- private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<K, V>> forceKeyFuts = newMap();
+ private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
/** Partition suppliers. */
- private GridDhtPartitionSupplyPool<K, V> supplyPool;
+ private GridDhtPartitionSupplyPool supplyPool;
/** Partition demanders. */
- private GridDhtPartitionDemandPool<K, V> demandPool;
+ private GridDhtPartitionDemandPool demandPool;
/** Start future. */
private final GridFutureAdapter<Object> startFut;
@@ -92,7 +93,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
assert !loc.id().equals(n.id());
- for (GridDhtForceKeysFuture<K, V> f : forceKeyFuts.values())
+ for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values())
f.onDiscoveryEvent(e);
assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " +
@@ -117,7 +118,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
/**
* @param cctx Cache context.
*/
- public GridDhtPreloader(GridCacheContext<K, V> cctx) {
+ public GridDhtPreloader(GridCacheContext<?, ?> cctx) {
super(cctx);
top = cctx.dht().topology();
@@ -158,8 +159,8 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
}
});
- supplyPool = new GridDhtPartitionSupplyPool<>(cctx, busyLock);
- demandPool = new GridDhtPartitionDemandPool<>(cctx, busyLock);
+ supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock);
+ demandPool = new GridDhtPartitionDemandPool(cctx, busyLock);
cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
@@ -227,12 +228,14 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
final long start = U.currentTimeMillis();
- if (cctx.config().getRebalanceDelay() >= 0) {
- U.log(log, "Starting rebalancing in " + cctx.config().getRebalanceMode() + " mode: " + cctx.name());
+ final CacheConfiguration cfg = cctx.config();
+
+ if (cfg.getRebalanceDelay() >= 0) {
+ U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
demandPool.syncFuture().listen(new CI1<Object>() {
@Override public void apply(Object t) {
- U.log(log, "Completed rebalancing in " + cctx.config().getRebalanceMode() + " mode " +
+ U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " +
"[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
}
});
@@ -253,12 +256,12 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) {
+ @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
return demandPool.assign(exchFut);
}
/** {@inheritDoc} */
- @Override public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload) {
+ @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
demandPool.addAssignments(assignments, forcePreload);
}
@@ -271,7 +274,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> syncFuture() {
- return demandPool.syncFuture();
+ return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
}
/**
@@ -406,7 +409,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
return;
try {
- GridDhtForceKeysFuture<K, V> f = forceKeyFuts.get(msg.futureId());
+ GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId());
if (f != null)
f.onResult(node.id(), msg);
@@ -491,7 +494,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
*/
@SuppressWarnings( {"unchecked", "RedundantCast"})
@Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
- final GridDhtForceKeysFuture<K, V> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
+ final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer);
@@ -543,7 +546,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
*
* @param fut Future to add.
*/
- void addFuture(GridDhtForceKeysFuture<K, V> fut) {
+ void addFuture(GridDhtForceKeysFuture<?, ?> fut) {
forceKeyFuts.put(fut.futureId(), fut);
}
@@ -552,7 +555,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
*
* @param fut Future to remove.
*/
- void remoteFuture(GridDhtForceKeysFuture<K, V> fut) {
+ void remoteFuture(GridDhtForceKeysFuture<?, ?> fut) {
forceKeyFuts.remove(fut.futureId(), fut);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 369fc68..2f6ef6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -27,8 +27,7 @@ import java.util.concurrent.*;
/**
* Partition to node assignments.
*/
-public class GridDhtPreloaderAssignments<K, V> extends
- ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> {
+public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index ba3357d..041f83a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -433,6 +433,11 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
}
/** {@inheritDoc} */
+ @Nullable @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException {
+ return dht.tryPutIfAbsent(key, val);
+ }
+
+ /** {@inheritDoc} */
@Override public V getAndReplace(K key, V val) throws IgniteCheckedException {
return dht.getAndReplace(key, val);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 8258b14..351d6cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -95,7 +95,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
}
/** {@inheritDoc} */
- @Override public GridCachePreloader<K, V> preloader() {
+ @Override public GridCachePreloader preloader() {
return dht().preloader();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index fc178e3..74438bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -274,7 +274,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
if (affNodes.isEmpty()) {
assert !cctx.affinityNode();
- onDone(new ClusterTopologyCheckedException("Failed to map keys for near-only cache (all partition " +
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for near-only cache (all partition " +
"nodes left the grid)."));
return;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 0ffb4e5..3d28018 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -45,7 +45,7 @@ import static org.apache.ignite.events.EventType.*;
/**
* Cache lock future.
*/
-public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean>
implements GridCacheMvccFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
@@ -58,7 +58,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
/** Cache registry. */
@GridToStringExclude
- private GridCacheContext<K, V> cctx;
+ private GridCacheContext<?, ?> cctx;
/** Lock owner thread. */
@GridToStringInclude
@@ -135,7 +135,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
* @param skipStore skipStore
*/
public GridNearLockFuture(
- GridCacheContext<K, V> cctx,
+ GridCacheContext<?, ?> cctx,
Collection<KeyCacheObject> keys,
@Nullable GridNearTxLocal tx,
boolean read,
@@ -184,15 +184,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
* @return Participating nodes.
*/
@Override public Collection<? extends ClusterNode> nodes() {
- return
- F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
- if (isMini(f))
- return ((MiniFuture)f).node();
+ return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
+ @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
+ if (isMini(f))
+ return ((MiniFuture)f).node();
- return cctx.discovery().localNode();
- }
- });
+ return cctx.discovery().localNode();
+ }
+ });
}
/** {@inheritDoc} */
@@ -350,13 +349,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
* Undoes all locks.
*
* @param dist If {@code true}, then remove locks from remote nodes as well.
+ * @param rollback {@code True} if should rollback tx.
*/
- private void undoLocks(boolean dist) {
+ private void undoLocks(boolean dist, boolean rollback) {
// Transactions will undo during rollback.
if (dist && tx == null)
cctx.nearTx().removeLocks(lockVer, keys);
else {
- if (tx != null) {
+ if (rollback && tx != null) {
if (tx.setRollbackOnly()) {
if (log.isDebugEnabled())
log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx);
@@ -397,7 +397,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
* @param dist {@code True} if need to distribute lock release.
*/
private void onFailed(boolean dist) {
- undoLocks(dist);
+ undoLocks(dist, true);
complete(false);
}
@@ -607,7 +607,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
", fut=" + this + ']');
if (!success)
- undoLocks(distribute);
+ undoLocks(distribute, true);
if (tx != null)
cctx.tm().txContext(tx);
@@ -682,7 +682,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
// Continue mapping on the same topology version as it was before.
this.topVer.compareAndSet(null, topVer);
- map(keys);
+ map(keys, false);
markInitialized();
@@ -690,14 +690,16 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
}
// Must get topology snapshot and map on that version.
- mapOnTopology();
+ mapOnTopology(false);
}
/**
* Acquires topology future and checks it completeness under the read lock. If it is not complete,
* will asynchronously wait for it's completeness and then try again.
+ *
+ * @param remap Remap flag.
*/
- void mapOnTopology() {
+ void mapOnTopology(final boolean remap) {
// We must acquire topology snapshot from the topology version future.
cctx.topology().readLock();
@@ -721,19 +723,27 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
AffinityTopologyVersion topVer = fut.topologyVersion();
- if (tx != null)
- tx.topologyVersion(topVer);
+ if (remap) {
+ if (tx != null)
+ tx.onRemap(topVer);
- this.topVer.compareAndSet(null, topVer);
+ this.topVer.set(topVer);
+ }
+ else {
+ if (tx != null)
+ tx.topologyVersion(topVer);
+
+ this.topVer.compareAndSet(null, topVer);
+ }
- map(keys);
+ map(keys, remap);
markInitialized();
}
else {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- mapOnTopology();
+ mapOnTopology(remap);
}
});
}
@@ -749,14 +759,15 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
* groups belonging to one primary node and locks for these groups are acquired sequentially.
*
* @param keys Keys.
+ * @param remap Remap flag.
*/
- private void map(Iterable<KeyCacheObject> keys) {
+ private void map(Iterable<KeyCacheObject> keys, boolean remap) {
try {
AffinityTopologyVersion topVer = this.topVer.get();
assert topVer != null;
- assert topVer.topologyVersion() > 0;
+ assert topVer.topologyVersion() > 0 : topVer;
if (CU.affinityNodes(cctx, topVer).isEmpty()) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for near-only cache (all " +
@@ -765,8 +776,11 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
return;
}
- ConcurrentLinkedDeque8<GridNearLockMapping> mappings =
- new ConcurrentLinkedDeque8<>();
+ boolean clientNode = cctx.kernalContext().clientNode();
+
+ assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
+
+ ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
// Assign keys to primary nodes.
GridNearLockMapping map = null;
@@ -795,6 +809,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
if (log.isDebugEnabled())
log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
+ boolean first = true;
+
// Create mini futures.
for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
GridNearLockMapping mapping = iter.next();
@@ -872,6 +888,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
if (!cand.reentry()) {
if (req == null) {
+ boolean clientFirst = false;
+
+ if (first) {
+ clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks());
+
+ first = false;
+ }
+
req = new GridNearLockRequest(
cctx.cacheId(),
topVer,
@@ -893,7 +917,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
inTx() ? tx.subjectId() : null,
inTx() ? tx.taskNameHash() : 0,
read ? accessTtl : -1L,
- skipStore);
+ skipStore,
+ clientFirst);
mapping.request(req);
}
@@ -1197,7 +1222,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
/**
* @return DHT cache.
*/
- private GridDhtTransactionalCacheAdapter<K, V> dht() {
+ private GridDhtTransactionalCacheAdapter<?, ?> dht() {
return cctx.nearTx().dht();
}
@@ -1356,110 +1381,146 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
return;
}
- int i = 0;
+ if (res.clientRemapVersion() != null) {
+ assert cctx.kernalContext().clientNode();
- AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
+ IgniteInternalFuture<?> affFut =
+ cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
- for (KeyCacheObject k : keys) {
- while (true) {
- GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
+ if (affFut != null && !affFut.isDone()) {
+ affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ remap();
+ }
+ });
+ }
+ else
+ remap();
+ }
+ else {
+ int i = 0;
- try {
- if (res.dhtVersion(i) == null) {
- onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
- "(will fail the lock): " + res));
+ AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
- return;
- }
+ for (KeyCacheObject k : keys) {
+ while (true) {
+ GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
- IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
+ try {
+ if (res.dhtVersion(i) == null) {
+ onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+ "(will fail the lock): " + res));
- CacheObject oldVal = entry.rawGet();
- boolean hasOldVal = false;
- CacheObject newVal = res.value(i);
+ return;
+ }
- boolean readRecordable = false;
+ IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
- if (retval) {
- readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+ CacheObject oldVal = entry.rawGet();
+ boolean hasOldVal = false;
+ CacheObject newVal = res.value(i);
- if (readRecordable)
- hasOldVal = entry.hasValue();
- }
+ boolean readRecordable = false;
- GridCacheVersion dhtVer = res.dhtVersion(i);
- GridCacheVersion mappedVer = res.mappedVersion(i);
+ if (retval) {
+ readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+
+ if (readRecordable)
+ hasOldVal = entry.hasValue();
+ }
- if (newVal == null) {
- if (oldValTup != null) {
- if (oldValTup.get1().equals(dhtVer))
- newVal = oldValTup.get2();
+ GridCacheVersion dhtVer = res.dhtVersion(i);
+ GridCacheVersion mappedVer = res.mappedVersion(i);
- oldVal = oldValTup.get2();
+ if (newVal == null) {
+ if (oldValTup != null) {
+ if (oldValTup.get1().equals(dhtVer))
+ newVal = oldValTup.get2();
+
+ oldVal = oldValTup.get2();
+ }
}
- }
- // Lock is held at this point, so we can set the
- // returned value if any.
- entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
+ // Lock is held at this point, so we can set the
+ // returned value if any.
+ entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
- if (inTx() && implicitTx() && tx.onePhaseCommit()) {
- boolean pass = res.filterResult(i);
+ if (inTx()) {
+ tx.hasRemoteLocks(true);
- tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
- }
+ if (implicitTx() && tx.onePhaseCommit()) {
+ boolean pass = res.filterResult(i);
- entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(),
- res.pending());
-
- if (retval) {
- if (readRecordable)
- cctx.events().addEvent(
- entry.partition(),
- entry.key(),
- tx,
- null,
- EVT_CACHE_OBJECT_READ,
- newVal,
- newVal != null,
- oldVal,
- hasOldVal,
- CU.subjectId(tx, cctx.shared()),
- null,
- inTx() ? tx.resolveTaskName() : null);
-
- if (cctx.cache().configuration().isStatisticsEnabled())
- cctx.cache().metrics0().onRead(false);
- }
+ tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
+ }
+ }
- if (log.isDebugEnabled())
- log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+ entry.readyNearLock(lockVer,
+ mappedVer,
+ res.committedVersions(),
+ res.rolledbackVersions(),
+ res.pending());
+
+ if (retval) {
+ if (readRecordable)
+ cctx.events().addEvent(
+ entry.partition(),
+ entry.key(),
+ tx,
+ null,
+ EVT_CACHE_OBJECT_READ,
+ newVal,
+ newVal != null,
+ oldVal,
+ hasOldVal,
+ CU.subjectId(tx, cctx.shared()),
+ null,
+ inTx() ? tx.resolveTaskName() : null);
+
+ if (cctx.cache().configuration().isStatisticsEnabled())
+ cctx.cache().metrics0().onRead(false);
+ }
- break; // Inner while loop.
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to add candidates because entry was removed (will renew).");
+ if (log.isDebugEnabled())
+ log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
- // Replace old entry with new one.
- entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+ break; // Inner while loop.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to add candidates because entry was removed (will renew).");
+
+ // Replace old entry with new one.
+ entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+ }
}
+
+ i++;
}
- i++;
- }
+ try {
+ proceedMapping(mappings);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
- try {
- proceedMapping(mappings);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ onDone(true);
}
-
- onDone(true);
}
}
+ /**
+ *
+ */
+ private void remap() {
+ undoLocks(false, false);
+
+ mapOnTopology(true);
+
+ onDone(true);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString());