You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/21 10:49:31 UTC
ignite git commit: ignite-5872 Replace standard java maps for
partition counters with more effective data structures
Repository: ignite
Updated Branches:
refs/heads/master 3084f00ee -> 918c409f5
ignite-5872 Replace standard java maps for partition counters with more effective data structures
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/918c409f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/918c409f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/918c409f
Branch: refs/heads/master
Commit: 918c409f51b5be9058590e0e7f1fbfd4c7f48573
Parents: 3084f00
Author: Alexey Goncharuk <ag...@apache.org>
Authored: Mon Aug 21 13:49:22 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Aug 21 13:49:22 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 2 +-
.../GridCachePartitionExchangeManager.java | 45 +++---
.../cache/IgniteCacheOffheapManager.java | 2 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 7 +-
.../dht/GridClientPartitionTopology.java | 59 +++----
.../dht/GridDhtPartitionTopology.java | 25 ++-
.../dht/GridDhtPartitionTopologyImpl.java | 145 ++++++++++-------
.../CachePartitionFullCountersMap.java | 99 ++++++++++++
.../CachePartitionPartialCountersMap.java | 161 +++++++++++++++++++
.../GridDhtPartitionsAbstractMessage.java | 8 -
.../GridDhtPartitionsExchangeFuture.java | 36 +++--
.../preloader/GridDhtPartitionsFullMessage.java | 13 +-
.../GridDhtPartitionsSingleMessage.java | 21 +--
.../GridDhtPartitionsSingleRequest.java | 8 -
.../IgniteDhtPartitionCountersMap.java | 14 +-
.../persistence/GridCacheOffheapManager.java | 2 +-
.../continuous/GridContinuousProcessor.java | 7 +-
...ContinuousQueryFailoverAbstractSelfTest.java | 11 +-
.../ignite/testsuites/IgniteBasicTestSuite.java | 3 +-
19 files changed, 475 insertions(+), 193 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 5d77c9e..39a5ea8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -459,7 +459,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (clientTop != null) {
grp.topology().update(grpHolder.affinity().lastVersion(),
clientTop.partitionMap(true),
- clientTop.updateCounters(false),
+ clientTop.fullUpdateCounters(),
Collections.<Integer>emptySet(),
null);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 08eb1bf..200f677 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -723,23 +723,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (top != null)
return top;
- Object affKey = null;
-
CacheGroupDescriptor grpDesc = cctx.affinity().cacheGroups().get(grpId);
- if (grpDesc != null) {
- CacheConfiguration<?, ?> ccfg = grpDesc.config();
+ assert grpDesc != null : grpId;
- AffinityFunction aff = ccfg.getAffinity();
+ CacheConfiguration<?, ?> ccfg = grpDesc.config();
- affKey = cctx.kernalContext().affinity().similaryAffinityKey(aff,
- ccfg.getNodeFilter(),
- ccfg.getBackups(),
- aff.partitions());
- }
+ AffinityFunction aff = ccfg.getAffinity();
+
+ Object affKey = cctx.kernalContext().affinity().similaryAffinityKey(aff,
+ ccfg.getNodeFilter(),
+ ccfg.getBackups(),
+ aff.partitions());
GridClientPartitionTopology old = clientTops.putIfAbsent(grpId,
- top = new GridClientPartitionTopology(cctx, grpId, affKey));
+ top = new GridClientPartitionTopology(cctx, grpId, aff.partitions(), affKey));
return old != null ? old : top;
}
@@ -1004,8 +1002,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* finishUnmarshall methods are called).
* @param exchId Non-null exchange ID if message is created for exchange.
* @param lastVer Last version.
- * @param partHistSuppliers
- * @param partsToReload
+ * @param partHistSuppliers Partition history suppliers map.
+ * @param partsToReload Partitions to reload map.
* @return Message.
*/
public GridDhtPartitionsFullMessage createPartitionsFullMessage(
@@ -1049,7 +1047,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
if (exchId != null)
- m.addPartitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true));
+ m.addPartitionUpdateCounters(grp.groupId(), grp.topology().fullUpdateCounters());
}
}
@@ -1067,7 +1065,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
if (exchId != null)
- m.addPartitionUpdateCounters(top.groupId(), top.updateCounters(true));
+ m.addPartitionUpdateCounters(top.groupId(), top.fullUpdateCounters());
}
return m;
@@ -1120,7 +1118,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id,
cctx.kernalContext().clientNode(),
- false);
+ false,
+ null);
if (log.isDebugEnabled())
log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']');
@@ -1144,10 +1143,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param sndCounters {@code True} if need send partition update counters.
* @return Message.
*/
- public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(@Nullable GridDhtPartitionExchangeId exchangeId,
+ public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(
+ @Nullable GridDhtPartitionExchangeId exchangeId,
boolean clientOnlyExchange,
- boolean sndCounters)
- {
+ boolean sndCounters,
+ ExchangeActions exchActions
+ ) {
GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId,
clientOnlyExchange,
cctx.versions().last(),
@@ -1156,7 +1157,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
Map<Object, T2<Integer, GridPartitionStateMap>> dupData = new HashMap<>();
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (!grp.isLocal()) {
+ if (!grp.isLocal() && (exchActions == null || !exchActions.cacheGroupStopping(grp.groupId()))) {
GridDhtPartitionMap locMap = grp.topology().localPartitionMap();
addPartitionMap(m,
@@ -1167,7 +1168,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
grp.affinity().similarAffinityKey());
if (sndCounters)
- m.partitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true));
+ m.partitionUpdateCounters(grp.groupId(), grp.topology().localUpdateCounters(true));
}
}
@@ -1185,7 +1186,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top.similarAffinityKey());
if (sndCounters)
- m.partitionUpdateCounters(top.groupId(), top.updateCounters(true));
+ m.partitionUpdateCounters(top.groupId(), top.localUpdateCounters(true));
}
return m;
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 001848e..4531802 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -414,7 +414,7 @@ public interface IgniteCacheOffheapManager {
/**
* @return Initial update counter.
*/
- public Long initialUpdateCounter();
+ public long initialUpdateCounter();
/**
* @param cctx Cache context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index ba6c89d..9e48d45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1054,8 +1054,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/** */
private final ConcurrentMap<Integer, AtomicLong> cacheSizes = new ConcurrentHashMap<>();
- /** Initialized update counter. */
- protected Long initCntr = 0L;
+ /** Initial update counter. */
+ protected long initCntr;
/**
* @param partId Partition number.
@@ -1600,7 +1600,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public Long initialUpdateCounter() {
+ @Override public long initialUpdateCounter() {
return initCntr;
}
@@ -1629,7 +1629,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
* @param key Key.
* @param oldVal Old value.
* @param newVal New value.
- * @throws IgniteCheckedException If failed.
*/
private void updateIgfsMetrics(
GridCacheContext cctx,
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 745e7d7..77792c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -39,6 +39,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
@@ -48,7 +50,6 @@ import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridPartitionStateMap;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -106,7 +107,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
/** Partition update counters. */
- private Map<Integer, T2<Long, Long>> cntrMap = new HashMap<>();
+ private CachePartitionFullCountersMap cntrMap;
/** */
private final Object similarAffKey;
@@ -117,11 +118,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/**
* @param cctx Context.
* @param grpId Group ID.
+ * @param parts Number of partitions in the group.
* @param similarAffKey Key to find caches with similar affinity.
*/
public GridClientPartitionTopology(
GridCacheSharedContext<?, ?> cctx,
int grpId,
+ int parts,
Object similarAffKey
) {
this.cctx = cctx;
@@ -135,6 +138,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
node2part = new GridDhtPartitionFullMap(cctx.localNode().id(),
cctx.localNode().order(),
updateSeq.get());
+
+ cntrMap = new CachePartitionFullCountersMap(parts);
}
/**
@@ -641,7 +646,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
@Override public boolean update(
@Nullable AffinityTopologyVersion exchangeVer,
GridDhtPartitionFullMap partMap,
- Map<Integer, T2<Long, Long>> cntrMap,
+ @Nullable CachePartitionFullCountersMap cntrMap,
Set<Integer> partsToReload,
@Nullable AffinityTopologyVersion msgTopVer) {
if (log.isDebugEnabled())
@@ -744,7 +749,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
if (cntrMap != null)
- this.cntrMap = new HashMap<>(cntrMap);
+ this.cntrMap = new CachePartitionFullCountersMap(cntrMap);
consistencyCheck();
@@ -759,17 +764,22 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) {
+ @Override public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap) {
assert cntrMap != null;
lock.writeLock().lock();
try {
- for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
- T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+ for (int i = 0; i < cntrMap.size(); i++) {
+ int pId = cntrMap.partitionAt(i);
+
+ long initialUpdateCntr = cntrMap.initialUpdateCounterAt(i);
+ long updateCntr = cntrMap.updateCounterAt(i);
- if (cntr == null || cntr.get2() < e.getValue().get2())
- this.cntrMap.put(e.getKey(), e.getValue());
+ if (this.cntrMap.updateCounter(pId) < updateCntr) {
+ this.cntrMap.initialUpdateCounter(pId, initialUpdateCntr);
+ this.cntrMap.updateCounter(pId, updateCntr);
+ }
}
}
finally {
@@ -777,6 +787,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
}
+ /** {@inheritDoc} */
+ @Override public void applyUpdateCounters() {
+ // No-op on client topology.
+ }
+
/**
* Checks whether the new partition map is more stale than the current partition map.
* The new partition map is stale if its topology version or update sequence is less than that of the current map.
@@ -1097,33 +1112,23 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros) {
+ @Override public CachePartitionFullCountersMap fullUpdateCounters() {
lock.readLock().lock();
try {
- if (skipZeros) {
- Map<Integer, T2<Long, Long>> res = U.newHashMap(cntrMap.size());
-
- for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
- T2<Long, Long> val = e.getValue();
-
- if (val.get1() == 0L && val.get2() == 0L)
- continue;
-
- res.put(e.getKey(), e.getValue());
- }
-
- return res;
- }
- else
- return new HashMap<>(cntrMap);
-}
+ return new CachePartitionFullCountersMap(cntrMap);
+ }
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
+ @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) {
+ return CachePartitionPartialCountersMap.EMPTY;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
assert false : "Should not be called on non-affinity node";
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 0dea5e4..22205ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
@@ -29,12 +28,13 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.T2;
import org.jetbrains.annotations.Nullable;
/**
@@ -257,7 +257,7 @@ public interface GridDhtPartitionTopology {
* means full map received is not related to exchange
* @param partMap Update partition map.
* @param cntrMap Partition update counters.
- * @param partsToReload
+ * @param partsToReload Set of partitions that need to be reloaded.
* @param msgTopVer Topology version from incoming message. This value is not null only for case message is not
* related to exchange. Value should be not less than previous 'Topology version from exchange'.
* @return {@code True} if local state was changed.
@@ -265,7 +265,7 @@ public interface GridDhtPartitionTopology {
public boolean update(
@Nullable AffinityTopologyVersion exchangeResVer,
GridDhtPartitionFullMap partMap,
- @Nullable Map<Integer, T2<Long, Long>> cntrMap,
+ @Nullable CachePartitionFullCountersMap cntrMap,
Set<Integer> partsToReload,
@Nullable AffinityTopologyVersion msgTopVer);
@@ -280,9 +280,16 @@ public interface GridDhtPartitionTopology {
boolean force);
/**
+ * Merges the given update counters into this topology's counters map, keeping the larger update counter for each partition. Called on coordinator.
+ *
* @param cntrMap Counters map.
*/
- public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap);
+ public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap);
+
+ /**
+ * Applies update counters collected during exchange on coordinator. Called on coordinator.
+ */
+ public void applyUpdateCounters();
/**
* Checks if there is at least one owner for each partition in the cache topology.
@@ -309,10 +316,14 @@ public interface GridDhtPartitionTopology {
public Collection<Integer> lostPartitions();
/**
- * @param skipZeros If {@code true} then filters out zero counters.
* @return Partition update counters.
*/
- public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros);
+ public CachePartitionFullCountersMap fullUpdateCounters();
+
+ /**
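+ * @return Update counters of local partitions (partitions whose counters are all zero are omitted if {@code skipZeros} is {@code true}).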
+ */
+ public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros);
/**
* @param part Partition to own.
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index d1e4fa9..16fe012 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -43,6 +43,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
@@ -53,7 +55,6 @@ import org.apache.ignite.internal.util.GridPartitionStateMap;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -131,7 +132,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16);
/** Partition update counter. */
- private Map<Integer, T2<Long, Long>> cntrMap = new HashMap<>();
+ private final CachePartitionFullCountersMap cntrMap;
/** */
private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
@@ -140,8 +141,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @param ctx Cache shared context.
* @param grp Cache group.
*/
- public GridDhtPartitionTopologyImpl(GridCacheSharedContext ctx,
- CacheGroupContext grp) {
+ public GridDhtPartitionTopologyImpl(
+ GridCacheSharedContext ctx,
+ CacheGroupContext grp
+ ) {
assert ctx != null;
assert grp != null;
@@ -153,6 +156,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
timeLog = ctx.logger(GridDhtPartitionsExchangeFuture.EXCHANGE_LOG);
locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions());
+
+ cntrMap = new CachePartitionFullCountersMap(locParts.length());
}
/** {@inheritDoc} */
@@ -713,10 +718,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
- T2<Long, Long> cntr = cntrMap.get(p);
+ long updCntr = cntrMap.updateCounter(p);
- if (cntr != null)
- loc.updateCounter(cntr.get2());
+ if (updCntr != 0)
+ loc.updateCounter(updCntr);
if (ctx.pageStore() != null) {
try {
@@ -1165,7 +1170,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
@Override public boolean update(
@Nullable AffinityTopologyVersion exchangeVer,
GridDhtPartitionFullMap partMap,
- @Nullable Map<Integer, T2<Long, Long>> incomeCntrMap,
+ @Nullable CachePartitionFullCountersMap incomeCntrMap,
Set<Integer> partsToReload,
@Nullable AffinityTopologyVersion msgTopVer) {
if (log.isDebugEnabled())
@@ -1180,14 +1185,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
return false;
if (incomeCntrMap != null) {
- // update local map partition counters
- for (Map.Entry<Integer, T2<Long, Long>> e : incomeCntrMap.entrySet()) {
- T2<Long, Long> existCntr = this.cntrMap.get(e.getKey());
-
- if (existCntr == null || existCntr.get2() < e.getValue().get2())
- this.cntrMap.put(e.getKey(), e.getValue());
- }
-
// update local counters in partitions
for (int i = 0; i < locParts.length(); i++) {
GridDhtLocalPartition part = locParts.get(i);
@@ -1195,10 +1192,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (part == null)
continue;
- T2<Long, Long> cntr = incomeCntrMap.get(part.id());
+ if (part.state() == OWNING || part.state() == MOVING) {
+ long updCntr = incomeCntrMap.updateCounter(part.id());
- if (cntr != null)
- part.updateCounter(cntr.get2());
+ if (updCntr != 0 && updCntr > part.updateCounter())
+ part.updateCounter(updCntr);
+ }
}
}
@@ -1330,13 +1329,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
assert locPart != null : grp.cacheOrGroupName();
- if (incomeCntrMap != null) {
- T2<Long, Long> cntr = incomeCntrMap.get(p);
-
- if (cntr != null && cntr.get2() > locPart.updateCounter())
- locPart.updateCounter(cntr.get2());
- }
-
if (locPart.state() == MOVING) {
boolean success = locPart.own();
@@ -1356,13 +1348,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
changed = true;
}
-
- if (incomeCntrMap != null) {
- T2<Long, Long> cntr = incomeCntrMap.get(p);
-
- if (cntr != null && cntr.get2() > locPart.updateCounter())
- locPart.updateCounter(cntr.get2());
- }
}
else if (state == RENTING && partsToReload.contains(p)) {
GridDhtLocalPartition locPart = locParts.get(p);
@@ -1412,7 +1397,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) {
+ @Override public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap) {
assert cntrMap != null;
long now = U.currentTimeMillis();
@@ -1431,12 +1416,40 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (stopping)
return;
- for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
- T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+ for (int i = 0; i < cntrMap.size(); i++) {
+ int pId = cntrMap.partitionAt(i);
+
+ long initialUpdateCntr = cntrMap.initialUpdateCounterAt(i);
+ long updateCntr = cntrMap.updateCounterAt(i);
- if (cntr == null || cntr.get2() < e.getValue().get2())
- this.cntrMap.put(e.getKey(), e.getValue());
+ if (this.cntrMap.updateCounter(pId) < updateCntr) {
+ this.cntrMap.initialUpdateCounter(pId, initialUpdateCntr);
+ this.cntrMap.updateCounter(pId, updateCntr);
+ }
}
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void applyUpdateCounters() {
+ long now = U.currentTimeMillis();
+
+ lock.writeLock().lock();
+
+ try {
+ long acquired = U.currentTimeMillis();
+
+ if (acquired - now >= 100) {
+ if (timeLog.isInfoEnabled())
+ timeLog.info("Waited too long to acquire topology write lock " +
+ "[cache=" + grp.groupId() + ", waitTime=" + (acquired - now) + ']');
+ }
+
+ if (stopping)
+ return;
for (int i = 0; i < locParts.length(); i++) {
GridDhtLocalPartition part = locParts.get(i);
@@ -1444,12 +1457,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (part == null)
continue;
- T2<Long, Long> cntr = cntrMap.get(part.id());
+ long updCntr = cntrMap.updateCounter(part.id());
- if (cntr != null && cntr.get2() > part.updateCounter())
- part.updateCounter(cntr.get2());
- else if (part.updateCounter() > 0)
- this.cntrMap.put(part.id(), new T2<>(part.initialUpdateCounter(), part.updateCounter()));
+ if (updCntr > part.updateCounter())
+ part.updateCounter(updCntr);
+ else if (part.updateCounter() > 0) {
+ cntrMap.initialUpdateCounter(part.id(), part.initialUpdateCounter());
+ cntrMap.updateCounter(part.id(), part.updateCounter());
+ }
}
}
finally {
@@ -2178,26 +2193,32 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros) {
+ @Override public CachePartitionFullCountersMap fullUpdateCounters() {
lock.readLock().lock();
try {
- Map<Integer, T2<Long, Long>> res;
+ return new CachePartitionFullCountersMap(cntrMap);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
- if (skipZeros) {
- res = U.newHashMap(cntrMap.size());
+ /** {@inheritDoc} */
+ @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) {
+ lock.readLock().lock();
- for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
- Long cntr = e.getValue().get2();
+ try {
+ int locPartCnt = 0;
- if (ZERO.equals(cntr))
- continue;
+ for (int i = 0; i < locParts.length(); i++) {
+ GridDhtLocalPartition part = locParts.get(i);
- res.put(e.getKey(), e.getValue());
- }
+ if (part != null)
+ locPartCnt++;
}
- else
- res = new HashMap<>(cntrMap);
+
+ CachePartitionPartialCountersMap res = new CachePartitionPartialCountersMap(locPartCnt);
for (int i = 0; i < locParts.length(); i++) {
GridDhtLocalPartition part = locParts.get(i);
@@ -2205,17 +2226,17 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (part == null)
continue;
- T2<Long, Long> cntr0 = res.get(part.id());
- Long initCntr = part.initialUpdateCounter();
+ long updCntr = part.updateCounter();
+ long initCntr = part.initialUpdateCounter();
- if (cntr0 == null || initCntr >= cntr0.get1()) {
- if (skipZeros && initCntr == 0L && part.updateCounter() == 0L)
- continue;
+ if (skipZeros && initCntr == 0L && updCntr == 0L)
+ continue;
- res.put(part.id(), new T2<>(initCntr, part.updateCounter()));
- }
+ res.add(part.id(), initCntr, updCntr);
}
+ res.trim();
+
return res;
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
new file mode 100644
index 0000000..1384a55
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * Dense storage of update counters for every partition of a cache group.
+ * <p>
+ * Counters are kept in two parallel {@code long} arrays indexed directly by partition ID, so a
+ * partition ID passed to any accessor must lie in {@code [0, partsCnt)}. This class performs no
+ * synchronization of its own.
+ */
+public class CachePartitionFullCountersMap implements Serializable {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Initial update counters, indexed by partition ID. */
+    private long[] initialUpdCntrs;
+
+    /** Current update counters, indexed by partition ID. */
+    private long[] updCntrs;
+
+    /**
+     * Creates an independent copy of the given map: later changes to either map
+     * are not visible through the other one.
+     *
+     * @param other Map to copy.
+     */
+    public CachePartitionFullCountersMap(CachePartitionFullCountersMap other) {
+        initialUpdCntrs = other.initialUpdCntrs.clone();
+        updCntrs = other.updCntrs.clone();
+    }
+
+    /**
+     * Creates a map in which all counters are zero.
+     *
+     * @param partsCnt Total number of partitions.
+     */
+    public CachePartitionFullCountersMap(int partsCnt) {
+        initialUpdCntrs = new long[partsCnt];
+        updCntrs = new long[partsCnt];
+    }
+
+    /**
+     * Reads the initial update counter of a partition.
+     *
+     * @param p Partition ID.
+     * @return Initial update counter for the partition with the given ID.
+     */
+    public long initialUpdateCounter(int p) {
+        return initialUpdCntrs[p];
+    }
+
+    /**
+     * Reads the update counter of a partition.
+     *
+     * @param p Partition ID.
+     * @return Update counter for the partition with the given ID.
+     */
+    public long updateCounter(int p) {
+        return updCntrs[p];
+    }
+
+    /**
+     * Overwrites the initial update counter of a partition.
+     *
+     * @param p Partition ID.
+     * @param initialUpdCntr Initial update counter to set.
+     */
+    public void initialUpdateCounter(int p, long initialUpdCntr) {
+        initialUpdCntrs[p] = initialUpdCntr;
+    }
+
+    /**
+     * Overwrites the update counter of a partition.
+     *
+     * @param p Partition ID.
+     * @param updCntr Update counter to set.
+     */
+    public void updateCounter(int p, long updCntr) {
+        updCntrs[p] = updCntr;
+    }
+
+    /**
+     * Resets every initial update counter and every update counter to zero.
+     */
+    public void clear() {
+        Arrays.fill(initialUpdCntrs, 0L);
+        Arrays.fill(updCntrs, 0L);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
new file mode 100644
index 0000000..851ffed
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Compact map of update counters for a subset of partitions.
+ * <p>
+ * Entries live in three parallel arrays (partition ID, initial update counter, update counter).
+ * Partition IDs must be added in strictly ascending order, which lets {@link #partitionIndex(int)}
+ * locate an entry with a binary search. Capacity is fixed at construction time. This class
+ * performs no synchronization of its own.
+ */
+public class CachePartitionPartialCountersMap implements Serializable {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Shared map without entries; it has zero capacity, so nothing can be added to it. */
+    public static final CachePartitionPartialCountersMap EMPTY = new CachePartitionPartialCountersMap();
+
+    /** Partition IDs in strictly ascending order; only the first {@link #curIdx} slots are used. */
+    private int[] partIds;
+
+    /** Initial update counters, parallel to {@link #partIds}. */
+    private long[] initialUpdCntrs;
+
+    /** Update counters, parallel to {@link #partIds}. */
+    private long[] updCntrs;
+
+    /** Number of entries added so far, which is also the index of the next free slot. */
+    private int curIdx;
+
+    /**
+     * Creates the zero-capacity map backing {@link #EMPTY}.
+     */
+    private CachePartitionPartialCountersMap() {
+        // Allocate zero-length arrays rather than leaving them null, so that partitionIndex(),
+        // trim() and add() behave on EMPTY like on any other map instead of failing with an NPE.
+        this(0);
+    }
+
+    /**
+     * @param partsCnt Maximum number of partitions that can be stored in the partial map.
+     */
+    public CachePartitionPartialCountersMap(int partsCnt) {
+        partIds = new int[partsCnt];
+        initialUpdCntrs = new long[partsCnt];
+        updCntrs = new long[partsCnt];
+    }
+
+    /**
+     * @return Total number of partitions added to the map.
+     */
+    public int size() {
+        return curIdx;
+    }
+
+    /**
+     * Adds partition counters for a partition with the given ID.
+     *
+     * @param partId Partition ID to add; must be greater than the previously added partition ID.
+     * @param initialUpdCntr Partition initial update counter.
+     * @param updCntr Partition update counter.
+     * @throws IllegalArgumentException If {@code partId} is not greater than the last added partition ID.
+     * @throws IllegalStateException If the map is already filled up to its capacity.
+     */
+    public void add(int partId, long initialUpdCntr, long updCntr) {
+        if (curIdx > 0) {
+            // Ascending order is what keeps the binary search in partitionIndex() valid.
+            if (partIds[curIdx - 1] >= partId)
+                throw new IllegalArgumentException("Adding a partition in the wrong order " +
+                    "[prevPart=" + partIds[curIdx - 1] + ", partId=" + partId + ']');
+        }
+
+        if (curIdx == partIds.length)
+            throw new IllegalStateException("Adding more partitions than reserved: " + partIds.length);
+
+        partIds[curIdx] = partId;
+        initialUpdCntrs[curIdx] = initialUpdCntr;
+        updCntrs[curIdx] = updCntr;
+
+        curIdx++;
+    }
+
+    /**
+     * Cuts the array sizes according to curIdx. No more entries can be added to this map
+     * after this method is called.
+     */
+    public void trim() {
+        if (curIdx < partIds.length) {
+            partIds = Arrays.copyOf(partIds, curIdx);
+            initialUpdCntrs = Arrays.copyOf(initialUpdCntrs, curIdx);
+            updCntrs = Arrays.copyOf(updCntrs, curIdx);
+        }
+    }
+
+    /**
+     * Looks up the position of a partition among the added entries.
+     *
+     * @param partId Partition ID to search.
+     * @return Partition index in the array, or a negative value if the partition has not been added
+     *      (the result of {@link Arrays#binarySearch(int[], int, int, int)}).
+     */
+    public int partitionIndex(int partId) {
+        // Same value binarySearch() yields for an empty range. Returning it directly also keeps
+        // the lookup safe should the arrays be null, e.g. in an instance deserialized from a
+        // peer that created its empty map without arrays.
+        if (curIdx == 0)
+            return -1;
+
+        return Arrays.binarySearch(partIds, 0, curIdx, partId);
+    }
+
+    /**
+     * Gets partition ID saved at the given index.
+     *
+     * @param idx Index to get value from.
+     * @return Partition ID.
+     */
+    public int partitionAt(int idx) {
+        return partIds[idx];
+    }
+
+    /**
+     * Gets initial update counter saved at the given index.
+     *
+     * @param idx Index to get value from.
+     * @return Initial update counter.
+     */
+    public long initialUpdateCounterAt(int idx) {
+        return initialUpdCntrs[idx];
+    }
+
+    /**
+     * Gets update counter saved at the given index.
+     *
+     * @param idx Index to get value from.
+     * @return Update counter.
+     */
+    public long updateCounterAt(int idx) {
+        return updCntrs[idx];
+    }
+
+    /**
+     * Converts a partial counters map into a regular map keyed by partition ID, whose values
+     * hold the initial update counter first and the update counter second.
+     *
+     * @param cntrsMap Partial local counters map.
+     * @return Partition ID to partition counters map.
+     */
+    public static Map<Integer, T2<Long, Long>> toCountersMap(CachePartitionPartialCountersMap cntrsMap) {
+        if (cntrsMap.size() == 0)
+            return Collections.emptyMap();
+
+        Map<Integer, T2<Long, Long>> res = U.newHashMap(cntrsMap.size());
+
+        for (int idx = 0; idx < cntrsMap.size(); idx++)
+            res.put(cntrsMap.partitionAt(idx),
+                new T2<>(cntrsMap.initialUpdateCounterAt(idx), cntrsMap.updateCounterAt(idx)));
+
+        return res;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 95c1a4f..84cc792 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -19,11 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.io.Externalizable;
import java.nio.ByteBuffer;
-import java.util.Map;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -111,12 +109,6 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
}
/**
- * @param grpId Cache group ID.
- * @return Parition update counters.
- */
- public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId);
-
- /**
* @return Last used version among all nodes.
*/
@Nullable public GridCacheVersion lastVersion() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index ca6ee5e..d64adc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -764,7 +764,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (updateTop && clientTop != null) {
top.update(null,
clientTop.partitionMap(true),
- clientTop.updateCounters(false),
+ clientTop.fullUpdateCounters(),
Collections.<Integer>emptySet(),
null);
}
@@ -1213,7 +1213,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
GridDhtPartitionsSingleMessage msg;
- // Reset lost partition before send local partition to coordinator.
+ // Reset lost partitions before sending local partitions to coordinator.
if (exchActions != null) {
Set<String> caches = exchActions.cachesToResetLostPartitions();
@@ -1230,7 +1230,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
else {
msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(),
false,
- true);
+ true,
+ exchActions);
Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
@@ -1958,10 +1959,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Map<Integer, Long> minCntrs = new HashMap<>();
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
- assert e.getValue().partitionUpdateCounters(top.groupId()) != null;
+ CachePartitionPartialCountersMap nodeCntrs = e.getValue().partitionUpdateCounters(top.groupId());
+
+ assert nodeCntrs != null;
- for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) {
- int p = e0.getKey();
+ for (int i = 0; i < nodeCntrs.size(); i++) {
+ int p = nodeCntrs.partitionAt(i);
UUID uuid = e.getKey();
@@ -1970,10 +1973,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING)
continue;
- Long cntr = state == GridDhtPartitionState.MOVING ? e0.getValue().get1() : e0.getValue().get2();
-
- if (cntr == null)
- cntr = 0L;
+ long cntr = state == GridDhtPartitionState.MOVING ?
+ nodeCntrs.initialUpdateCounterAt(i) :
+ nodeCntrs.updateCounterAt(i);
Long minCntr = minCntrs.get(p);
@@ -2233,10 +2235,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
GridDhtPartitionTopology top = grp != null ? grp.topology() :
cctx.exchange().clientTopology(grpId);
- Map<Integer, T2<Long, Long>> cntrs = msg.partitionUpdateCounters(grpId);
+ CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId);
if (cntrs != null)
- top.applyUpdateCounters(cntrs);
+ top.collectUpdateCounters(cntrs);
}
Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
@@ -2249,6 +2251,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
+ for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
+ if (!grpCtx.isLocal())
+ grpCtx.topology().applyUpdateCounters();
+ }
+
if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
assert firstDiscoEvt instanceof DiscoveryCustomEvent;
@@ -2563,7 +2570,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
GridDhtPartitionsSingleMessage res = cctx.exchange().createPartitionsSingleMessage(
msg.restoreExchangeId(),
cctx.kernalContext().clientNode(),
- true);
+ true,
+ exchActions);
if (localJoinExchange() && finishState0 == null)
res.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
@@ -2737,7 +2745,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
Integer grpId = entry.getKey();
- Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(grpId);
+ CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId);
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index a164e85..2bb19cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -276,7 +275,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
* @param grpId Cache group ID.
* @param cntrMap Partition update counters.
*/
- public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) {
+ public void addPartitionUpdateCounters(int grpId, CachePartitionFullCountersMap cntrMap) {
if (partCntrs == null)
partCntrs = new IgniteDhtPartitionCountersMap();
@@ -287,14 +286,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
* @param grpId Cache group ID.
* @return Partition update counters.
*/
- @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) {
- if (partCntrs != null) {
- Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId);
-
- return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap();
- }
-
- return Collections.emptyMap();
+ public CachePartitionFullCountersMap partitionUpdateCounters(int grpId) {
+ return partCntrs == null ? null : partCntrs.get(grpId);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index bc7d314..44815ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -18,11 +18,11 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.util.Collection;
-import java.util.Map;
-import java.util.HashMap;
+import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collections;
-import java.io.Externalizable;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -62,7 +61,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** Partitions update counters. */
@GridToStringInclude
@GridDirectTransient
- private Map<Integer, Map<Integer, T2<Long, Long>>> partCntrs;
+ private Map<Integer, CachePartitionPartialCountersMap> partCntrs;
/** Serialized partitions counters. */
private byte[] partCntrsBytes;
@@ -190,7 +189,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
* @param grpId Cache group ID.
* @param cntrMap Partition update counters.
*/
- public void partitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) {
+ public void partitionUpdateCounters(int grpId, CachePartitionPartialCountersMap cntrMap) {
if (partCntrs == null)
partCntrs = new HashMap<>();
@@ -201,14 +200,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
* @param grpId Cache group ID.
* @return Partition update counters.
*/
- @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) {
- if (partCntrs != null) {
- Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId);
-
- return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap();
- }
+ public CachePartitionPartialCountersMap partitionUpdateCounters(int grpId) {
+ CachePartitionPartialCountersMap res = partCntrs == null ? null : partCntrs.get(grpId);
- return Collections.emptyMap();
+ return res == null ? CachePartitionPartialCountersMap.EMPTY : res;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
index 6317fbc..0be0f37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
@@ -19,9 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.io.Externalizable;
import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Map;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -78,11 +75,6 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
}
/** {@inheritDoc} */
- @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) {
- return Collections.emptyMap();
- }
-
- /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
index dc2fbf8..e7954d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
@@ -19,10 +19,8 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.io.Serializable;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.ignite.internal.util.typedef.T2;
/**
* Partition counters map.
@@ -32,7 +30,7 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
private static final long serialVersionUID = 0L;
/** */
- private Map<Integer, Map<Integer, T2<Long, Long>>> map;
+ private Map<Integer, CachePartitionFullCountersMap> map;
/**
* @return {@code True} if map is empty.
@@ -45,7 +43,7 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
* @param cacheId Cache ID.
* @param cntrMap Counters map.
*/
- public synchronized void putIfAbsent(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) {
+ public synchronized void putIfAbsent(int cacheId, CachePartitionFullCountersMap cntrMap) {
if (map == null)
map = new HashMap<>();
@@ -57,14 +55,14 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
* @param cacheId Cache ID.
* @return Counters map.
*/
- public synchronized Map<Integer, T2<Long, Long>> get(int cacheId) {
+ public synchronized CachePartitionFullCountersMap get(int cacheId) {
if (map == null)
- map = new HashMap<>();
+ return null;
- Map<Integer, T2<Long, Long>> cntrMap = map.get(cacheId);
+ CachePartitionFullCountersMap cntrMap = map.get(cacheId);
if (cntrMap == null)
- return Collections.emptyMap();
+ return null;
return cntrMap;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index ed6eee2..83a9f55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1167,7 +1167,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public Long initialUpdateCounter() {
+ @Override public long initialUpdateCounter() {
try {
CacheDataStore delegate0 = init0(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 7062353..fa52be2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -93,6 +93,7 @@ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.GridTopic.TOPIC_CONTINUOUS;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.toCountersMap;
import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_ACK;
import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_NOTIFICATION;
@@ -202,7 +203,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
GridCacheContext cctx = interCache != null ? interCache.context() : null;
if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
- cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters(false));
+ cntrsPerNode.put(ctx.localNodeId(),
+ toCountersMap(cctx.topology().localUpdateCounters(false)));
routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
}
@@ -1070,7 +1072,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
if (cache != null && !cache.isLocal() && cache.context().userCache())
- req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters(false));
+ req.addUpdateCounters(ctx.localNodeId(),
+ toCountersMap(cache.context().topology().localUpdateCounters(false)));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 43069cd..f91c689 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
@@ -519,11 +520,15 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
Affinity<Object> aff = grid(i).affinity(DEFAULT_CACHE_NAME);
- Map<Integer, T2<Long, Long>> act = grid(i).cachex(DEFAULT_CACHE_NAME).context().topology().updateCounters(false);
+ CachePartitionPartialCountersMap act = grid(i).cachex(DEFAULT_CACHE_NAME).context().topology()
+ .localUpdateCounters(false);
for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) {
- if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode()))
- assertEquals(e.getValue(), act.get(e.getKey()).get2());
+ if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode())) {
+ int partIdx = act.partitionIndex(e.getKey());
+
+ assertEquals(e.getValue(), (Long)act.updateCounterAt(partIdx));
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 2ec2c74..202486c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -20,8 +20,6 @@ package org.apache.ignite.testsuites;
import java.util.Set;
import junit.framework.TestSuite;
import org.apache.ignite.GridSuppressedExceptionSelfTest;
-import org.apache.ignite.internal.processors.database.SwapPathConstructionSelfTest;
-import org.apache.ignite.util.AttributeNodeFilterSelfTest;
import org.apache.ignite.internal.ClusterGroupHostsSelfTest;
import org.apache.ignite.internal.ClusterGroupSelfTest;
import org.apache.ignite.internal.GridFailFastNodeFailureDetectionSelfTest;
@@ -57,6 +55,7 @@ import org.apache.ignite.internal.processors.database.BPlusTreeSelfTest;
import org.apache.ignite.internal.processors.database.FreeListImplSelfTest;
import org.apache.ignite.internal.processors.database.MemoryMetricsSelfTest;
import org.apache.ignite.internal.processors.database.MetadataStorageSelfTest;
+import org.apache.ignite.internal.processors.database.SwapPathConstructionSelfTest;
import org.apache.ignite.internal.processors.odbc.OdbcConfigurationValidationSelfTest;
import org.apache.ignite.internal.processors.odbc.OdbcEscapeSequenceSelfTest;
import org.apache.ignite.internal.processors.service.ClosureServiceClientsNodesTest;