You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/09 09:27:25 UTC
[15/17] ignite git commit: ignite-4154
ignite-4154
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5d4987a9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5d4987a9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5d4987a9
Branch: refs/heads/ignite-4154
Commit: 5d4987a9387fa2906cf8608bbbab0dc114432eb7
Parents: dc92038
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 9 10:59:29 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 9 10:59:29 2016 +0300
----------------------------------------------------------------------
.../affinity/GridAffinityAssignmentCache.java | 62 +--
.../affinity/GridAffinityProcessor.java | 81 ++++
.../cache/CacheAffinitySharedManager.java | 27 +-
.../cache/DynamicCacheChangeBatch.java | 2 +-
.../GridCachePartitionExchangeManager.java | 275 ++++++++++---
.../dht/GridClientPartitionTopology.java | 33 +-
.../dht/GridDhtPartitionTopology.java | 3 +-
.../dht/GridDhtPartitionTopologyImpl.java | 27 +-
.../dht/preloader/GridDhtPartitionFullMap.java | 18 +
.../dht/preloader/GridDhtPartitionMap2.java | 62 ++-
.../GridDhtPartitionsExchangeFuture.java | 92 +----
.../preloader/GridDhtPartitionsFullMessage.java | 89 ++++-
.../GridDhtPartitionsSingleMessage.java | 68 +++-
.../continuous/GridContinuousProcessor.java | 4 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 26 +-
...CacheExchangeMessageDuplicatedStateTest.java | 386 +++++++++++++++++++
.../GridCacheSyncReplicatedPreloadSelfTest.java | 3 -
.../IgniteCacheSyncRebalanceModeSelfTest.java | 4 +-
...ContinuousQueryFailoverAbstractSelfTest.java | 2 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 134 ++++++-
.../testsuites/IgniteCacheTestSuite2.java | 3 +
21 files changed, 1138 insertions(+), 263 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 9166b31..a388c7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -108,7 +108,7 @@ public class GridAffinityAssignmentCache {
private final AtomicInteger fullHistSize = new AtomicInteger();
/** */
- private final SimilarAffinityKey similarAffKey;
+ private final Object similarAffKey;
/**
* Constructs affinity cached calculations.
@@ -147,9 +147,14 @@ public class GridAffinityAssignmentCache {
affCache = new ConcurrentSkipListMap<>();
head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
- similarAffKey = new SimilarAffinityKey(aff.getClass(), nodeFilter.getClass(), backups, partsCnt);
+ similarAffKey = ctx.affinity().similaryAffinityKey(aff, nodeFilter, backups, partsCnt);
+
+ assert similarAffKey != null;
}
+ /**
+ * @return Key to find caches with similar affinity.
+ */
public Object similarAffinityKey() {
return similarAffKey;
}
@@ -612,57 +617,4 @@ public class GridAffinityAssignmentCache {
return S.toString(AffinityReadyFuture.class, this);
}
}
-
- /**
- *
- */
- private static class SimilarAffinityKey {
- /** */
- private final int backups;
-
- /** */
- private final Class<?> affFuncCls;
-
- /** */
- private final Class<?> filterCls;
-
- /** */
- private final int partsCnt;
-
- /** */
- private final int hash;
-
- public SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int backups, int partsCnt) {
- this.backups = backups;
- this.affFuncCls = affFuncCls;
- this.filterCls = filterCls;
- this.partsCnt = partsCnt;
-
- int hash = backups;
- hash = 31 * hash + affFuncCls.hashCode();
- hash = 31 * hash + filterCls.hashCode();
- hash= 31 * hash + partsCnt;
-
- this.hash = hash;
- }
-
- @Override public int hashCode() {
- return hash;
- }
-
- @Override public boolean equals(Object o) {
- if (o == this)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- SimilarAffinityKey key = (SimilarAffinityKey)o;
-
- return backups == key.backups &&
- affFuncCls == key.affFuncCls &&
- filterCls == key.filterCls &&
- partsCnt == key.partsCnt;
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index d3783f0..b9182ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -568,6 +569,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
return nodes.iterator().next();
}
+ /**
+ * @param aff Affinity function.
+ * @param nodeFilter Node filter.
+ * @param backups Number of backups.
+ * @param parts Number of partitions.
+ * @return Key to find caches with similar affinity.
+ */
+ public Object similaryAffinityKey(AffinityFunction aff,
+ IgnitePredicate<ClusterNode> nodeFilter,
+ int backups,
+ int parts) {
+ return new SimilarAffinityKey(aff.getClass(), nodeFilter.getClass(), backups, parts);
+ }
+
/** {@inheritDoc} */
@Override public void printMemoryStats() {
X.println(">>>");
@@ -966,4 +981,70 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
return aff;
}
}
+
+ /**
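+ * Key to find caches with similar affinity (same affinity function class, node filter class, backups and partitions count).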
+ */
+ private static class SimilarAffinityKey {
+ /** */
+ private final int backups;
+
+ /** */
+ private final Class<?> affFuncCls;
+
+ /** */
+ private final Class<?> filterCls;
+
+ /** */
+ private final int partsCnt;
+
+ /** */
+ private final int hash;
+
+ /**
+ * @param affFuncCls Affinity function class.
+ * @param filterCls Node filter class.
+ * @param backups Number of backups.
+ * @param partsCnt Number of partitions.
+ */
+ SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int backups, int partsCnt) {
+ this.backups = backups;
+ this.affFuncCls = affFuncCls;
+ this.filterCls = filterCls;
+ this.partsCnt = partsCnt;
+
+ int hash = backups;
+ hash = 31 * hash + affFuncCls.hashCode();
+ hash = 31 * hash + filterCls.hashCode();
+ hash= 31 * hash + partsCnt;
+
+ this.hash = hash;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return hash;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (o == this)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ SimilarAffinityKey key = (SimilarAffinityKey)o;
+
+ return backups == key.backups &&
+ affFuncCls == key.affFuncCls &&
+ filterCls == key.filterCls &&
+ partsCnt == key.partsCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SimilarAffinityKey.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 88f1f97..2890887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -127,7 +127,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param node Event node.
* @param topVer Topology version.
*/
- public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+ void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
if (type == EVT_NODE_JOINED && node.isLocal()) {
// Clean-up in case of client reconnect.
registeredCaches.clear();
@@ -153,7 +153,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param msg Customer message.
* @return {@code True} if minor topology version should be increased.
*/
- public boolean onCustomEvent(CacheAffinityChangeMessage msg) {
+ boolean onCustomEvent(CacheAffinityChangeMessage msg) {
assert lateAffAssign : msg;
if (msg.exchangeId() != null) {
@@ -219,7 +219,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param top Topology.
* @param checkCacheId Cache ID.
*/
- public void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) {
+ void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) {
if (!lateAffAssign)
return;
@@ -1246,6 +1246,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param aff Affinity.
* @param rebalanceInfo Rebalance information.
* @param latePrimary If {@code true} delays primary assignment if it is not owner.
+ * @param affCache Already calculated assignments (to reduce data stored in history).
* @throws IgniteCheckedException If failed.
*/
private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut,
@@ -1303,10 +1304,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * @param aff
- * @param assign
- * @param affCache
- * @return
+ * @param aff Assignment cache.
+ * @param assign Assignment.
+ * @param affCache Assignments already calculated for other caches.
+ * @return Assignment.
*/
private List<List<ClusterNode>> cachedAssignment(GridAffinityAssignmentCache aff,
List<List<ClusterNode>> assign,
@@ -1393,7 +1394,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @return Affinity assignment.
* @throws IgniteCheckedException If failed.
*/
- public Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
+ private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
throws IgniteCheckedException {
final AffinityTopologyVersion topVer = fut.topologyVersion();
@@ -1580,7 +1581,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param aff Affinity cache.
* @param initAff Existing affinity cache.
*/
- public CacheHolder(boolean rebalanceEnabled, GridAffinityAssignmentCache aff, @Nullable GridAffinityAssignmentCache initAff) {
+ CacheHolder(boolean rebalanceEnabled,
+ GridAffinityAssignmentCache aff,
+ @Nullable GridAffinityAssignmentCache initAff) {
this.aff = aff;
if (initAff != null)
@@ -1632,7 +1635,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
/**
* Created cache is started on coordinator.
*/
- class CacheHolder1 extends CacheHolder {
+ private class CacheHolder1 extends CacheHolder {
/** */
private final GridCacheContext cctx;
@@ -1640,7 +1643,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param cctx Cache context.
* @param initAff Current affinity.
*/
- public CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) {
+ CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) {
super(cctx.rebalanceEnabled(), cctx.affinity().affinityCache(), initAff);
assert !cctx.isLocal() : cctx.name();
@@ -1677,7 +1680,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
/**
* Created if cache is not started on coordinator.
*/
- static class CacheHolder2 extends CacheHolder {
+ private static class CacheHolder2 extends CacheHolder {
/** */
private final GridCacheSharedContext cctx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 4dcff9b..39e1c50 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -86,7 +86,7 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
* @param clientNodes Client nodes map.
*/
public void clientNodes(Map<String, Map<UUID, Boolean>> clientNodes) {
- this.clientNodes = clientNodes;
+ this.clientNodes = null;//clientNodes;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index a81bf0f..928500f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,6 +21,7 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
@@ -44,7 +45,9 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -56,6 +59,7 @@ import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -72,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridListSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -81,6 +86,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -532,8 +538,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (top != null)
return top;
+ Object affKey = null;
+
+ DynamicCacheDescriptor desc = cctx.cache().cacheDescriptor(cacheId);
+
+ if (desc != null) {
+ CacheConfiguration ccfg = desc.cacheConfiguration();
+
+ AffinityFunction aff = ccfg.getAffinity();
+
+ affKey = cctx.kernalContext().affinity().similaryAffinityKey(aff,
+ ccfg.getNodeFilter(),
+ ccfg.getBackups(),
+ aff.partitions());
+ }
+
GridClientPartitionTopology old = clientTops.putIfAbsent(cacheId,
- top = new GridClientPartitionTopology(cctx, cacheId, exchFut));
+ top = new GridClientPartitionTopology(cctx, cacheId, exchFut, affKey));
return old != null ? old : top;
}
@@ -762,43 +783,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param nodes Nodes.
* @return {@code True} if message was sent, {@code false} if node left grid.
*/
- private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) {
- GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE);
-
- boolean useOldApi = false;
- boolean compress = true;
-
- for (ClusterNode node : nodes) {
- if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
- useOldApi = true;
- compress = false;
-
- break;
- }
- else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0)
- compress = false;
- }
-
- m.compress(compress);
-
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal() && cacheCtx.started()) {
- GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
-
- if (useOldApi) {
- locMap = new GridDhtPartitionFullMap(locMap.nodeId(),
- locMap.nodeOrder(),
- locMap.updateSequence(),
- locMap);
- }
-
- m.addFullPartitionsMap(cacheCtx.cacheId(), locMap);
- }
- }
-
- // It is important that client topologies be added after contexts.
- for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
- m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true));
+ private boolean sendAllPartitions(Collection<ClusterNode> nodes) {
+ GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, true);
if (log.isDebugEnabled())
log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
@@ -821,32 +807,140 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param node Node.
- * @param id ID.
+ * @param nodes Target nodes.
+ * @param exchId Non-null exchange ID if message is created for exchange.
+ * @param lastVer Last version.
+ * @param compress {@code True} if it is possible to use compression for message.
+ * @return Message.
*/
- private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
- GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
- cctx.kernalContext().clientNode(),
- cctx.versions().last(),
- node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0);
+ public GridDhtPartitionsFullMessage createPartitionsFullMessage(Collection<ClusterNode> nodes,
+ @Nullable GridDhtPartitionExchangeId exchId,
+ @Nullable GridCacheVersion lastVer,
+ boolean compress) {
+ GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
+ lastVer,
+ exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
+
+ boolean useOldApi = false;
+
+ if (nodes != null) {
+ for (ClusterNode node : nodes) {
+ if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
+ useOldApi = true;
+ compress = false;
+
+ break;
+ }
+ else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0)
+ compress = false;
+ }
+ }
+
+ m.compress(compress);
+
+ Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (!cacheCtx.isLocal()) {
- GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap();
+ boolean ready;
- if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
- locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
+ if (exchId != null) {
+ AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
+
+ ready = startTopVer == null || startTopVer.compareTo(exchId.topologyVersion()) <= 0;
+ }
+ else
+ ready = cacheCtx.started();
- m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);
+ if (ready) {
+ GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
+
+ if (useOldApi) {
+ locMap = new GridDhtPartitionFullMap(locMap.nodeId(),
+ locMap.nodeOrder(),
+ locMap.updateSequence(),
+ locMap);
+ }
+
+ addFullPartitionsMap(m,
+ dupData,
+ compress,
+ cacheCtx.cacheId(),
+ locMap,
+ cacheCtx.affinity().affinityCache().similarAffinityKey());
+
+ if (exchId != null)
+ m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
+ }
}
}
- for (GridClientPartitionTopology top : clientTops.values()) {
- GridDhtPartitionMap2 locMap = top.localPartitionMap();
+ // It is important that client topologies be added after contexts.
+ for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
+ GridDhtPartitionFullMap map = top.partitionMap(true);
+
+ addFullPartitionsMap(m,
+ dupData,
+ compress,
+ top.cacheId(),
+ map,
+ top.similarAffinityKey());
+
+ if (exchId != null)
+ m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters(true));
+ }
+
+ return m;
+ }
+
+ /**
+ * @param m Message.
+ * @param dupData Duplicated data map.
+ * @param compress {@code True} if duplicated partition state data should be checked for.
+ * @param cacheId Cache ID.
+ * @param map Map to add.
+ * @param affKey Cache affinity key.
+ */
+ private void addFullPartitionsMap(GridDhtPartitionsFullMessage m,
+ Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData,
+ boolean compress,
+ Integer cacheId,
+ GridDhtPartitionFullMap map,
+ Object affKey) {
+ Integer dupDataCache = null;
- m.addLocalPartitionMap(top.cacheId(), locMap);
+ if (compress && affKey != null && !m.containsCache(cacheId)) {
+ T2<Integer, GridDhtPartitionFullMap> state0 = dupData.get(affKey);
+
+ if (state0 != null && state0.get2().partitionStateEquals(map)) {
+ GridDhtPartitionFullMap map0 = new GridDhtPartitionFullMap(map.nodeId(),
+ map.nodeOrder(),
+ map.updateSequence());
+
+ for (Map.Entry<UUID, GridDhtPartitionMap2> e : map.entrySet())
+ map0.put(e.getKey(), e.getValue().emptyCopy());
+
+ map = map0;
+
+ dupDataCache = state0.get1();
+ }
+ else
+ dupData.put(affKey, new T2<>(cacheId, map));
}
+ m.addFullPartitionsMap(cacheId, map, dupDataCache);
+ }
+
+ /**
+ * @param node Node.
+ * @param id ID.
+ */
+ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
+ GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node,
+ id,
+ cctx.kernalContext().clientNode(),
+ false);
+
if (log.isDebugEnabled())
log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']');
@@ -864,6 +958,81 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * @param targetNode Target node.
+ * @param exchangeId Exchange ID.
+ * @param clientOnlyExchange Client exchange flag.
+ * @param sndCounters {@code True} if partition update counters should be sent.
+ * @return Message.
+ */
+ public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(ClusterNode targetNode,
+ @Nullable GridDhtPartitionExchangeId exchangeId,
+ boolean clientOnlyExchange,
+ boolean sndCounters)
+ {
+ boolean compress =
+ targetNode.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0;
+
+ GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId,
+ clientOnlyExchange,
+ cctx.versions().last(),
+ compress);
+
+ Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>();
+
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (!cacheCtx.isLocal()) {
+ GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap();
+
+ if (targetNode.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
+ locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
+
+ addPartitionMap(m,
+ dupData,
+ compress,
+ cacheCtx.cacheId(),
+ locMap,
+ cacheCtx.affinity().affinityCache().similarAffinityKey());
+
+ if (sndCounters)
+ m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
+ }
+ }
+
+ return m;
+ }
+
+ /**
+ * @param m Message.
+ * @param dupData Duplicated data map.
+ * @param compress {@code True} if duplicated partition state data should be checked for.
+ * @param cacheId Cache ID.
+ * @param map Map to add.
+ * @param affKey Cache affinity key.
+ */
+ private void addPartitionMap(GridDhtPartitionsSingleMessage m,
+ Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData,
+ boolean compress,
+ Integer cacheId,
+ GridDhtPartitionMap2 map,
+ Object affKey) {
+ Integer dupDataCache = null;
+
+ if (compress) {
+ T2<Integer, Map<Integer, GridDhtPartitionState>> state0 = dupData.get(affKey);
+
+ if (state0 != null && state0.get2().equals(map.map())) {
+ dupDataCache = state0.get1();
+
+ map.map(U.<Integer, GridDhtPartitionState>newHashMap(0));
+ }
+ else
+ dupData.put(affKey, new T2<>(cacheId, map.map()));
+ }
+
+ m.addLocalPartitionMap(cacheId, map, dupDataCache);
+ }
+
+ /**
* @param nodeId Cause node ID.
* @param topVer Topology version.
* @param evt Event type.
@@ -880,7 +1049,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param affChangeMsg Affinity change message.
* @return Exchange future.
*/
- GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
+ private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
@Nullable DiscoveryEvent discoEvt,
@Nullable Collection<DynamicCacheChangeRequest> reqs,
@Nullable CacheAffinityChangeMessage affChangeMsg) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 58933b7..5efb317 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -61,6 +61,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** Flag to control amount of output for full map. */
private static final boolean FULL_MAP_DEBUG = false;
+ /** Boxed zero used to filter out zero update counters. */
+ private static final Long ZERO = 0L;
+
/** Cache shared context. */
private GridCacheSharedContext cctx;
@@ -97,18 +100,24 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** Partition update counters. */
private Map<Integer, Long> cntrMap = new HashMap<>();
+ /** Key to find caches with similar affinity. */
+ private final Object similarAffKey;
+
/**
* @param cctx Context.
* @param cacheId Cache ID.
* @param exchFut Exchange ID.
+ * @param similarAffKey Key to find caches with similar affinity.
*/
public GridClientPartitionTopology(
GridCacheSharedContext cctx,
int cacheId,
- GridDhtPartitionsExchangeFuture exchFut
+ GridDhtPartitionsExchangeFuture exchFut,
+ Object similarAffKey
) {
this.cctx = cctx;
this.cacheId = cacheId;
+ this.similarAffKey = similarAffKey;
topVer = exchFut.topologyVersion();
@@ -125,6 +134,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/**
+ * @return Key to find caches with similar affinity.
+ */
+ @Nullable public Object similarAffinityKey() {
+ return similarAffKey;
+ }
+
+ /**
* @return Full map string representation.
*/
@SuppressWarnings( {"ConstantConditions"})
@@ -873,11 +889,22 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public Map<Integer, Long> updateCounters() {
+ @Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
lock.readLock().lock();
try {
- return new HashMap<>(cntrMap);
+ if (skipZeros) {
+ Map<Integer, Long> res = U.newHashMap(cntrMap.size());
+
+ for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+ if (!e.getValue().equals(ZERO))
+ res.put(e.getKey(), e.getValue());
+ }
+
+ return res;
+ }
+ else
+ return new HashMap<>(cntrMap);
}
finally {
lock.readLock().unlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 6e9b907..4ae4e47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -220,9 +220,10 @@ public interface GridDhtPartitionTopology {
@Nullable Map<Integer, Long> cntrMap);
/**
+ * @param skipZeros If {@code true} then filters out zero counters.
* @return Partition update counters.
*/
- public Map<Integer, Long> updateCounters();
+ public Map<Integer, Long> updateCounters(boolean skipZeros);
/**
* @param part Partition to own.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 871a084..f3751ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -71,6 +71,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** Flag to control amount of output for full map. */
private static final boolean FULL_MAP_DEBUG = false;
+ /** Boxed zero used to filter out zero update counters when {@code skipZeros} is requested. */
+ private static final Long ZERO = 0L;
+
/** Context. */
private final GridCacheContext<?, ?> cctx;
@@ -1500,11 +1503,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public Map<Integer, Long> updateCounters() {
+ @Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
lock.readLock().lock();
try {
- Map<Integer, Long> res = new HashMap<>(cntrMap);
+ Map<Integer, Long> res;
+
+ if (skipZeros) {
+ res = U.newHashMap(cntrMap.size());
+
+ for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+ Long cntr = e.getValue();
+
+ if (ZERO.equals(cntr))
+ continue;
+
+ res.put(e.getKey(), cntr);
+ }
+ }
+ else
+ res = new HashMap<>(cntrMap);
for (int i = 0; i < locParts.length; i++) {
GridDhtLocalPartition part = locParts[i];
@@ -1513,7 +1531,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
continue;
Long cntr0 = res.get(part.id());
- Long cntr1 = part.updateCounter();
+ long cntr1 = part.updateCounter();
+
+ if (skipZeros && cntr1 == 0L)
+ continue;
if (cntr0 == null || cntr1 > cntr0)
res.put(part.id(), cntr1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
index 498d492..8f5ad17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
@@ -158,6 +158,24 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
}
/**
+ * @param fullMap Map.
+ * @return {@code True} if this map and given map contain the same data.
+ */
+ public boolean partitionStateEquals(GridDhtPartitionFullMap fullMap) {
+ if (size() != fullMap.size())
+ return false;
+
+ for (Map.Entry<UUID, GridDhtPartitionMap2> e : entrySet()) {
+ GridDhtPartitionMap2 m = fullMap.get(e.getKey());
+
+ if (m == null || !m.map().equals(e.getValue().map()))
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
* @param updateSeq New update sequence value.
* @return Old update sequence value.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
index 15b5a2e..dc308ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
@@ -63,25 +63,15 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
/**
* @param nodeId Node ID.
* @param updateSeq Update sequence number.
- */
- public GridDhtPartitionMap2(UUID nodeId, long updateSeq) {
- assert nodeId != null;
- assert updateSeq > 0;
-
- this.nodeId = nodeId;
- this.updateSeq = updateSeq;
-
- map = new HashMap<>();
- }
-
- /**
- * @param nodeId Node ID.
- * @param updateSeq Update sequence number.
+ * @param top Topology version.
* @param m Map to copy.
* @param onlyActive If {@code true}, then only active states will be included.
*/
- public GridDhtPartitionMap2(UUID nodeId, long updateSeq, AffinityTopologyVersion top,
- Map<Integer, GridDhtPartitionState> m, boolean onlyActive) {
+ public GridDhtPartitionMap2(UUID nodeId,
+ long updateSeq,
+ AffinityTopologyVersion top,
+ Map<Integer, GridDhtPartitionState> m,
+ boolean onlyActive) {
assert nodeId != null;
assert updateSeq > 0;
@@ -100,6 +90,36 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
}
/**
+ * @param nodeId Node ID.
+ * @param updateSeq Update sequence number.
+ * @param top Topology version.
+ * @param map Map.
+ * @param moving Number of moving partitions.
+ */
+ private GridDhtPartitionMap2(UUID nodeId,
+ long updateSeq,
+ AffinityTopologyVersion top,
+ Map<Integer, GridDhtPartitionState> map,
+ int moving) {
+ this.nodeId = nodeId;
+ this.updateSeq = updateSeq;
+ this.top = top;
+ this.map = map;
+ this.moving = moving;
+ }
+
+ /**
+ * @return Copy with empty partition state map and zero moving partitions count.
+ */
+ public GridDhtPartitionMap2 emptyCopy() {
+ return new GridDhtPartitionMap2(nodeId,
+ updateSeq,
+ top,
+ U.<Integer, GridDhtPartitionState>newHashMap(0),
+ 0); // State map is empty, so the copy must not report any moving partitions.
+ }
+
+ /**
* Empty constructor required for {@link Externalizable}.
*/
public GridDhtPartitionMap2() {
@@ -174,6 +194,13 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
}
/**
+ * @param map Partition states map.
+ */
+ public void map(Map<Integer, GridDhtPartitionState> map) {
+ this.map = map;
+ }
+
+ /**
* @return Node ID.
*/
public UUID nodeId() {
@@ -277,9 +304,8 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
long ver = in.readLong();
int minorVer = in.readInt();
- if (ver != 0) {
+ if (ver != 0)
top = new AffinityTopologyVersion(ver, minorVer);
- }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 6a17583..a79aba3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -64,6 +65,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -544,7 +546,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
if (updateTop && clientTop != null)
- cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters());
+ cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
}
top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId()));
@@ -668,7 +670,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (top.cacheId() == cacheCtx.cacheId()) {
cacheCtx.topology().update(exchId,
top.partitionMap(true),
- top.updateCounters());
+ top.updateCounters(false));
break;
}
@@ -678,7 +680,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
else {
if (!centralizedAff)
- sendLocalPartitions(crd, exchId);
+ sendLocalPartitions(crd);
initDone();
@@ -928,28 +930,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/**
* @param node Node.
- * @param id ID.
* @throws IgniteCheckedException If failed.
*/
- private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
+ private void sendLocalPartitions(ClusterNode node)
throws IgniteCheckedException {
- GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
+ GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(node,
+ exchangeId(),
clientOnlyExchange,
- cctx.versions().last(),
- node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0);
-
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal()) {
- GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap();
-
- if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
- locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
-
- m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);
-
- m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
- }
- }
+ true);
if (log.isDebugEnabled())
log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
@@ -965,60 +953,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/**
* @param nodes Target nodes.
- * @return Message;
+ * @param compress {@code True} if it is possible to use compression for message.
+ * @return Message.
*/
- private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes) {
+ private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes, boolean compress) {
GridCacheVersion last = lastVer.get();
- GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchangeId(),
+ return cctx.exchange().createPartitionsFullMessage(nodes,
+ exchangeId(),
last != null ? last : cctx.versions().last(),
- topologyVersion());
-
- boolean useOldApi = false;
- boolean compress = true;
-
- if (nodes != null) {
- for (ClusterNode node : nodes) {
- if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
- useOldApi = true;
- compress = false;
-
- break;
- }
- else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0)
- compress = false;
- }
- }
-
- m.compress(compress);
-
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal()) {
- AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
-
- boolean ready = startTopVer == null || startTopVer.compareTo(topologyVersion()) <= 0;
-
- if (ready) {
- GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
-
- if (useOldApi)
- locMap = new GridDhtPartitionFullMap(locMap.nodeId(), locMap.nodeOrder(), locMap.updateSequence(), locMap);
-
- m.addFullPartitionsMap(cacheCtx.cacheId(), locMap);
-
- m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
- }
- }
- }
-
- // It is important that client topologies be added after contexts.
- for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
- m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true));
-
- m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters());
- }
-
- return m;
+ compress);
}
/**
@@ -1026,7 +970,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @throws IgniteCheckedException If failed.
*/
private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
- GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes);
+ GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, true);
if (log.isDebugEnabled())
log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
@@ -1040,7 +984,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
*/
private void sendPartitions(ClusterNode oldestNode) {
try {
- sendLocalPartitions(oldestNode, exchId);
+ sendLocalPartitions(oldestNode);
}
catch (ClusterTopologyCheckedException ignore) {
if (log.isDebugEnabled())
@@ -1244,7 +1188,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
- GridDhtPartitionsFullMessage m = createPartitionsMessage(null);
+ GridDhtPartitionsFullMessage m = createPartitionsMessage(null, false);
CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 63d63e2..3d2d380 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -22,14 +22,19 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
+
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
@@ -48,6 +53,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
private Map<Integer, GridDhtPartitionFullMap> parts;
/** */
+ @GridDirectMap(keyType = Integer.class, valueType = Integer.class)
+ private Map<Integer, Integer> dupPartsData;
+
+ /** */
private byte[] partsBytes;
/** Partitions update counters. */
@@ -63,7 +72,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** */
@GridDirectTransient
- private boolean compress;
+ private transient boolean compress;
/**
* Required by {@link Externalizable}.
@@ -87,6 +96,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
this.topVer = topVer;
}
+ /**
+ * @param compress {@code True} if it is possible to use compression for message.
+ */
public void compress(boolean compress) {
this.compress = compress;
}
@@ -100,14 +112,33 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/**
* @param cacheId Cache ID.
+ * @return {@code True} if message contains full map for given cache.
+ */
+ public boolean containsCache(int cacheId) {
+ return parts != null && parts.containsKey(cacheId);
+ }
+
+ /**
+ * @param cacheId Cache ID.
* @param fullMap Full partitions map.
+ * @param dupDataCache Optional ID of cache with the same partition state map.
*/
- public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap) {
+ public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) {
if (parts == null)
parts = new HashMap<>();
- if (!parts.containsKey(cacheId))
+ if (!parts.containsKey(cacheId)) {
parts.put(cacheId, fullMap);
+
+ if (dupDataCache != null) {
+ assert parts.containsKey(dupDataCache);
+
+ if (dupPartsData == null)
+ dupPartsData = new HashMap<>();
+
+ dupPartsData.put(cacheId, dupDataCache);
+ }
+ }
}
/**
@@ -197,6 +228,34 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
else
parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
+ if (dupPartsData != null) {
+ assert parts != null;
+
+ for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) {
+ GridDhtPartitionFullMap map1 = parts.get(e.getKey());
+
+ assert map1 != null : e.getKey();
+
+ GridDhtPartitionFullMap map2 = parts.get(e.getValue());
+
+ assert map2 != null : e.getValue();
+
+ for (Map.Entry<UUID, GridDhtPartitionMap2> e0 : map2.entrySet()) {
+ GridDhtPartitionMap2 partMap1 = map1.get(e0.getKey());
+
+ assert partMap1 != null && partMap1.map().isEmpty() : partMap1;
+ assert !partMap1.hasMovingPartitions() : partMap1;
+
+ GridDhtPartitionMap2 partMap2 = e0.getValue();
+
+ assert partMap2 != null;
+
+ for (Map.Entry<Integer, GridDhtPartitionState> stateEntry : partMap2.entrySet())
+ partMap1.put(stateEntry.getKey(), stateEntry.getValue());
+ }
+ }
+ }
}
if (parts == null)
@@ -229,18 +288,24 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
switch (writer.state()) {
case 6:
- if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+ if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
return false;
writer.incrementState();
case 7:
- if (!writer.writeByteArray("partsBytes", partsBytes))
+ if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
return false;
writer.incrementState();
case 8:
+ if (!writer.writeByteArray("partsBytes", partsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -263,7 +328,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
switch (reader.state()) {
case 6:
- partCntrsBytes = reader.readByteArray("partCntrsBytes");
+ dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
if (!reader.isLastRead())
return false;
@@ -271,7 +336,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 7:
- partsBytes = reader.readByteArray("partsBytes");
+ partCntrsBytes = reader.readByteArray("partCntrsBytes");
if (!reader.isLastRead())
return false;
@@ -279,6 +344,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 8:
+ partsBytes = reader.readByteArray("partsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -298,7 +371,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 9;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index a37e092..416d298 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -23,12 +23,16 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -45,6 +49,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
@GridDirectTransient
private Map<Integer, GridDhtPartitionMap2> parts;
+ /** Maps cache ID to ID of another cache in this message with the same partition state map. */
+ @GridDirectMap(keyType = Integer.class, valueType = Integer.class)
+ private Map<Integer, Integer> dupPartsData;
+
/** Serialized partitions. */
private byte[] partsBytes;
@@ -60,7 +68,8 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
private boolean client;
/** */
- private boolean compress;
+ @GridDirectTransient
+ private transient boolean compress;
/**
* Required by {@link Externalizable}.
@@ -73,6 +82,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
* @param exchId Exchange ID.
* @param client Client message flag.
* @param lastVer Last version.
+ * @param compress {@code True} if it is possible to use compression for message.
*/
public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
boolean client,
@@ -92,16 +102,24 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
}
/**
- * Adds partition map to this message.
- *
* @param cacheId Cache ID to add local partition for.
* @param locMap Local partition map.
+ * @param dupDataCache Optional ID of cache with the same partition state map.
*/
- public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap) {
+ public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap, @Nullable Integer dupDataCache) {
if (parts == null)
parts = new HashMap<>();
parts.put(cacheId, locMap);
+
+ if (dupDataCache != null) {
+ assert F.isEmpty(locMap.map());
+
+ if (dupPartsData == null)
+ dupPartsData = new HashMap<>();
+
+ dupPartsData.put(cacheId, dupDataCache);
+ }
}
/**
@@ -183,7 +201,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
if (compressed())
parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
else
- parts =U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
}
if (partCntrsBytes != null && partCntrs == null) {
@@ -192,6 +210,26 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
else
partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
}
+
+ if (dupPartsData != null) {
+ assert parts != null;
+
+ for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) {
+ GridDhtPartitionMap2 map1 = parts.get(e.getKey());
+
+ assert map1 != null : e.getKey();
+ assert F.isEmpty(map1.map());
+ assert !map1.hasMovingPartitions();
+
+ GridDhtPartitionMap2 map2 = parts.get(e.getValue());
+
+ assert map2 != null : e.getValue();
+ assert map2.map() != null;
+
+ for (Map.Entry<Integer, GridDhtPartitionState> e0 : map2.map().entrySet())
+ map1.put(e0.getKey(), e0.getValue());
+ }
+ }
}
/** {@inheritDoc} */
@@ -216,12 +254,18 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
writer.incrementState();
case 7:
- if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+ if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
return false;
writer.incrementState();
case 8:
+ if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
if (!writer.writeByteArray("partsBytes", partsBytes))
return false;
@@ -252,7 +296,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
case 7:
- partCntrsBytes = reader.readByteArray("partCntrsBytes");
+ dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
if (!reader.isLastRead())
return false;
@@ -260,6 +304,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
case 8:
+ partCntrsBytes = reader.readByteArray("partCntrsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
partsBytes = reader.readByteArray("partsBytes");
if (!reader.isLastRead())
@@ -279,7 +331,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 9;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 3a559e7..9fd9b6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -239,7 +239,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
GridCacheContext cctx = interCache != null ? interCache.context() : null;
if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
- cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters());
+ cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters(false));
routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
}
@@ -1049,7 +1049,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
if (cache != null && !cache.isLocal() && cache.context().userCache())
- req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters());
+ req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters(false));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index f929121..f8e38d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1049,7 +1049,7 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
- TcpDiscoveryAbstractMessage msg = null;
+ TcpDiscoveryAbstractMessage msg;
while (!Thread.currentThread().isInterrupted()) {
Socket sock;
@@ -1063,8 +1063,7 @@ class ClientImpl extends TcpDiscoveryImpl {
continue;
}
- if (msg == null)
- msg = queue.poll();
+ msg = queue.poll();
if (msg == null) {
mux.wait();
@@ -1121,19 +1120,7 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
}
- catch (IOException e) {
- if (log.isDebugEnabled())
- U.error(log, "Failed to send node left message (will stop anyway) " +
- "[sock=" + sock + ", msg=" + msg + ']', e);
-
- U.closeQuiet(sock);
-
- synchronized (mux) {
- if (sock == this.sock)
- this.sock = null; // Connection has dead.
- }
- }
- catch (IgniteCheckedException e) {
+ catch (Exception e) {
if (spi.getSpiContext().isStopping()) {
if (log.isDebugEnabled())
log.debug("Failed to send message, node is stopping [msg=" + msg + ", err=" + e + ']');
@@ -1141,7 +1128,12 @@ class ClientImpl extends TcpDiscoveryImpl {
else
U.error(log, "Failed to send message: " + msg, e);
- msg = null;
+ U.closeQuiet(sock);
+
+ synchronized (mux) {
+ if (sock == this.sock)
+ this.sock = null; // Connection is dead.
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
new file mode 100644
index 0000000..d07fdd3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks the {@code dupPartsData} state of recorded partitions full and single exchange messages.
+ */
+public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractTest {
+ /** IP finder set on the discovery SPI of every node configured by this test. */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** First of two caches configured with a no-arg {@link RendezvousAffinityFunction}. */
+ private static final String AFF1_CACHE1 = "a1c1";
+
+ /** Second of two caches configured with a no-arg {@link RendezvousAffinityFunction}. */
+ private static final String AFF1_CACHE2 = "a1c2";
+
+ /** First of two caches configured with {@link FairAffinityFunction}. */
+ private static final String AFF2_CACHE1 = "a2c1";
+
+ /** Second of two caches configured with {@link FairAffinityFunction}. */
+ private static final String AFF2_CACHE2 = "a2c2";
+
+ /** The only cache configured with 3 backups and {@code RendezvousAffinityFunction(false, 64)}. */
+ private static final String AFF3_CACHE1 = "a3c1";
+
+ /** First of two caches configured with {@link TestNodeFilter} and {@link RendezvousAffinityFunction}. */
+ private static final String AFF4_FILTER_CACHE1 = "a4c1";
+
+ /** Second of two caches configured with {@link TestNodeFilter} and {@link RendezvousAffinityFunction}. */
+ private static final String AFF4_FILTER_CACHE2 = "a4c2";
+
+ /** The only cache configured with {@link TestNodeFilter} and {@link FairAffinityFunction}. */
+ private static final String AFF5_FILTER_CACHE1 = "a5c1";
+
+ /** Client mode flag passed to every node configuration built by {@code getConfiguration}. */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setClientMode(client);
+
+ TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+ commSpi.record(GridDhtPartitionsSingleMessage.class, GridDhtPartitionsFullMessage.class); // Only these two types.
+
+ cfg.setCommunicationSpi(commSpi);
+
+ List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+ { // Pair 1: two caches with a no-arg RendezvousAffinityFunction.
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF1_CACHE1);
+ ccfg.setAffinity(new RendezvousAffinityFunction());
+ ccfgs.add(ccfg);
+ }
+ {
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF1_CACHE2);
+ ccfg.setAffinity(new RendezvousAffinityFunction());
+ ccfgs.add(ccfg);
+ }
+ { // Pair 2: two caches with FairAffinityFunction.
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF2_CACHE1);
+ ccfg.setAffinity(new FairAffinityFunction());
+ ccfgs.add(ccfg);
+ }
+ {
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF2_CACHE2);
+ ccfg.setAffinity(new FairAffinityFunction());
+ ccfgs.add(ccfg);
+ }
+ { // Unpaired cache: the only one with backups set and explicit affinity arguments.
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF3_CACHE1);
+ ccfg.setBackups(3);
+
+ RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 64); // NOTE(review): presumably (excludeNeighbors, parts) - confirm.
+ ccfg.setAffinity(aff);
+
+ ccfgs.add(ccfg);
+ }
+ { // Pair 3: two caches with TestNodeFilter and a no-arg RendezvousAffinityFunction.
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF4_FILTER_CACHE1);
+ ccfg.setNodeFilter(new TestNodeFilter());
+ ccfg.setAffinity(new RendezvousAffinityFunction());
+ ccfgs.add(ccfg);
+ }
+ {
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF4_FILTER_CACHE2);
+ ccfg.setNodeFilter(new TestNodeFilter());
+ ccfg.setAffinity(new RendezvousAffinityFunction());
+ ccfgs.add(ccfg);
+ }
+ { // Unpaired cache: the only one with TestNodeFilter and FairAffinityFunction.
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF5_FILTER_CACHE1);
+ ccfg.setNodeFilter(new TestNodeFilter());
+ ccfg.setAffinity(new FairAffinityFunction());
+ ccfgs.add(ccfg);
+ }
+
+ cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrid(0); // Index 0 is what the test initially passes as the coordinator index.
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testExchangeMessages() throws Exception {
+ ignite(0); // Return value is unused. NOTE(review): presumably only confirms node 0 is available - verify.
+
+ startGrid(1);
+
+ awaitPartitionMapExchange();
+
+ checkMessages(0, true);
+
+ startGrid(2);
+
+ awaitPartitionMapExchange();
+
+ checkMessages(0, true);
+
+ client = true; // Node 3 below gets a configuration with client mode enabled.
+
+ startGrid(3);
+
+ awaitPartitionMapExchange();
+
+ checkMessages(0, false); // Single messages are not checked after the client node start.
+
+ stopGrid(0);
+
+ awaitPartitionMapExchange();
+
+ checkMessages(1, true); // Node 0 is stopped, so index 1 is passed as the coordinator index.
+ }
+
+ /**
+ * @param crdIdx Coordinator node index, passed on to the full and single message checks.
+ * @param checkSingle {@code True} if single messages need to be checked in addition to full messages.
+ */
+ private void checkMessages(int crdIdx, boolean checkSingle) {
+ checkFullMessages(crdIdx);
+
+ if (checkSingle)
+ checkSingleMessages(crdIdx);
+ }
+
+ /**
+ * @param crdIdx Index of the coordinator node; all its recorded messages must be full messages.
+ */
+ private void checkFullMessages(int crdIdx) {
+ TestRecordingCommunicationSpi commSpi0 =
+ (TestRecordingCommunicationSpi)ignite(crdIdx).configuration().getCommunicationSpi();
+
+ List<Object> msgs = commSpi0.recordedMessages(false); // NOTE(review): flag meaning is defined by the SPI - confirm.
+
+ assertTrue(msgs.size() > 0);
+
+ for (Object msg : msgs) {
+ assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsFullMessage);
+
+ checkFullMessage((GridDhtPartitionsFullMessage)msg);
+ }
+ }
+
+ /**
+ * @param crdIdx Coordinator node index; that node and all client nodes are skipped by this check.
+ */
+ private void checkSingleMessages(int crdIdx) {
+ int cnt = 0;
+
+ for (Ignite ignite : Ignition.allGrids()) {
+ if (getTestGridName(crdIdx).equals(ignite.name()) || ignite.configuration().isClientMode())
+ continue;
+
+ TestRecordingCommunicationSpi commSpi0 =
+ (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+ List<Object> msgs = commSpi0.recordedMessages(false); // NOTE(review): flag meaning is defined by the SPI - confirm.
+
+ assertTrue(msgs.size() > 0);
+
+ for (Object msg : msgs) {
+ assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsSingleMessage);
+
+ checkSingleMessage((GridDhtPartitionsSingleMessage)msg);
+ }
+
+ cnt++;
+ }
+
+ assertTrue(cnt > 0); // At least one node must have been checked.
+ }
+
+ /**
+ * @param msg Full message whose {@code dupPartsData} and {@code partCntrs} fields are read and checked.
+ */
+ private void checkFullMessage(GridDhtPartitionsFullMessage msg) {
+ Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, "dupPartsData");
+
+ assertNotNull(dupPartsData);
+
+ checkFullMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg);
+ checkFullMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg);
+ checkFullMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg);
+
+ assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1))); // Unpaired caches must not be keys.
+ assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1)));
+
+ Map<Integer, Map<Integer, Long>> partCntrs = GridTestUtils.getFieldValue(msg, "partCntrs");
+
+ if (partCntrs != null) {
+ for (Map<Integer, Long> cntrs : partCntrs.values())
+ assertTrue(cntrs.isEmpty());
+ }
+ }
+
+ /**
+ * @param msg Single message whose {@code dupPartsData} and {@code partCntrs} fields are read and checked.
+ */
+ private void checkSingleMessage(GridDhtPartitionsSingleMessage msg) {
+ Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, "dupPartsData");
+
+ assertNotNull(dupPartsData);
+
+ checkSingleMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg);
+ checkSingleMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg);
+ checkSingleMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg);
+
+ assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1))); // Unpaired caches must not be keys.
+ assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1)));
+
+ Map<Integer, Map<Integer, Long>> partCntrs = GridTestUtils.getFieldValue(msg, "partCntrs");
+
+ if (partCntrs != null) {
+ for (Map<Integer, Long> cntrs : partCntrs.values())
+ assertTrue(cntrs.isEmpty());
+ }
+ }
+
+ /**
+ * @param cache1 Name of the first cache of the pair.
+ * @param cache2 Name of the second cache of the pair.
+ * @param dupPartsData Map read from the message's {@code dupPartsData} field (cache ID to cache ID).
+ * @param msg Full message providing the per-cache partition maps to check.
+ */
+ private void checkFullMessage(String cache1,
+ String cache2,
+ Map<Integer, Integer> dupPartsData,
+ GridDhtPartitionsFullMessage msg)
+ {
+ Integer cacheId;
+ Integer dupCacheId;
+
+ if (dupPartsData.containsKey(CU.cacheId(cache1))) { // Either cache of the pair may be the key.
+ cacheId = CU.cacheId(cache1);
+ dupCacheId = CU.cacheId(cache2);
+ }
+ else {
+ cacheId = CU.cacheId(cache2);
+ dupCacheId = CU.cacheId(cache1);
+ }
+
+ assertTrue(dupPartsData.containsKey(cacheId));
+ assertEquals(dupCacheId, dupPartsData.get(cacheId));
+ assertFalse(dupPartsData.containsKey(dupCacheId));
+
+ Map<Integer, GridDhtPartitionFullMap> parts = msg.partitions();
+
+ GridDhtPartitionFullMap emptyFullMap = parts.get(cacheId);
+
+ for (GridDhtPartitionMap2 map : emptyFullMap.values())
+ assertEquals(0, map.map().size()); // The key cache must carry empty partition maps.
+
+ GridDhtPartitionFullMap fullMap = parts.get(dupCacheId);
+
+ for (GridDhtPartitionMap2 map : fullMap.values())
+ assertFalse(map.map().isEmpty()); // The cache it maps to must carry non-empty partition maps.
+ }
+
+ /**
+ * @param cache1 Name of the first cache of the pair.
+ * @param cache2 Name of the second cache of the pair.
+ * @param dupPartsData Map read from the message's {@code dupPartsData} field (cache ID to cache ID).
+ * @param msg Single message providing the per-cache partition maps to check.
+ */
+ private void checkSingleMessage(String cache1,
+ String cache2,
+ Map<Integer, Integer> dupPartsData,
+ GridDhtPartitionsSingleMessage msg)
+ {
+ Integer cacheId;
+ Integer dupCacheId;
+
+ if (dupPartsData.containsKey(CU.cacheId(cache1))) { // Either cache of the pair may be the key.
+ cacheId = CU.cacheId(cache1);
+ dupCacheId = CU.cacheId(cache2);
+ }
+ else {
+ cacheId = CU.cacheId(cache2);
+ dupCacheId = CU.cacheId(cache1);
+ }
+
+ assertTrue(dupPartsData.containsKey(cacheId));
+ assertEquals(dupCacheId, dupPartsData.get(cacheId));
+ assertFalse(dupPartsData.containsKey(dupCacheId));
+
+ Map<Integer, GridDhtPartitionMap2> parts = msg.partitions();
+
+ GridDhtPartitionMap2 emptyMap = parts.get(cacheId);
+
+ assertEquals(0, emptyMap.map().size()); // The key cache must carry an empty partition map.
+
+ GridDhtPartitionMap2 map = parts.get(dupCacheId);
+
+ assertFalse(map.map().isEmpty()); // The cache it maps to must carry a non-empty partition map.
+ }
+
+ /**
+ * Node filter that accepts only nodes whose order is greater than 1.
+ */
+ private static class TestNodeFilter implements IgnitePredicate<ClusterNode> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node) {
+ // Do not start cache on coordinator.
+ return node.order() > 1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
index 87d02a5..cde6b8d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
@@ -41,9 +41,6 @@ public class GridCacheSyncReplicatedPreloadSelfTest extends GridCommonAbstractTe
/** */
private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
- /** */
- private static final boolean DISCO_DEBUG_MODE = false;
-
/**
* Constructs test.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
index 9b0637e..f3942d5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
@@ -34,7 +34,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
public class IgniteCacheSyncRebalanceModeSelfTest extends GridCommonAbstractTest {
/** Entry count. */
public static final int CNT = 100_000;
- public static final String STATIC_CACHE_NAME = "static";
+
+ /** */
+ private static final String STATIC_CACHE_NAME = "static";
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {