You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/11 08:29:35 UTC
[2/2] ignite git commit: IGNITE-7871 Implemented additional synchronization phase for correct partition counters update
IGNITE-7871 Implemented additional synchronization phase for correct partition counters update
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/da77b981
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/da77b981
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/da77b981
Branch: refs/heads/master
Commit: da77b9818a70495b7afdf6899ebd9180dadd7f68
Parents: f4de6df
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Wed Apr 11 11:23:46 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Apr 11 11:23:46 2018 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/GridTopic.java | 5 +-
.../communication/GridIoMessageFactory.java | 6 +
.../discovery/GridDiscoveryManager.java | 10 +
.../MetaPageUpdatePartitionDataRecord.java | 2 +-
.../processors/cache/CacheMetricsImpl.java | 2 +-
.../processors/cache/GridCacheMvccManager.java | 38 +
.../GridCachePartitionExchangeManager.java | 17 +
.../cache/GridCacheSharedContext.java | 9 +-
.../processors/cache/GridCacheUtils.java | 2 +-
.../cache/IgniteCacheOffheapManager.java | 8 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 10 +-
.../dht/GridClientPartitionTopology.java | 5 +
.../distributed/dht/GridDhtLocalPartition.java | 9 +-
.../dht/GridDhtPartitionTopology.java | 6 +
.../dht/GridDhtPartitionTopologyImpl.java | 26 +-
.../dht/GridDhtPartitionsStateValidator.java | 255 +++++++
.../cache/distributed/dht/GridDhtTxLocal.java | 5 +
.../GridDhtPartitionsExchangeFuture.java | 96 ++-
.../GridDhtPartitionsSingleMessage.java | 68 +-
.../dht/preloader/InitNewCoordinatorFuture.java | 2 +-
.../preloader/latch/ExchangeLatchManager.java | 695 +++++++++++++++++++
.../distributed/dht/preloader/latch/Latch.java | 52 ++
.../dht/preloader/latch/LatchAckMessage.java | 165 +++++
.../cache/distributed/near/GridNearTxLocal.java | 10 +
.../persistence/GridCacheOffheapManager.java | 10 +-
.../cache/transactions/IgniteTxAdapter.java | 2 +-
.../cache/transactions/IgniteTxManager.java | 36 +-
...cheDhtLocalPartitionAfterRemoveSelfTest.java | 2 +-
.../processors/cache/IgniteCacheGroupsTest.java | 1 +
...ExchangeLatchManagerCoordinatorFailTest.java | 244 +++++++
.../GridCachePartitionsStateValidationTest.java | 316 +++++++++
...idCachePartitionsStateValidatorSelfTest.java | 158 +++++
.../TxOptimisticOnPartitionExchangeTest.java | 322 +++++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 4 +
.../testsuites/IgniteCacheTestSuite6.java | 6 +
35 files changed, 2568 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 1227e8c..0b2d41a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -124,7 +124,10 @@ public enum GridTopic {
TOPIC_METRICS,
/** */
- TOPIC_AUTH;
+ TOPIC_AUTH,
+
+ /** */
+ TOPIC_EXCHANGE;
/** Enum values. */
private static final GridTopic[] VALS = values();
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5616fd0..581c32e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.WalStateAckMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
@@ -921,6 +922,11 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case 135:
+ msg = new LatchAckMessage();
+
+ break;
+
// [-3..119] [124..129] [-23..-27] [-36..-55]- this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index a1d84e5..400bb5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -793,6 +793,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
((IgniteKernal)ctx.grid()).onDisconnected();
+ if (!locJoin.isDone())
+ locJoin.onDone(new IgniteCheckedException("Node disconnected"));
+
locJoin = new GridFutureAdapter<>();
registeredCaches.clear();
@@ -2142,6 +2145,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * @return Local join future.
+ */
+ public GridFutureAdapter<DiscoveryLocalJoinData> localJoinFuture() {
+ return locJoin;
+ }
+
+ /**
* @param msg Custom message.
* @throws IgniteCheckedException If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
index bafbf47..e5bd343 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
@@ -32,7 +32,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord {
/** */
private long globalRmvId;
- /** */
+ /** TODO: Partition size may be long */
private int partSize;
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 6fae8fe..b402ff2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -792,7 +792,7 @@ public class CacheMetricsImpl implements CacheMetrics {
if (cctx.cache() == null)
continue;
- int cacheSize = part.dataStore().cacheSize(cctx.cacheId());
+ long cacheSize = part.dataStore().cacheSize(cctx.cacheId());
offHeapEntriesCnt += cacheSize;
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index a9fa3c7..fade833 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -44,6 +44,8 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -314,6 +316,42 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * Creates a future that will wait for finishing all remote transactions (primary -> backup)
+ * with topology version less or equal to {@code topVer}.
+ *
+ * @param topVer Topology version.
+ * @return Compound future of all {@link GridDhtTxFinishFuture} futures.
+ */
+ public IgniteInternalFuture<?> finishRemoteTxs(AffinityTopologyVersion topVer) {
+ GridCompoundFuture<?, ?> res = new CacheObjectsReleaseFuture<>("RemoteTx", topVer);
+
+ for (GridCacheFuture<?> fut : futs.values()) {
+ if (fut instanceof GridDhtTxFinishFuture) {
+ GridDhtTxFinishFuture finishTxFuture = (GridDhtTxFinishFuture) fut;
+
+ if (cctx.tm().needWaitTransaction(finishTxFuture.tx(), topVer))
+ res.add(ignoreErrors(finishTxFuture));
+ }
+ }
+
+ res.markInitialized();
+
+ return res;
+ }
+
+ /**
+ * Future wrapper which ignores any underlying future errors.
+ *
+ * @param f Underlying future.
+ * @return Future wrapper which ignore any underlying future errors.
+ */
+ private IgniteInternalFuture ignoreErrors(IgniteInternalFuture<?> f) {
+ GridFutureAdapter<?> wrapper = new GridFutureAdapter();
+ f.listen(future -> wrapper.onDone());
+ return wrapper;
+ }
+
+ /**
* @param leftNodeId Left node ID.
* @param topVer Topology version.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1a0e65f..20a3ccb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -216,6 +217,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** For tests only. */
private volatile AffinityTopologyVersion exchMergeTestWaitVer;
+ /** Distributed latch manager. */
+ private ExchangeLatchManager latchMgr;
+
/** Discovery listener. */
private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
@Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -309,6 +313,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchWorker = new ExchangeWorker();
+ latchMgr = new ExchangeLatchManager(cctx.kernalContext());
+
cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
EVT_DISCOVERY_CUSTOM_EVT);
@@ -1255,6 +1261,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
m.addPartitionUpdateCounters(grp.groupId(),
newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+
+ m.addPartitionSizes(grp.groupId(), grp.topology().partitionSizes());
}
}
}
@@ -1277,6 +1285,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
m.addPartitionUpdateCounters(top.groupId(),
newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+
+ m.addPartitionSizes(top.groupId(), top.partitionSizes());
}
}
@@ -1570,6 +1580,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * @return Latch manager instance.
+ */
+ public ExchangeLatchManager latch() {
+ return latchMgr;
+ }
+
+ /**
* @param exchFut Optional current exchange future.
* @throws Exception If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index c2f9229..b3b4f0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -26,7 +26,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
-import java.util.function.BiFunction;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
@@ -711,7 +710,7 @@ public class GridCacheSharedContext<K, V> {
/**
* @return Ttl cleanup manager.
- * */
+ */
public GridCacheSharedTtlCleanupManager ttl() {
return ttlMgr;
}
@@ -854,10 +853,14 @@ public class GridCacheSharedContext<K, V> {
GridCompoundFuture f = new CacheObjectsReleaseFuture("Partition", topVer);
f.add(mvcc().finishExplicitLocks(topVer));
- f.add(tm().finishTxs(topVer));
f.add(mvcc().finishAtomicUpdates(topVer));
f.add(mvcc().finishDataStreamerUpdates(topVer));
+ IgniteInternalFuture<?> finishLocalTxsFuture = tm().finishLocalTxs(topVer);
+ // To properly track progress of finishing local tx updates we explicitly add this future to compound set.
+ f.add(finishLocalTxsFuture);
+ f.add(tm().finishAllTxs(finishLocalTxsFuture, topVer));
+
f.markInitialized();
return f;
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index a5169d2..d672420 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1732,7 +1732,7 @@ public class GridCacheUtils {
ver,
expiryPlc == null ? 0 : expiryPlc.forCreate(),
expiryPlc == null ? 0 : toExpireTime(expiryPlc.forCreate()),
- false,
+ true,
topVer,
GridDrType.DR_BACKUP,
true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 3d83f87..a12c033 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -22,11 +22,11 @@ import javax.cache.Cache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
import org.apache.ignite.internal.processors.cache.persistence.RowStore;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
import org.apache.ignite.internal.util.GridAtomicLong;
@@ -344,7 +344,7 @@ public interface IgniteCacheOffheapManager {
* @param part Partition.
* @return Number of entries.
*/
- public int totalPartitionEntriesCount(int part);
+ public long totalPartitionEntriesCount(int part);
/**
*
@@ -381,7 +381,7 @@ public interface IgniteCacheOffheapManager {
* @param cacheId Cache ID.
* @return Size.
*/
- int cacheSize(int cacheId);
+ long cacheSize(int cacheId);
/**
* @return Cache sizes if store belongs to group containing multiple caches.
@@ -391,7 +391,7 @@ public interface IgniteCacheOffheapManager {
/**
* @return Total size.
*/
- int fullSize();
+ long fullSize();
/**
* @return Update counter.
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index b201935..f8cc86f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -252,7 +252,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public int totalPartitionEntriesCount(int p) {
+ @Override public long totalPartitionEntriesCount(int p) {
if (grp.isLocal())
return locCacheDataStore.fullSize();
else {
@@ -1152,14 +1152,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public int cacheSize(int cacheId) {
+ @Override public long cacheSize(int cacheId) {
if (grp.sharedGroup()) {
AtomicLong size = cacheSizes.get(cacheId);
return size != null ? (int)size.get() : 0;
}
- return (int)storageSize.get();
+ return storageSize.get();
}
/** {@inheritDoc} */
@@ -1176,8 +1176,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public int fullSize() {
- return (int)storageSize.get();
+ @Override public long fullSize() {
+ return storageSize.get();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5bbbb31..3e3bb0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -1196,6 +1196,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public Map<Integer, Long> partitionSizes() {
+ return Collections.emptyMap();
+ }
+
+ /** {@inheritDoc} */
@Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
assert false : "Should not be called on non-affinity node";
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 7a47f31..ea20dbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -929,7 +929,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
/**
* @return Initial update counter.
*/
- public Long initialUpdateCounter() {
+ public long initialUpdateCounter() {
return store.initialUpdateCounter();
}
@@ -948,6 +948,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
/**
+ * @return Total size of all caches.
+ */
+ public long fullSize() {
+ return store.fullSize();
+ }
+
+ /**
* Removes all entries and rows from this partition.
*
* @return Number of rows cleared from page memory.
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 7f900cb..6f68dbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
@@ -345,6 +346,11 @@ public interface GridDhtPartitionTopology {
public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros);
/**
+ * @return Partition cache sizes.
+ */
+ public Map<Integer, Long> partitionSizes();
+
+ /**
* @param part Partition to own.
* @return {@code True} if owned.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 538c57e..740903e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -31,6 +31,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -2526,6 +2528,28 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public Map<Integer, Long> partitionSizes() {
+ lock.readLock().lock();
+
+ try {
+ Map<Integer, Long> partitionSizes = new HashMap<>();
+
+ for (int p = 0; p < locParts.length(); p++) {
+ GridDhtLocalPartition part = locParts.get(p);
+ if (part == null || part.fullSize() == 0)
+ continue;
+
+ partitionSizes.put(part.id(), part.fullSize());
+ }
+
+ return partitionSizes;
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
AffinityTopologyVersion curTopVer = this.readyTopVer;
@@ -2587,7 +2611,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (part == null)
continue;
- int size = part.dataStore().fullSize();
+ long size = part.dataStore().fullSize();
if (size >= threshold)
X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
new file mode 100644
index 0000000..92a0584
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.lang.IgniteProductVersion;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+
+/**
+ * Class to validate partitions update counters and cache sizes during exchange process.
+ */
+public class GridDhtPartitionsStateValidator {
+    /** Version since node is able to send cache sizes in {@link GridDhtPartitionsSingleMessage}. */
+    private static final IgniteProductVersion SIZES_VALIDATION_AVAILABLE_SINCE = IgniteProductVersion.fromString("2.5.0");
+
+    /** Cache shared context. */
+    private final GridCacheSharedContext<?, ?> cctx;
+
+    /**
+     * Constructor.
+     *
+     * @param cctx Cache shared context.
+     */
+    public GridDhtPartitionsStateValidator(GridCacheSharedContext<?, ?> cctx) {
+        this.cctx = cctx;
+    }
+
+    /**
+     * Validates partition states - update counters and cache sizes for all nodes.
+     * If update counter value or cache size for the same partitions are different on some nodes
+     * method throws exception with full information about inconsistent partitions.
+     * Nodes that joined within the exchange are excluded from both checks; nodes older than
+     * {@link #SIZES_VALIDATION_AVAILABLE_SINCE} are additionally excluded from the size check.
+     *
+     * @param fut Current exchange future.
+     * @param top Topology to validate.
+     * @param messages Single messages received from all nodes.
+     * @throws IgniteCheckedException If validation failed. Exception message contains
+     *      full information about all partitions which update counters or cache sizes are not consistent.
+     */
+    public void validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture fut,
+        GridDhtPartitionTopology top,
+        Map<UUID, GridDhtPartitionsSingleMessage> messages) throws IgniteCheckedException {
+        // Ignore just joined nodes.
+        final Set<UUID> ignoringNodes = new HashSet<>();
+
+        for (DiscoveryEvent evt : fut.events().events())
+            if (evt.type() == EVT_NODE_JOINED)
+                ignoringNodes.add(evt.eventNode().id());
+
+        AffinityTopologyVersion topVer = fut.context().events().topologyVersion();
+
+        // Validate update counters.
+        Map<Integer, Map<UUID, Long>> result = validatePartitionsUpdateCounters(top, messages, ignoringNodes);
+
+        if (!result.isEmpty())
+            throw new IgniteCheckedException("Partitions update counters are inconsistent for " + fold(topVer, result));
+
+        // For sizes validation ignore also nodes which are not able to send cache sizes.
+        for (UUID id : messages.keySet()) {
+            ClusterNode node = cctx.discovery().node(id);
+
+            if (node != null && node.version().compareTo(SIZES_VALIDATION_AVAILABLE_SINCE) < 0)
+                ignoringNodes.add(id);
+        }
+
+        // Validate cache sizes.
+        result = validatePartitionsSizes(top, messages, ignoringNodes);
+
+        if (!result.isEmpty())
+            throw new IgniteCheckedException("Partitions cache sizes are inconsistent for " + fold(topVer, result));
+    }
+
+    /**
+     * Validate partitions update counters for given {@code top}.
+     * Only partitions in {@link GridDhtPartitionState#OWNING} state are compared; a partition missing
+     * from a node's counters map is treated as having counter 0.
+     *
+     * @param top Topology to validate.
+     * @param messages Single messages received from all nodes.
+     * @param ignoringNodes Nodes excluded from validation.
+     * @return Invalid partitions map with following structure: (partId, (nodeId, updateCounter)).
+     *      If map is empty validation is successful.
+     */
+    Map<Integer, Map<UUID, Long>> validatePartitionsUpdateCounters(
+        GridDhtPartitionTopology top,
+        Map<UUID, GridDhtPartitionsSingleMessage> messages,
+        Set<UUID> ignoringNodes) {
+        Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
+
+        Map<Integer, T2<UUID, Long>> updateCountersAndNodesByPartitions = new HashMap<>();
+
+        // Populate counters statistics from local node partitions; these become the reference values.
+        for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
+            if (top.partitionState(cctx.localNodeId(), part.id()) != GridDhtPartitionState.OWNING)
+                continue;
+
+            updateCountersAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.updateCounter()));
+        }
+
+        int partitions = top.partitions();
+
+        // Then process and validate counters from other nodes.
+        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : messages.entrySet()) {
+            UUID nodeId = e.getKey();
+
+            if (ignoringNodes.contains(nodeId))
+                continue;
+
+            CachePartitionPartialCountersMap countersMap = e.getValue().partitionUpdateCounters(top.groupId(), partitions);
+
+            for (int part = 0; part < partitions; part++) {
+                if (top.partitionState(nodeId, part) != GridDhtPartitionState.OWNING)
+                    continue;
+
+                int partIdx = countersMap.partitionIndex(part);
+
+                // A negative index means the node sent no counter for this partition.
+                long currentCounter = partIdx >= 0 ? countersMap.updateCounterAt(partIdx) : 0;
+
+                process(invalidPartitions, updateCountersAndNodesByPartitions, part, nodeId, currentCounter);
+            }
+        }
+
+        return invalidPartitions;
+    }
+
+    /**
+     * Validate partitions cache sizes for given {@code top}.
+     * Only partitions in {@link GridDhtPartitionState#OWNING} state are compared; a partition missing
+     * from a node's sizes map is treated as empty (size 0).
+     *
+     * @param top Topology to validate.
+     * @param messages Single messages received from all nodes.
+     * @param ignoringNodes Nodes excluded from validation.
+     * @return Invalid partitions map with following structure: (partId, (nodeId, cacheSize)).
+     *      If map is empty validation is successful.
+     */
+    Map<Integer, Map<UUID, Long>> validatePartitionsSizes(
+        GridDhtPartitionTopology top,
+        Map<UUID, GridDhtPartitionsSingleMessage> messages,
+        Set<UUID> ignoringNodes) {
+        Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
+
+        Map<Integer, T2<UUID, Long>> sizesAndNodesByPartitions = new HashMap<>();
+
+        // Populate sizes statistics from local node partitions; these become the reference values.
+        for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
+            if (top.partitionState(cctx.localNodeId(), part.id()) != GridDhtPartitionState.OWNING)
+                continue;
+
+            sizesAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.fullSize()));
+        }
+
+        int partitions = top.partitions();
+
+        // Then process and validate sizes from other nodes.
+        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : messages.entrySet()) {
+            UUID nodeId = e.getKey();
+
+            if (ignoringNodes.contains(nodeId))
+                continue;
+
+            Map<Integer, Long> sizesMap = e.getValue().partitionSizes(top.groupId());
+
+            for (int part = 0; part < partitions; part++) {
+                if (top.partitionState(nodeId, part) != GridDhtPartitionState.OWNING)
+                    continue;
+
+                long currentSize = sizesMap.getOrDefault(part, 0L);
+
+                process(invalidPartitions, sizesAndNodesByPartitions, part, nodeId, currentSize);
+            }
+        }
+
+        return invalidPartitions;
+    }
+
+    /**
+     * Processes given {@code counter} for partition {@code part} reported by {@code node}.
+     * The first value seen for a partition becomes the reference; every later value that differs from it
+     * is recorded in {@code invalidPartitions} together with the reference node's value.
+     *
+     * @param invalidPartitions Invalid partitions map.
+     * @param countersAndNodes Current map of counters and nodes by partitions.
+     * @param part Processing partition.
+     * @param node Node id.
+     * @param counter Counter value reported by {@code node}.
+     */
+    private void process(Map<Integer, Map<UUID, Long>> invalidPartitions,
+        Map<Integer, T2<UUID, Long>> countersAndNodes,
+        int part,
+        UUID node,
+        long counter) {
+        T2<UUID, Long> existingData = countersAndNodes.get(part);
+
+        if (existingData == null) {
+            countersAndNodes.put(part, new T2<>(node, counter));
+
+            return;
+        }
+
+        if (counter == existingData.get2())
+            return;
+
+        // On the first mismatch also record the reference node's value, so the report shows all sides.
+        invalidPartitions.computeIfAbsent(part, p -> {
+            Map<UUID, Long> map = new HashMap<>();
+
+            map.put(existingData.get1(), existingData.get2());
+
+            return map;
+        }).put(node, counter);
+    }
+
+    /**
+     * Folds given map of invalid partition states to string representation in the following format:
+     * Part [id]: [consistentId=value*]
+     *
+     * Value can be both update counter or cache size. If a node cannot be resolved for {@code topVer},
+     * its node id is printed instead of its consistent id.
+     *
+     * @param topVer Last topology version.
+     * @param invalidPartitions Invalid partitions map.
+     * @return String representation of invalid partitions.
+     */
+    private String fold(AffinityTopologyVersion topVer, Map<Integer, Map<UUID, Long>> invalidPartitions) {
+        SB sb = new SB();
+
+        NavigableMap<Integer, Map<UUID, Long>> sortedPartitions = new TreeMap<>(invalidPartitions);
+
+        for (Map.Entry<Integer, Map<UUID, Long>> p : sortedPartitions.entrySet()) {
+            sb.a("Part ").a(p.getKey()).a(": [");
+
+            for (Map.Entry<UUID, Long> e : p.getValue().entrySet()) {
+                ClusterNode node = cctx.discovery().node(topVer, e.getKey());
+
+                // Discovery lookup may return null; do not let report building fail with an NPE.
+                Object consistentId = node != null ? node.consistentId() : e.getKey();
+
+                sb.a(consistentId).a("=").a(e.getValue()).a(" ");
+            }
+
+            sb.a("] ");
+        }
+
+        return sb.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 28cc018..0609f04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -447,6 +447,11 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
err = e;
}
+ catch (Throwable t) {
+ fut.onDone(t);
+
+ throw t;
+ }
if (primarySync)
sendFinishReply(err);
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index cbb4985..dd4a571 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -41,6 +41,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
@@ -75,10 +76,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.LocalJoinCachesContext;
import org.apache.ignite.internal.processors.cache.StateChangeRequest;
import org.apache.ignite.internal.processors.cache.WalStateAbstractMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsStateValidator;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -290,6 +293,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
@GridToStringExclude
private GridDhtPartitionsExchangeFuture mergedWith;
+ /** Validator for partition states. */
+ @GridToStringExclude
+ private final GridDhtPartitionsStateValidator validator;
+
/**
* @param cctx Cache context.
* @param busyLock Busy lock.
@@ -314,6 +321,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
this.exchId = exchId;
this.exchActions = exchActions;
this.affChangeMsg = affChangeMsg;
+ this.validator = new GridDhtPartitionsStateValidator(cctx);
log = cctx.logger(getClass());
exchLog = cctx.logger(EXCHANGE_LOG);
@@ -1099,7 +1107,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
// To correctly rebalance when persistence is enabled, it is necessary to reserve history within exchange.
partHistReserved = cctx.database().reserveHistoryForExchange();
- waitPartitionRelease();
+ // On first phase we wait for finishing all local tx updates, atomic updates and lock releases.
+ waitPartitionRelease(1);
+
+ // Second phase is needed to wait for finishing all tx updates from primary to backup nodes remaining after first phase.
+ waitPartitionRelease(2);
boolean topChanged = firstDiscoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
@@ -1202,9 +1214,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* For the exact list of the objects being awaited for see
* {@link GridCacheSharedContext#partitionReleaseFuture(AffinityTopologyVersion)} javadoc.
*
+ * @param phase Phase of partition release.
+ *
* @throws IgniteCheckedException If failed.
*/
- private void waitPartitionRelease() throws IgniteCheckedException {
+ private void waitPartitionRelease(int phase) throws IgniteCheckedException {
+ Latch releaseLatch = null;
+
+ // Wait for other nodes only on first phase.
+ if (phase == 1)
+ releaseLatch = cctx.exchange().latch().getOrCreate("exchange", initialVersion());
+
IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(initialVersion());
// Assign to class variable so it will be included into toString() method.
@@ -1238,6 +1258,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, futTimeout);
}
}
+ catch (IgniteCheckedException e) {
+ U.warn(log,"Unable to await partitions release future", e);
+
+ throw e;
+ }
}
long waitEnd = U.currentTimeMillis();
@@ -1290,6 +1315,35 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
}
+
+ if (releaseLatch == null)
+ return;
+
+ releaseLatch.countDown();
+
+ if (!localJoinExchange()) {
+ try {
+ while (true) {
+ try {
+ releaseLatch.await(futTimeout, TimeUnit.MILLISECONDS);
+
+ if (log.isInfoEnabled())
+ log.info("Finished waiting for partitions release latch: " + releaseLatch);
+
+ break;
+ }
+ catch (IgniteFutureTimeoutCheckedException ignored) {
+ U.warn(log, "Unable to await partitions release latch within timeout: " + releaseLatch);
+
+ // Try to resend ack.
+ releaseLatch.countDown();
+ }
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.warn(log, "Stop waiting for partitions release latch: " + e.getMessage());
+ }
+ }
}
/**
@@ -2499,6 +2553,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
+ validatePartitionsState();
+
if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
assert firstDiscoEvt instanceof DiscoveryCustomEvent;
@@ -2683,6 +2739,42 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * Validates that partition update counters and cache sizes for all caches are consistent.
+ */
+    private void validatePartitionsState() {
+        for (Map.Entry<Integer, CacheGroupDescriptor> e : cctx.affinity().cacheGroups().entrySet()) {
+            CacheGroupDescriptor grpDesc = e.getValue();
+
+            if (grpDesc.config().getCacheMode() == CacheMode.LOCAL)
+                continue;
+
+            int grpId = e.getKey();
+
+            CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpId);
+
+            // Do not validate groups without a local context, read or write through caches,
+            // or caches with disabled rebalance (manual rebalancing, i.e. rebalance delay -1, or mode NONE).
+            if (grpCtx == null
+                || grpCtx.config().isReadThrough()
+                || grpCtx.config().isWriteThrough()
+                || grpCtx.config().getCacheStoreFactory() != null
+                || grpCtx.config().getRebalanceDelay() == -1
+                || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE)
+                continue;
+
+            // Resolved after the filter: groups without a local context are skipped above, so there is
+            // no need to look up a client topology for them.
+            GridDhtPartitionTopology top = grpCtx.topology();
+
+            try {
+                validator.validatePartitionCountersAndSizes(this, top, msgs);
+            }
+            catch (IgniteCheckedException ex) {
+                // Inconsistencies are only reported for now; the exchange is not failed.
+                log.warning("Partition states validation was failed for cache " + grpDesc.cacheOrGroupName(), ex);
+                // TODO: Handle such errors https://issues.apache.org/jira/browse/IGNITE-7833
+            }
+        }
+    }
+
+ /**
*
*/
private void assignPartitionsStates() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 215152d..6ebafac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import java.util.Collection;
import java.io.Externalizable;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -67,6 +67,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** Serialized partitions counters. */
private byte[] partCntrsBytes;
+ /** Partitions sizes. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Map<Integer, Map<Integer, Long>> partSizes;
+
+ /** Serialized partitions sizes. */
+ private byte[] partSizesBytes;
+
/** Partitions history reservation counters. */
@GridToStringInclude
@GridDirectTransient
@@ -220,6 +228,35 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
}
/**
+ * Adds partition sizes map for specified {@code grpId} to the current message.
+ *
+ * @param grpId Group id.
+ * @param partSizesMap Partition sizes map.
+ */
+    public void addPartitionSizes(int grpId, Map<Integer, Long> partSizesMap) {
+        // Empty maps are not stored; the per-group container is created lazily on first non-empty add.
+        if (!partSizesMap.isEmpty()) {
+            if (partSizes == null)
+                partSizes = new HashMap<>();
+
+            partSizes.put(grpId, partSizesMap);
+        }
+    }
+
+ /**
+ * Returns partition sizes map for specified {@code grpId}.
+ *
+ * @param grpId Group id.
+ * @return Partition sizes map (partId, partSize).
+ */
+    public Map<Integer, Long> partitionSizes(int grpId) {
+        // No sizes added/received at all, or none for this group: report an empty map rather than null.
+        return partSizes == null
+            ? Collections.emptyMap()
+            : partSizes.getOrDefault(grpId, Collections.emptyMap());
+    }
+
+ /**
* @param grpId Cache group ID.
* @param cntrMap Partition history counters.
*/
@@ -287,12 +324,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
boolean marshal = (parts != null && partsBytes == null) ||
(partCntrs != null && partCntrsBytes == null) ||
(partHistCntrs != null && partHistCntrsBytes == null) ||
+ (partSizes != null && partSizesBytes == null) ||
(err != null && errBytes == null);
if (marshal) {
byte[] partsBytes0 = null;
byte[] partCntrsBytes0 = null;
byte[] partHistCntrsBytes0 = null;
+ byte[] partSizesBytes0 = null;
byte[] errBytes0 = null;
if (parts != null && partsBytes == null)
@@ -304,6 +343,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
if (partHistCntrs != null && partHistCntrsBytes == null)
partHistCntrsBytes0 = U.marshal(ctx, partHistCntrs);
+ if (partSizes != null && partSizesBytes == null)
+ partSizesBytes0 = U.marshal(ctx, partSizes);
+
if (err != null && errBytes == null)
errBytes0 = U.marshal(ctx, err);
@@ -314,11 +356,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
byte[] partsBytesZip = U.zip(partsBytes0);
byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
byte[] partHistCntrsBytesZip = U.zip(partHistCntrsBytes0);
+ byte[] partSizesBytesZip = U.zip(partSizesBytes0);
byte[] exBytesZip = U.zip(errBytes0);
partsBytes0 = partsBytesZip;
partCntrsBytes0 = partCntrsBytesZip;
partHistCntrsBytes0 = partHistCntrsBytesZip;
+ partSizesBytes0 = partSizesBytesZip;
errBytes0 = exBytesZip;
compressed(true);
@@ -331,6 +375,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
partsBytes = partsBytes0;
partCntrsBytes = partCntrsBytes0;
partHistCntrsBytes = partHistCntrsBytes0;
+ partSizesBytes = partSizesBytes0;
errBytes = errBytes0;
}
}
@@ -360,6 +405,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
partHistCntrs = U.unmarshal(ctx, partHistCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
}
+ if (partSizesBytes != null && partSizes == null) {
+ if (compressed())
+ partSizes = U.unmarshalZip(ctx.marshaller(), partSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ else
+ partSizes = U.unmarshal(ctx, partSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
+
if (errBytes != null && err == null) {
if (compressed())
err = U.unmarshalZip(ctx.marshaller(), errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
@@ -451,6 +503,11 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
writer.incrementState();
+ case 13:
+ if (!writer.writeByteArray("partsSizesBytes", partSizesBytes))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -531,6 +588,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
+ case 13:
+ partSizesBytes = reader.readByteArray("partsSizesBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return reader.afterMessageRead(GridDhtPartitionsSingleMessage.class);
@@ -543,7 +607,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 13;
+ return 14;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
index 596fa8c..42a9ba6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -235,7 +235,7 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
if (awaited.remove(node.id())) {
GridDhtPartitionsFullMessage fullMsg0 = msg.finishMessage();
- if (fullMsg0 != null) {
+ if (fullMsg0 != null && fullMsg0.resultTopologyVersion() != null) {
assert fullMsg == null || fullMsg.resultTopologyVersion().equals(fullMsg0.resultTopologyVersion());
fullMsg = fullMsg0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
new file mode 100644
index 0000000..c205cb1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+
+/**
+ * Class is responsible to create and manage instances of distributed latches {@link Latch}.
+ */
+public class ExchangeLatchManager {
+ /** Version since latch management is available. */
+ private static final IgniteProductVersion VERSION_SINCE = IgniteProductVersion.fromString("2.5.0");
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Context. */
+ private final GridKernalContext ctx;
+
+ /** Discovery manager. */
+ private final GridDiscoveryManager discovery;
+
+ /** IO manager. */
+ private final GridIoManager io;
+
+ /** Current coordinator. */
+ private volatile ClusterNode coordinator;
+
+ /** Pending acks collection. */
+ private final ConcurrentMap<T2<String, AffinityTopologyVersion>, Set<UUID>> pendingAcks = new ConcurrentHashMap<>();
+
+ /** Server latches collection. */
+ private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ServerLatch> serverLatches = new ConcurrentHashMap<>();
+
+ /** Client latches collection. */
+ private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ClientLatch> clientLatches = new ConcurrentHashMap<>();
+
+ /** Lock. */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Kernal context.
+ */
+    public ExchangeLatchManager(GridKernalContext ctx) {
+        this.ctx = ctx;
+        this.log = ctx.log(getClass());
+        this.discovery = ctx.discovery();
+        this.io = ctx.io();
+
+        // Listeners below are registered on server nodes only.
+        if (ctx.clientNode())
+            return;
+
+        // Latch acknowledgements are received on the exchange topic.
+        ctx.io().addMessageListener(GridTopic.TOPIC_EXCHANGE, (nodeId, msg, plc) -> {
+            if (msg instanceof LatchAckMessage)
+                processAck(nodeId, (LatchAckMessage)msg);
+        });
+
+        // First coordinator initialization, once the local node has joined.
+        ctx.discovery().localJoinFuture().listen(fut -> {
+            this.coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE);
+        });
+
+        ctx.event().addDiscoveryEventListener((evt, discoCache) -> {
+            assert evt != null;
+            assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED : this;
+
+            // Node departure is handled asynchronously rather than in the discovery listener itself.
+            ctx.closure().runLocalSafe(() -> processNodeLeft(evt.eventNode()));
+        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+    }
+
+ /**
+ * Creates server latch with given {@code id} and {@code topVer}.
+ * Adds corresponding pending acks to it.
+ *
+ * @param id Latch id.
+ * @param topVer Latch topology version.
+ * @param participants Participant nodes.
+ * @return Server latch instance.
+ */
+ private Latch createServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
+ // Latches are keyed by the (id, topology version) pair.
+ final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, topVer);
+
+ // Reuse a latch already registered under this key.
+ if (serverLatches.containsKey(latchId))
+ return serverLatches.get(latchId);
+
+ ServerLatch latch = new ServerLatch(id, topVer, participants);
+
+ serverLatches.put(latchId, latch);
+
+ if (log.isDebugEnabled())
+ log.debug("Server latch is created [latch=" + latchId + ", participantsSize=" + participants.size() + "]");
+
+ // Apply acks that were buffered before this latch existed, counting only participants that have not acked yet.
+ // NOTE(review): containsKey/get/remove on pendingAcks is check-then-act; getOrCreate() calls this under `lock`,
+ // but whether ack processing takes the same lock is not visible here — confirm.
+ if (pendingAcks.containsKey(latchId)) {
+ Set<UUID> acks = pendingAcks.get(latchId);
+
+ for (UUID node : acks)
+ if (latch.hasParticipant(node) && !latch.hasAck(node))
+ latch.ack(node);
+
+ pendingAcks.remove(latchId);
+ }
+
+ // A latch completed by the buffered acks alone is deregistered, but still returned to the caller.
+ if (latch.isCompleted())
+ serverLatches.remove(latchId);
+
+ return latch;
+ }
+
+    /**
+     * Returns the client latch registered under {@code (id, topVer)}, or creates a new one.
+     * If the coordinator's final ack for this latch has already been received (it is present in
+     * {@link #pendingAcks}), the new latch is completed on the spot and never registered.
+     *
+     * @param id Latch id.
+     * @param topVer Latch topology version.
+     * @param coordinator Coordinator node.
+     * @param participants Participant nodes.
+     * @return Client latch instance.
+     */
+    private Latch createClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) {
+        final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, topVer);
+
+        ClientLatch existing = clientLatches.get(latchId);
+
+        if (existing != null)
+            return existing;
+
+        ClientLatch created = new ClientLatch(id, topVer, coordinator, participants);
+
+        if (log.isDebugEnabled())
+            log.debug("Client latch is created [latch=" + latchId
+                + ", crd=" + coordinator
+                + ", participantsSize=" + participants.size() + "]");
+
+        if (!pendingAcks.containsKey(latchId)) {
+            // Normal path: wait for the coordinator's final ack.
+            clientLatches.put(latchId, created);
+
+            return created;
+        }
+
+        // The final ack overtook latch creation: nothing left to wait for.
+        created.complete();
+        pendingAcks.remove(latchId);
+
+        return created;
+    }
+
+    /**
+     * Returns the latch for {@code id} at {@code topVer}, creating it if it does not exist yet.
+     *
+     * The latch participants are the alive server nodes of {@code topVer} that support latches.
+     * When the local node is the latch coordinator a {@code ServerLatch} is used, otherwise a {@code ClientLatch}.
+     * If no latch-capable coordinator can be found, an already completed latch is returned.
+     *
+     * @param id Latch id.
+     * @param topVer Latch topology version.
+     * @return Latch instance.
+     */
+    public Latch getOrCreate(String id, AffinityTopologyVersion topVer) {
+        lock.lock();
+
+        try {
+            ClusterNode crd = getLatchCoordinator(topVer);
+
+            if (crd == null) {
+                // Nobody to synchronize with: hand out a latch that never blocks.
+                ClientLatch done = new ClientLatch(id, AffinityTopologyVersion.NONE, null, Collections.emptyList());
+                done.complete();
+
+                return done;
+            }
+
+            Collection<ClusterNode> latchParticipants = getLatchParticipants(topVer);
+
+            if (crd.isLocal())
+                return createServerLatch(id, topVer, latchParticipants);
+
+            return createClientLatch(id, topVer, crd, latchParticipants);
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Collects the latch participants for the given topology version.
+     *
+     * {@link AffinityTopologyVersion#NONE} selects the current topology; any other version is resolved
+     * through the discovery history. Only server nodes whose version is at least {@code VERSION_SINCE}
+     * (i.e. nodes supporting latches) are returned.
+     *
+     * @param topVer Latch topology version.
+     * @return Collection of alive server nodes with latch functionality; empty if no discovery
+     *      history is available for {@code topVer}.
+     */
+    private Collection<ClusterNode> getLatchParticipants(AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> aliveNodes;
+
+        // equals() rather than ==: versions deserialized from messages are never the NONE instance itself.
+        if (AffinityTopologyVersion.NONE.equals(topVer))
+            aliveNodes = discovery.aliveServerNodes();
+        else {
+            org.apache.ignite.internal.managers.discovery.DiscoCache discoCache = discovery.discoCache(topVer);
+
+            // Treat a missing history entry as "no participants" (callers already skip empty
+            // participant sets) instead of failing with an NPE.
+            // NOTE(review): relies on discoCache(topVer) returning null for unknown/evicted versions.
+            if (discoCache == null)
+                return Collections.emptyList();
+
+            aliveNodes = discoCache.aliveServerNodes();
+        }
+
+        return aliveNodes
+            .stream()
+            .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * Finds the latch coordinator for the given topology version: the first alive server node
+     * (from discovery) whose version is at least {@code VERSION_SINCE}.
+     *
+     * {@link AffinityTopologyVersion#NONE} selects the current topology; any other version is resolved
+     * through the discovery history.
+     *
+     * @param topVer Latch topology version.
+     * @return Oldest alive server node with latch functionality, or {@code null} if there is no such node
+     *      or no discovery history is available for {@code topVer}.
+     */
+    @Nullable private ClusterNode getLatchCoordinator(AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> aliveNodes;
+
+        // equals() rather than ==: robust against NONE instances that are not the shared constant.
+        if (AffinityTopologyVersion.NONE.equals(topVer))
+            aliveNodes = discovery.aliveServerNodes();
+        else {
+            org.apache.ignite.internal.managers.discovery.DiscoCache discoCache = discovery.discoCache(topVer);
+
+            // Report "no coordinator" rather than failing with an NPE when history is unavailable.
+            // NOTE(review): relies on discoCache(topVer) returning null for unknown/evicted versions.
+            if (discoCache == null)
+                return null;
+
+            aliveNodes = discoCache.aliveServerNodes();
+        }
+
+        return aliveNodes
+            .stream()
+            .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
+            .findFirst()
+            .orElse(null);
+    }
+
+    /**
+     * Processes an ack message received from node {@code from}.
+     *
+     * A final ack completes and unregisters the matching client latch. If no such latch exists yet and the
+     * local node is not the coordinator, the ack is parked in {@link #pendingAcks} so that a later
+     * {@code createClientLatch} completes immediately.
+     *
+     * A regular ack counts down the matching server latch (once per participant). If no server latch exists,
+     * the ack is parked in {@link #pendingAcks} so it can be replayed when the latch is created.
+     *
+     * Nothing is done when no latch-capable coordinator exists.
+     *
+     * @param from Node sent ack.
+     * @param message Ack message.
+     */
+    private void processAck(UUID from, LatchAckMessage message) {
+        lock.lock();
+
+        try {
+            ClusterNode coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE);
+
+            if (coordinator == null)
+                return;
+
+            T2<String, AffinityTopologyVersion> latchId = new T2<>(message.latchId(), message.topVer());
+
+            if (message.isFinal()) {
+                if (log.isDebugEnabled())
+                    log.debug("Process final ack [latch=" + latchId + ", from=" + from + "]");
+
+                // Single lookup: remove() returns null when there is no such client latch.
+                ClientLatch latch = clientLatches.remove(latchId);
+
+                if (latch != null)
+                    latch.complete();
+                else if (!coordinator.isLocal()) {
+                    // The final ack overtook local latch creation; remember it.
+                    pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>()).add(from);
+                }
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Process ack [latch=" + latchId + ", from=" + from + "]");
+
+                ServerLatch latch = serverLatches.get(latchId);
+
+                if (latch == null) {
+                    // No server latch yet (or local node is not coordinator yet); keep the ack for later replay.
+                    pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>()).add(from);
+                }
+                else if (latch.hasParticipant(from) && !latch.hasAck(from)) {
+                    latch.ack(from);
+
+                    if (latch.isCompleted())
+                        serverLatches.remove(latchId);
+                }
+            }
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Switches latch coordination to the local node.
+     *
+     * For every latch known locally (either through parked acks or through local client latches) a server
+     * latch is created, as long as that latch's topology version still yields participants.
+     */
+    private void becomeNewCoordinator() {
+        if (log.isInfoEnabled())
+            log.info("Become new coordinator " + coordinator.id());
+
+        // Snapshot the keys first: createServerLatch() removes entries from pendingAcks while restoring.
+        List<T2<String, AffinityTopologyVersion>> toRestore = new ArrayList<>(pendingAcks.keySet());
+        toRestore.addAll(clientLatches.keySet());
+
+        for (T2<String, AffinityTopologyVersion> key : toRestore) {
+            String latchName = key.get1();
+            AffinityTopologyVersion latchTopVer = key.get2();
+            Collection<ClusterNode> latchParticipants = getLatchParticipants(latchTopVer);
+
+            if (latchParticipants.isEmpty())
+                continue;
+
+            createServerLatch(latchName, latchTopVer, latchParticipants);
+        }
+    }
+
+    /**
+     * Handles node left discovery event.
+     *
+     * Summary:
+     * Removes pending acks corresponding to the left node.
+     * Adds fake acknowledgements to server latches where such node was participant.
+     * Changes client latches coordinator to oldest available server node where such node was coordinator.
+     * Detects coordinator change.
+     *
+     * @param left Left node.
+     */
+    private void processNodeLeft(ClusterNode left) {
+        assert this.coordinator != null : "Coordinator is not initialized";
+
+        lock.lock();
+
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Process node left " + left.id());
+
+            ClusterNode coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE);
+
+            if (coordinator == null)
+                return;
+
+            // Clear pending acks (removing an absent element is a no-op).
+            for (Set<UUID> acks : pendingAcks.values())
+                acks.remove(left.id());
+
+            // Change coordinator for client latches.
+            for (Map.Entry<T2<String, AffinityTopologyVersion>, ClientLatch> latchEntry : clientLatches.entrySet()) {
+                ClientLatch latch = latchEntry.getValue();
+
+                if (latch.hasCoordinator(left.id())) {
+                    // Change coordinator for latch and re-send ack if necessary.
+                    if (latch.hasParticipant(coordinator.id()))
+                        latch.newCoordinator(coordinator);
+                    else {
+                        /* If new coordinator is not able to take control on the latch,
+                           it means that all other latch participants are left from topology
+                           and there is no reason to track such latch. */
+                        AffinityTopologyVersion topVer = latchEntry.getKey().get2();
+
+                        assert getLatchParticipants(topVer).isEmpty();
+
+                        latch.complete(new IgniteCheckedException("All latch participants are left from topology."));
+                        clientLatches.remove(latchEntry.getKey());
+                    }
+                }
+            }
+
+            // Add acknowledgements from left node.
+            for (Map.Entry<T2<String, AffinityTopologyVersion>, ServerLatch> latchEntry : serverLatches.entrySet()) {
+                ServerLatch latch = latchEntry.getValue();
+
+                if (latch.hasParticipant(left.id()) && !latch.hasAck(left.id())) {
+                    if (log.isDebugEnabled())
+                        log.debug("Process node left [latch=" + latchEntry.getKey() + ", left=" + left.id() + "]");
+
+                    latch.ack(left.id());
+
+                    if (latch.isCompleted())
+                        serverLatches.remove(latchEntry.getKey());
+                }
+            }
+
+            // Coordinator is changed. UUIDs must be compared by value: '!=' only compares references
+            // and may spuriously report a change (re-running becomeNewCoordinator()).
+            if (coordinator.isLocal() && !this.coordinator.id().equals(coordinator.id())) {
+                this.coordinator = coordinator;
+
+                becomeNewCoordinator();
+            }
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Latch created on the coordinator node.
+     * The latch collects acks from its participants: non-coordinator nodes and the local node.
+     * It completes when acks from all participants have been received.
+     *
+     * After latch completion, a final ack is sent to every participant that is still alive.
+     */
+    class ServerLatch extends CompletableLatch {
+        /** Number of latch permits. This is needed to track number of countDown invocations. */
+        private final AtomicInteger permits;
+
+        /** Set of received acks. */
+        private final Set<UUID> acks = new GridConcurrentHashSet<>();
+
+        /**
+         * Constructor.
+         *
+         * @param id Latch id.
+         * @param topVer Latch topology version.
+         * @param participants Participant nodes.
+         */
+        ServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
+            super(id, topVer, participants);
+            this.permits = new AtomicInteger(participants.size());
+
+            // Send final acks when latch is completed.
+            this.complete.listen(f -> {
+                for (ClusterNode node : participants) {
+                    try {
+                        if (discovery.alive(node)) {
+                            io.sendToGridTopic(node, GridTopic.TOPIC_EXCHANGE, new LatchAckMessage(id, topVer, true), GridIoPolicy.SYSTEM_POOL);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Final ack is sent [latch=" + latchId() + ", to=" + node.id() + "]");
+                        }
+                    }
+                    catch (IgniteCheckedException e) {
+                        // Best effort: keep notifying the remaining participants.
+                        if (log.isDebugEnabled())
+                            log.debug("Unable to send final ack [latch=" + latchId() + ", to=" + node.id() + "]: " + e.getMessage());
+                    }
+                }
+            });
+        }
+
+        /**
+         * Checks if latch has ack from given node.
+         *
+         * @param from Node.
+         * @return {@code true} if latch has ack from given node.
+         */
+        private boolean hasAck(UUID from) {
+            return acks.contains(from);
+        }
+
+        /**
+         * Receives ack from given node.
+         * Counts down the latch if the ack was not already processed.
+         *
+         * @param from Node.
+         */
+        private void ack(UUID from) {
+            if (log.isDebugEnabled())
+                log.debug("Ack is accepted [latch=" + latchId() + ", from=" + from + "]");
+
+            countDown0(from);
+        }
+
+        /**
+         * Counts down the latch for the ack of the given node.
+         * Completes the latch once all acks are received.
+         *
+         * @param node Node.
+         */
+        private void countDown0(UUID node) {
+            // Use the result of add() instead of a separate contains() check: the check-then-act pair was not
+            // atomic, so concurrent duplicate acks (countDown() is called without the manager lock) could
+            // decrement permits twice for one node and complete the latch prematurely.
+            if (isCompleted() || !acks.add(node))
+                return;
+
+            int remaining = permits.decrementAndGet();
+
+            if (log.isDebugEnabled())
+                log.debug("Count down [latch=" + latchId() + ", remaining=" + remaining + "]");
+
+            if (remaining == 0)
+                complete();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void countDown() {
+            countDown0(ctx.localNodeId());
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            Set<UUID> pendingAcks = participants.stream().filter(nodeId -> !acks.contains(nodeId)).collect(Collectors.toSet());
+
+            return S.toString(ServerLatch.class, this,
+                "pendingAcks", pendingAcks,
+                "super", super.toString());
+        }
+    }
+
+    /**
+     * Latch created on a non-coordinator node.
+     * The latch completes when the final ack from the coordinator is received.
+     */
+    class ClientLatch extends CompletableLatch {
+        /** Latch coordinator node. Can be changed if coordinator is left from topology. */
+        private volatile ClusterNode coordinator;
+
+        /** Flag indicates that ack is sent to coordinator. Accessed only while holding this latch's monitor. */
+        private boolean ackSent;
+
+        /**
+         * Constructor.
+         *
+         * @param id Latch id.
+         * @param topVer Latch topology version.
+         * @param coordinator Coordinator node.
+         * @param participants Participant nodes.
+         */
+        ClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) {
+            super(id, topVer, participants);
+
+            this.coordinator = coordinator;
+        }
+
+        /**
+         * Checks if latch coordinator is given {@code node}.
+         *
+         * @param node Node.
+         * @return {@code true} if latch coordinator is given node.
+         */
+        private boolean hasCoordinator(UUID node) {
+            return coordinator.id().equals(node);
+        }
+
+        /**
+         * Changes coordinator of latch and resends ack to new coordinator if needed.
+         *
+         * @param coordinator New coordinator.
+         */
+        private void newCoordinator(ClusterNode coordinator) {
+            if (log.isDebugEnabled())
+                log.debug("Coordinator is changed [latch=" + latchId() + ", crd=" + coordinator.id() + "]");
+
+            synchronized (this) {
+                this.coordinator = coordinator;
+
+                // Resend ack to new coordinator.
+                if (ackSent)
+                    sendAck();
+            }
+        }
+
+        /**
+         * Sends ack to coordinator node. Must be called while holding this latch's monitor.
+         * There is ack deduplication on coordinator. So it's fine to send same ack twice.
+         */
+        private void sendAck() {
+            try {
+                ackSent = true;
+
+                io.sendToGridTopic(coordinator, GridTopic.TOPIC_EXCHANGE, new LatchAckMessage(id, topVer, false), GridIoPolicy.SYSTEM_POOL);
+
+                if (log.isDebugEnabled())
+                    log.debug("Ack is sent [latch=" + latchId() + ", to=" + coordinator.id() + "]");
+            }
+            catch (IgniteCheckedException e) {
+                // Coordinator is unreachable. On coordinator node left discovery event ack will be resent.
+                if (log.isDebugEnabled())
+                    log.debug("Unable to send ack [latch=" + latchId() + ", to=" + coordinator.id() + "]: " + e.getMessage());
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void countDown() {
+            if (isCompleted())
+                return;
+
+            // Synchronize in case of changed coordinator.
+            synchronized (this) {
+                sendAck();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ClientLatch.class, this,
+                "super", super.toString());
+        }
+    }
+
+    /**
+     * Common latch base: identity (id + topology version), participant set and a completion future
+     * that backs both {@code await} variants.
+     */
+    private abstract static class CompletableLatch implements Latch {
+        /** Latch id. */
+        @GridToStringInclude
+        protected final String id;
+
+        /** Latch topology version. */
+        @GridToStringInclude
+        protected final AffinityTopologyVersion topVer;
+
+        /** Ids of participant nodes. Only participants are able to change latch state. */
+        @GridToStringExclude
+        protected final Set<UUID> participants;
+
+        /** Future that is finished once the latch completes (successfully or with an error). */
+        @GridToStringExclude
+        protected final GridFutureAdapter<?> complete = new GridFutureAdapter<>();
+
+        /**
+         * Constructor.
+         *
+         * @param id Latch id.
+         * @param topVer Latch topology version.
+         * @param participants Participant nodes; only their ids are kept.
+         */
+        CompletableLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
+            this.id = id;
+            this.topVer = topVer;
+            this.participants = participants.stream()
+                .map(node -> node.id())
+                .collect(Collectors.toSet());
+        }
+
+        /**
+         * @param node Node id.
+         * @return {@code true} if {@code node} is one of the latch participants.
+         */
+        boolean hasParticipant(UUID node) {
+            return participants.contains(node);
+        }
+
+        /**
+         * @return {@code true} once the completion future is done.
+         */
+        boolean isCompleted() {
+            return complete.isDone();
+        }
+
+        /**
+         * Completes the latch successfully.
+         */
+        void complete() {
+            complete.onDone();
+        }
+
+        /**
+         * Completes the latch with a failure.
+         *
+         * @param err Failure cause propagated to waiters.
+         */
+        void complete(Throwable err) {
+            complete.onDone(err);
+        }
+
+        /**
+         * @return Human-readable latch id in the form {@code id-topVer}.
+         */
+        String latchId() {
+            return id + "-" + topVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void await() throws IgniteCheckedException {
+            complete.get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void await(long timeout, TimeUnit timeUnit) throws IgniteCheckedException {
+            complete.get(timeout, timeUnit);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CompletableLatch.class, this);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java
new file mode 100644
index 0000000..9704c2e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Minimal distributed count-down latch.
+ *
+ * Each participating node counts the latch down once; all waiters on all nodes are released when the
+ * count reaches zero. The latch does not depend on caches: its state is managed by {@link ExchangeLatchManager}.
+ */
+public interface Latch {
+    /**
+     * Blocks until the latch is completed.
+     *
+     * @throws IgniteCheckedException If waiting fails or the latch completes with an error.
+     */
+    void await() throws IgniteCheckedException;
+
+    /**
+     * Blocks until the latch is completed or the given timeout elapses.
+     *
+     * @param timeout Maximum time to wait.
+     * @param timeUnit Unit of {@code timeout}.
+     * @throws IgniteCheckedException If waiting fails or the latch completes with an error.
+     */
+    void await(long timeout, TimeUnit timeUnit) throws IgniteCheckedException;
+
+    /**
+     * Counts the latch down on behalf of the local node.
+     * When the count reaches zero, every waiter on every node is released.
+     *
+     * Idempotent: repeated invocations on the same node have no further effect.
+     */
+    void countDown();
+}