You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/22 11:40:11 UTC
[22/50] [abbrv] ignite git commit: ignite-4154 Optimize amount of
data stored in discovery history Discovery history optimizations: - remove
discarded message for discovery pending messages - remove duplicated data
from TcpDiscoveryNodeAddedMessage.oldNo
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index a4ff04b..90d6242 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -22,14 +22,19 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
+
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
@@ -48,6 +53,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
private Map<Integer, GridDhtPartitionFullMap> parts;
/** */
+ @GridDirectMap(keyType = Integer.class, valueType = Integer.class)
+ private Map<Integer, Integer> dupPartsData;
+
+ /** */
private byte[] partsBytes;
/** Partitions update counters. */
@@ -61,6 +70,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** Topology version. */
private AffinityTopologyVersion topVer;
+ /** */
+ @GridDirectTransient
+ private transient boolean compress;
+
/**
* Required by {@link Externalizable}.
*/
@@ -84,6 +97,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
/**
+ * @param compress {@code True} if it is possible to use compression for message.
+ */
+ public void compress(boolean compress) {
+ this.compress = compress;
+ }
+
+ /**
* @return Local partitions.
*/
public Map<Integer, GridDhtPartitionFullMap> partitions() {
@@ -92,14 +112,34 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/**
* @param cacheId Cache ID.
+ * @return {@code True} if message contains full map for given cache.
+ */
+ public boolean containsCache(int cacheId) {
+ return parts != null && parts.containsKey(cacheId);
+ }
+
+ /**
+ * @param cacheId Cache ID.
* @param fullMap Full partitions map.
+ * @param dupDataCache Optional ID of cache with the same partition state map.
*/
- public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap) {
+ public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) {
if (parts == null)
parts = new HashMap<>();
- if (!parts.containsKey(cacheId))
+ if (!parts.containsKey(cacheId)) {
parts.put(cacheId, fullMap);
+
+ if (dupDataCache != null) {
+ assert compress;
+ assert parts.containsKey(dupDataCache);
+
+ if (dupPartsData == null)
+ dupPartsData = new HashMap<>();
+
+ dupPartsData.put(cacheId, dupDataCache);
+ }
+ }
}
/**
@@ -132,11 +172,38 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (parts != null && partsBytes == null)
- partsBytes = U.marshal(ctx, parts);
+ // Serialize lazily: only state that has no byte form yet is marshalled.
+ boolean marshal = (parts != null && partsBytes == null) || (partCntrs != null && partCntrsBytes == null);
+
+ if (marshal) {
+ // Bytes produced by an earlier call (or read from the wire) must survive this call.
+ boolean hasOldBytes = partsBytes != null || partCntrsBytes != null;
+
+ byte[] partsBytes0 = null;
+ byte[] partCntrsBytes0 = null;
+
+ if (parts != null && partsBytes == null)
+ partsBytes0 = U.marshal(ctx, parts);
- if (partCntrs != null && partCntrsBytes == null)
- partCntrsBytes = U.marshal(ctx, partCntrs);
+ if (partCntrs != null && partCntrsBytes == null)
+ partCntrsBytes0 = U.marshal(ctx, partCntrs);
+
+ if (hasOldBytes) {
+ // A single 'compressed' flag covers both arrays, so new bytes must use the encoding of the kept ones.
+ // No fallback is possible here: a zip failure is propagated to the caller.
+ if (compressed()) {
+ partsBytes0 = U.zip(partsBytes0);
+ partCntrsBytes0 = U.zip(partCntrsBytes0);
+ }
+ }
+ else if (compress) {
+ assert !compressed();
+
+ try {
+ byte[] partsBytesZip = U.zip(partsBytes0);
+ byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
+
+ // Switch to the zipped form only after both arrays were compressed successfully.
+ partsBytes0 = partsBytesZip;
+ partCntrsBytes0 = partCntrsBytesZip;
+
+ compressed(true);
+ }
+ catch (IgniteCheckedException e) {
+ // Compression is best effort: the uncompressed bytes are sent instead.
+ U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e);
+ }
+ }
+
+ // Assign only what was produced above; never overwrite existing bytes with null.
+ if (partsBytes0 != null)
+ partsBytes = partsBytes0;
+
+ if (partCntrsBytes0 != null)
+ partCntrsBytes = partCntrsBytes0;
+ }
}
/**
@@ -157,14 +224,49 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (partsBytes != null && parts == null)
- parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (partsBytes != null && parts == null) {
+ if (compressed())
+ parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ else
+ parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
+ if (dupPartsData != null) {
+ assert parts != null;
+
+ for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) {
+ GridDhtPartitionFullMap map1 = parts.get(e.getKey());
+ GridDhtPartitionFullMap map2 = parts.get(e.getValue());
+
+ assert map1 != null : e.getKey();
+ assert map2 != null : e.getValue();
+ assert map1.size() == map2.size();
+
+ for (Map.Entry<UUID, GridDhtPartitionMap2> e0 : map2.entrySet()) {
+ GridDhtPartitionMap2 partMap1 = map1.get(e0.getKey());
+
+ assert partMap1 != null && partMap1.map().isEmpty() : partMap1;
+ assert !partMap1.hasMovingPartitions() : partMap1;
+
+ GridDhtPartitionMap2 partMap2 = e0.getValue();
+
+ assert partMap2 != null;
+
+ for (Map.Entry<Integer, GridDhtPartitionState> stateEntry : partMap2.entrySet())
+ partMap1.put(stateEntry.getKey(), stateEntry.getValue());
+ }
+ }
+ }
+ }
if (parts == null)
parts = new HashMap<>();
- if (partCntrsBytes != null && partCntrs == null)
- partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (partCntrsBytes != null && partCntrs == null) {
+ if (compressed())
+ partCntrs = U.unmarshalZip(ctx.marshaller(), partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ else
+ partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
if (partCntrs == null)
partCntrs = new HashMap<>();
@@ -185,19 +287,25 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
switch (writer.state()) {
- case 5:
+ case 6:
+ if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
return false;
writer.incrementState();
- case 6:
+ case 8:
if (!writer.writeByteArray("partsBytes", partsBytes))
return false;
writer.incrementState();
- case 7:
+ case 9:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -219,7 +327,15 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
return false;
switch (reader.state()) {
- case 5:
+ case 6:
+ dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
partCntrsBytes = reader.readByteArray("partCntrsBytes");
if (!reader.isLastRead())
@@ -227,7 +343,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
- case 6:
+ case 8:
partsBytes = reader.readByteArray("partsBytes");
if (!reader.isLastRead())
@@ -235,7 +351,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
- case 7:
+ case 9:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -255,7 +371,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 8;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index e4356b1..bf08f0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -23,12 +23,16 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -45,6 +49,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
@GridDirectTransient
private Map<Integer, GridDhtPartitionMap2> parts;
+ /** */
+ @GridDirectMap(keyType = Integer.class, valueType = Integer.class)
+ private Map<Integer, Integer> dupPartsData;
+
/** Serialized partitions. */
private byte[] partsBytes;
@@ -59,6 +67,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** */
private boolean client;
+ /** */
+ @GridDirectTransient
+ private transient boolean compress;
+
/**
* Required by {@link Externalizable}.
*/
@@ -70,13 +82,16 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
* @param exchId Exchange ID.
* @param client Client message flag.
* @param lastVer Last version.
+ * @param compress {@code True} if it is possible to use compression for message.
*/
public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
boolean client,
- @Nullable GridCacheVersion lastVer) {
+ @Nullable GridCacheVersion lastVer,
+ boolean compress) {
super(exchId, lastVer);
this.client = client;
+ this.compress = compress;
}
/**
@@ -87,16 +102,26 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
}
/**
- * Adds partition map to this message.
- *
* @param cacheId Cache ID to add local partition for.
* @param locMap Local partition map.
+ * @param dupDataCache Optional ID of cache with the same partition state map.
*/
- public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap) {
+ public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap, @Nullable Integer dupDataCache) {
if (parts == null)
parts = new HashMap<>();
parts.put(cacheId, locMap);
+
+ if (dupDataCache != null) {
+ assert compress;
+ assert F.isEmpty(locMap.map());
+ assert parts.containsKey(dupDataCache);
+
+ if (dupPartsData == null)
+ dupPartsData = new HashMap<>();
+
+ dupPartsData.put(cacheId, dupDataCache);
+ }
}
/**
@@ -136,22 +161,77 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (partsBytes == null && parts != null)
- partsBytes = U.marshal(ctx, parts);
+ // Serialize lazily: only state that has no byte form yet is marshalled.
+ boolean marshal = (parts != null && partsBytes == null) || (partCntrs != null && partCntrsBytes == null);
+
+ if (marshal) {
+ // Bytes produced by an earlier call (or read from the wire) must survive this call.
+ boolean hasOldBytes = partsBytes != null || partCntrsBytes != null;
+
+ byte[] partsBytes0 = null;
+ byte[] partCntrsBytes0 = null;
+
+ if (parts != null && partsBytes == null)
+ partsBytes0 = U.marshal(ctx, parts);
- if (partCntrsBytes == null && partCntrs != null)
- partCntrsBytes = U.marshal(ctx, partCntrs);
+ if (partCntrs != null && partCntrsBytes == null)
+ partCntrsBytes0 = U.marshal(ctx, partCntrs);
+
+ if (hasOldBytes) {
+ // A single 'compressed' flag covers both arrays, so new bytes must use the encoding of the kept ones.
+ // No fallback is possible here: a zip failure is propagated to the caller.
+ if (compressed()) {
+ partsBytes0 = U.zip(partsBytes0);
+ partCntrsBytes0 = U.zip(partCntrsBytes0);
+ }
+ }
+ else if (compress) {
+ assert !compressed();
+
+ try {
+ byte[] partsBytesZip = U.zip(partsBytes0);
+ byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
+
+ // Switch to the zipped form only after both arrays were compressed successfully.
+ partsBytes0 = partsBytesZip;
+ partCntrsBytes0 = partCntrsBytesZip;
+
+ compressed(true);
+ }
+ catch (IgniteCheckedException e) {
+ // Compression is best effort: the uncompressed bytes are sent instead.
+ U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e);
+ }
+ }
+
+ // Assign only what was produced above; never overwrite existing bytes with null.
+ if (partsBytes0 != null)
+ partsBytes = partsBytes0;
+
+ if (partCntrsBytes0 != null)
+ partCntrsBytes = partCntrsBytes0;
+ }
}
/** {@inheritDoc} */
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (partsBytes != null && parts == null)
- parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (partsBytes != null && parts == null) {
+ if (compressed())
+ parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ else
+ parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
+
+ if (partCntrsBytes != null && partCntrs == null) {
+ if (compressed())
+ partCntrs = U.unmarshalZip(ctx.marshaller(), partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ else
+ partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
+
+ if (dupPartsData != null) {
+ assert parts != null;
+
+ for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) {
+ GridDhtPartitionMap2 map1 = parts.get(e.getKey());
- if (partCntrsBytes != null && partCntrs == null)
- partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ assert map1 != null : e.getKey();
+ assert F.isEmpty(map1.map());
+ assert !map1.hasMovingPartitions();
+
+ GridDhtPartitionMap2 map2 = parts.get(e.getValue());
+
+ assert map2 != null : e.getValue();
+ assert map2.map() != null;
+
+ for (Map.Entry<Integer, GridDhtPartitionState> e0 : map2.map().entrySet())
+ map1.put(e0.getKey(), e0.getValue());
+ }
+ }
}
/** {@inheritDoc} */
@@ -169,19 +249,25 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
}
switch (writer.state()) {
- case 5:
+ case 6:
if (!writer.writeBoolean("client", client))
return false;
writer.incrementState();
- case 6:
+ case 7:
+ if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
return false;
writer.incrementState();
- case 7:
+ case 9:
if (!writer.writeByteArray("partsBytes", partsBytes))
return false;
@@ -203,7 +289,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
return false;
switch (reader.state()) {
- case 5:
+ case 6:
client = reader.readBoolean("client");
if (!reader.isLastRead())
@@ -211,7 +297,15 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
- case 6:
+ case 7:
+ dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
partCntrsBytes = reader.readByteArray("partCntrsBytes");
if (!reader.isLastRead())
@@ -219,7 +313,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
- case 7:
+ case 9:
partsBytes = reader.readByteArray("partsBytes");
if (!reader.isLastRead())
@@ -239,7 +333,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 8;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
index a4106af..850b6d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
@@ -81,11 +81,11 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 5;
+ return 6;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtPartitionsSingleRequest.class, this, super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 09aec81..d6865c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -35,8 +35,8 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -600,7 +600,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer +
", node=" + node + ']');
- GridAffinityAssignment assignment = cctx.affinity().assignment(topVer);
+ AffinityAssignment assignment = cctx.affinity().assignment(topVer);
boolean newAffMode = node.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 3a559e7..9fd9b6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -239,7 +239,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
GridCacheContext cctx = interCache != null ? interCache.context() : null;
if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
- cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters());
+ cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters(false));
routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
}
@@ -1049,7 +1049,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
if (cache != null && !cache.isLocal() && cache.context().userCache())
- req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters());
+ req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters(false));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 7b011dd..e0f4a2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.util;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.Externalizable;
@@ -128,6 +130,8 @@ import java.util.logging.Logger;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
import javax.management.DynamicMBean;
import javax.management.JMException;
import javax.management.MBeanServer;
@@ -9693,6 +9697,32 @@ public abstract class IgniteUtils {
}
/**
+     * Unmarshals an object from zip-compressed bytes such as those produced by {@link #zip(byte[])}.
+     * The payload is read from the first entry of the archive.
+     *
+     * @param marsh Marshaller.
+     * @param zipBytes Zip-compressed bytes.
+     * @param clsLdr Class loader to use.
+     * @return Unmarshalled object.
+     * @throws IgniteCheckedException If the archive has no entry, cannot be read or unmarshalling fails.
+     */
+    public static <T> T unmarshalZip(Marshaller marsh, byte[] zipBytes, @Nullable ClassLoader clsLdr)
+        throws IgniteCheckedException {
+        assert marsh != null;
+        assert zipBytes != null;
+
+        // The zip stream holds an inflater: release it deterministically instead of waiting for GC.
+        try (ZipInputStream in = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
+            // Position the stream at the payload entry; without an entry there is nothing to read.
+            if (in.getNextEntry() == null)
+                throw new IgniteCheckedException("Failed to unmarshal zipped data, archive has no entries.");
+
+            return marsh.unmarshal(in, clsLdr);
+        }
+        catch (IgniteCheckedException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+ /**
* Unmarshals object from the input stream using given class loader.
* This method should not close given input stream.
* <p/>
@@ -9907,4 +9937,38 @@ public abstract class IgniteUtils {
if (oldName != curName)
LOC_IGNITE_NAME.set(oldName);
}
+
+    /**
+     * Compresses the given bytes into a zip archive that holds a single entry with an empty name.
+     *
+     * @param bytes Byte array to compress.
+     * @return Compressed bytes or {@code null} if {@code bytes} is {@code null}.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static byte[] zip(@Nullable byte[] bytes) throws IgniteCheckedException {
+        if (bytes == null)
+            return null;
+
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+        // Closing the zip stream finishes the open entry and completes the archive. There is no
+        // explicit closeEntry() in a finally block: after a failed write it could throw again and
+        // hide the original cause.
+        try (ZipOutputStream zos = new ZipOutputStream(bos)) {
+            ZipEntry entry = new ZipEntry("");
+
+            entry.setSize(bytes.length);
+
+            zos.putNextEntry(entry);
+
+            zos.write(bytes);
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        // Read the buffer only after the stream is closed, so the archive is complete.
+        return bos.toByteArray();
+    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index f929121..733d204 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1049,7 +1049,7 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
- TcpDiscoveryAbstractMessage msg = null;
+ TcpDiscoveryAbstractMessage msg;
while (!Thread.currentThread().isInterrupted()) {
Socket sock;
@@ -1063,8 +1063,7 @@ class ClientImpl extends TcpDiscoveryImpl {
continue;
}
- if (msg == null)
- msg = queue.poll();
+ msg = queue.poll();
if (msg == null) {
mux.wait();
@@ -1121,19 +1120,13 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
}
- catch (IOException e) {
+ catch (InterruptedException e) {
if (log.isDebugEnabled())
- U.error(log, "Failed to send node left message (will stop anyway) " +
- "[sock=" + sock + ", msg=" + msg + ']', e);
-
- U.closeQuiet(sock);
+ log.debug("Client socket writer interrupted.");
- synchronized (mux) {
- if (sock == this.sock)
- this.sock = null; // Connection has dead.
- }
+ return;
}
- catch (IgniteCheckedException e) {
+ catch (Exception e) {
if (spi.getSpiContext().isStopping()) {
if (log.isDebugEnabled())
log.debug("Failed to send message, node is stopping [msg=" + msg + ", err=" + e + ']');
@@ -1141,7 +1134,12 @@ class ClientImpl extends TcpDiscoveryImpl {
else
U.error(log, "Failed to send message: " + msg, e);
- msg = null;
+ U.closeQuiet(sock);
+
+ synchronized (mux) {
+ if (sock == this.sock)
+ this.sock = null; // Connection has dead.
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0de787d..8814745 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -133,7 +133,7 @@ import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SERVICES_COMPATIBILITY_MODE;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -167,7 +167,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
*/
class ServerImpl extends TcpDiscoveryImpl {
/** */
- private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024);
+ private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE, 512);
/** */
private static final IgniteProductVersion CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE =
@@ -1479,7 +1479,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private void prepareNodeAddedMessage(
TcpDiscoveryAbstractMessage msg,
UUID destNodeId,
- @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+ @Nullable Collection<PendingMessage> msgs,
@Nullable IgniteUuid discardMsgId,
@Nullable IgniteUuid discardCustomMsgId
) {
@@ -1506,7 +1506,19 @@ class ServerImpl extends TcpDiscoveryImpl {
}
nodeAddedMsg.topology(topToSnd);
- nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId);
+
+ Collection<TcpDiscoveryAbstractMessage> msgs0 = null;
+
+ if (msgs != null) {
+ msgs0 = new ArrayList<>(msgs.size());
+
+ for (PendingMessage pendingMsg : msgs) {
+ if (pendingMsg.msg != null)
+ msgs0.add(pendingMsg.msg);
+ }
+ }
+
+ nodeAddedMsg.messages(msgs0, discardMsgId, discardCustomMsgId);
Map<Long, Collection<ClusterNode>> hist;
@@ -1892,7 +1904,10 @@ class ServerImpl extends TcpDiscoveryImpl {
assert spi.ensured(msg) && msg.verified() : msg;
if (msg instanceof TcpDiscoveryNodeAddedMessage) {
- TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+ TcpDiscoveryNodeAddedMessage addedMsg =
+ new TcpDiscoveryNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
+
+ msg = addedMsg;
TcpDiscoveryNode node = addedMsg.node();
@@ -1910,12 +1925,109 @@ class ServerImpl extends TcpDiscoveryImpl {
addedMsg.clientTopology(top);
}
+
+ // Do not need this data for client reconnect.
+ addedMsg.oldNodesDiscoveryData(null);
+ }
+ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
+ TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg;
+
+ if (addFinishMsg.clientDiscoData() != null) {
+ addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg);
+
+ msg = addFinishMsg;
+
+ Map<UUID, Map<Integer, byte[]>> discoData = addFinishMsg.clientDiscoData();
+
+ Set<UUID> replaced = null;
+
+ for (TcpDiscoveryAbstractMessage msg0 : msgs) {
+ if (msg0 instanceof TcpDiscoveryNodeAddFinishedMessage) {
+ Map<UUID, Map<Integer, byte[]>> existingDiscoData =
+ ((TcpDiscoveryNodeAddFinishedMessage)msg0).clientDiscoData();
+
+ // Check if already stored message contains the same data to do not store copies multiple times.
+ if (existingDiscoData != null) {
+ for (Map.Entry<UUID, Map<Integer, byte[]>> e : discoData.entrySet()) {
+ UUID nodeId = e.getKey();
+
+ if (F.contains(replaced, nodeId))
+ continue;
+
+ Map<Integer, byte[]> existingData = existingDiscoData.get(e.getKey());
+
+ if (existingData != null && mapsEqual(e.getValue(), existingData)) {
+ e.setValue(existingData);
+
+ if (replaced == null)
+ replaced = new HashSet<>();
+
+ boolean add = replaced.add(nodeId);
+
+ assert add;
+
+ if (replaced.size() == discoData.size())
+ break;
+ }
+ }
+
+ if (replaced != null && replaced.size() == discoData.size())
+ break;
+ }
+ }
+ }
+ }
}
+ else if (msg instanceof TcpDiscoveryNodeLeftMessage)
+ clearClientAddFinished(msg.creatorNodeId());
+ else if (msg instanceof TcpDiscoveryNodeFailedMessage)
+ clearClientAddFinished(((TcpDiscoveryNodeFailedMessage)msg).failedNodeId());
msgs.add(msg);
}
/**
+     * Drops the client discovery data and client node attributes kept by the first stored
+     * add-finished message that still has discovery data and belongs to the given node.
+     *
+     * @param clientId Client node ID.
+     */
+    private void clearClientAddFinished(UUID clientId) {
+        for (TcpDiscoveryAbstractMessage histMsg : msgs) {
+            if (!(histMsg instanceof TcpDiscoveryNodeAddFinishedMessage))
+                continue;
+
+            TcpDiscoveryNodeAddFinishedMessage finishMsg = (TcpDiscoveryNodeAddFinishedMessage)histMsg;
+
+            // Skip messages that were already cleared or that belong to another node.
+            if (finishMsg.clientDiscoData() == null || !clientId.equals(finishMsg.nodeId()))
+                continue;
+
+            finishMsg.clientDiscoData(null);
+            finishMsg.clientNodeAttributes(null);
+
+            // Only the first matching message is processed.
+            return;
+        }
+    }
+
+    /**
+     * Compares two maps of byte arrays by content.
+     *
+     * @param m1 Map 1.
+     * @param m2 Map 2.
+     * @return {@code True} if both maps are the same instance, or have the same size and every
+     *      value of {@code m1} equals, byte by byte, the value stored under the same key in {@code m2}.
+     */
+    private boolean mapsEqual(Map<Integer, byte[]> m1, Map<Integer, byte[]> m2) {
+        if (m1 == m2)
+            return true;
+
+        if (m1.size() != m2.size())
+            return false;
+
+        for (Map.Entry<Integer, byte[]> entry : m1.entrySet()) {
+            byte[] other = m2.get(entry.getKey());
+
+            if (!Arrays.equals(entry.getValue(), other))
+                return false;
+        }
+
+        return true;
+    }
+
+ /**
* Gets messages starting from provided ID (exclusive). If such
* message is not found, {@code null} is returned (this indicates
* a failure condition when it was already removed from queue).
@@ -2009,6 +2121,37 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * Entry of the pending messages queue. The message ID and the custom-event flag are
+ * copied into separate fields, so they stay readable after {@link #msg} is set to {@code null}.
+ */
+ private static class PendingMessage {
+ /** Wrapped message. Not final: {@code PendingMessages.cleanup()} sets it to {@code null}. */
+ TcpDiscoveryAbstractMessage msg;
+
+ /** {@code True} if the wrapped message is a {@link TcpDiscoveryCustomEventMessage}. */
+ final boolean customMsg;
+
+ /** ID of the wrapped message, captured at construction time. */
+ final IgniteUuid id;
+
+ /**
+ * @param msg Message; must be non-null and have a non-null ID (checked by assertion).
+ */
+ PendingMessage(TcpDiscoveryAbstractMessage msg) {
+ assert msg != null && msg.id() != null : msg;
+
+ this.msg = msg;
+
+ id = msg.id();
+ customMsg = msg instanceof TcpDiscoveryCustomEventMessage;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PendingMessage.class, this);
+ }
+ }
+
+ /**
* Pending messages container.
*/
private static class PendingMessages implements Iterable<TcpDiscoveryAbstractMessage> {
@@ -2016,7 +2159,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private static final int MAX = 1024;
/** Pending messages. */
- private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+ private final Queue<PendingMessage> msgs = new ArrayDeque<>(MAX * 2);
/** Processed custom message IDs. */
private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<>(MAX * 2);
@@ -2024,7 +2167,7 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Discarded message ID. */
private IgniteUuid discardId;
- /** Discarded message ID. */
+ /** Discarded custom message ID. */
private IgniteUuid customDiscardId;
/**
@@ -2034,14 +2177,14 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message to add.
*/
void add(TcpDiscoveryAbstractMessage msg) {
- msgs.add(msg);
+ msgs.add(new PendingMessage(msg));
while (msgs.size() > MAX) {
- TcpDiscoveryAbstractMessage polled = msgs.poll();
+ PendingMessage polled = msgs.poll();
assert polled != null;
- if (polled.id().equals(discardId))
+ if (polled.id.equals(discardId))
break;
}
}
@@ -2051,6 +2194,7 @@ class ServerImpl extends TcpDiscoveryImpl {
*
* @param msgs Message.
* @param discardId Discarded message ID.
+ * @param customDiscardId Discarded custom event message ID.
*/
void reset(
@Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
@@ -2059,8 +2203,10 @@ class ServerImpl extends TcpDiscoveryImpl {
) {
this.msgs.clear();
- if (msgs != null)
- this.msgs.addAll(msgs);
+ if (msgs != null) {
+ for (TcpDiscoveryAbstractMessage msg : msgs)
+ this.msgs.add(new PendingMessage(msg));
+ }
this.discardId = discardId;
this.customDiscardId = customDiscardId;
@@ -2070,12 +2216,52 @@ class ServerImpl extends TcpDiscoveryImpl {
* Discards message with provided ID and all before it.
*
* @param id Discarded message ID.
+ * @param custom {@code True} if discard for {@link TcpDiscoveryCustomEventMessage}.
*/
void discard(IgniteUuid id, boolean custom) {
if (custom)
customDiscardId = id;
else
discardId = id;
+
+ cleanup();
+ }
+
+ /**
+ * Releases the message that matches the current discard ID. Walks the queue and, for the
+ * first entry whose ID equals {@code customDiscardId} (custom event entries) or
+ * {@code discardId} (all other entries), sets its {@code msg} reference to {@code null}
+ * and returns. The entry itself is not removed, and only one entry is cleared per call.
+ */
+ void cleanup() {
+ Iterator<PendingMessage> msgIt = msgs.iterator();
+
+ boolean skipMsg = discardId != null;
+ boolean skipCustomMsg = customDiscardId != null;
+
+ while (msgIt.hasNext()) {
+ PendingMessage msg = msgIt.next();
+
+ if (msg.customMsg) {
+ if (skipCustomMsg) {
+ assert customDiscardId != null;
+
+ if (F.eq(customDiscardId, msg.id)) {
+ // The entry stays queued so its ID can still be matched; only the message is dropped.
+ msg.msg = null;
+
+ return;
+ }
+ }
+ }
+ else {
+ if (skipMsg) {
+ assert discardId != null;
+
+ if (F.eq(discardId, msg.id)) {
+ // Same as above, for non-custom messages.
+ msg.msg = null;
+
+ return;
+ }
+ }
+ }
+ }
}
/**
@@ -2098,7 +2284,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private boolean skipCustomMsg = customDiscardId != null;
/** Internal iterator. */
- private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+ private Iterator<PendingMessage> msgIt = msgs.iterator();
/** Next message. */
private TcpDiscoveryAbstractMessage next;
@@ -2136,13 +2322,13 @@ class ServerImpl extends TcpDiscoveryImpl {
next = null;
while (msgIt.hasNext()) {
- TcpDiscoveryAbstractMessage msg0 = msgIt.next();
+ PendingMessage msg0 = msgIt.next();
- if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
+ if (msg0.customMsg) {
if (skipCustomMsg) {
assert customDiscardId != null;
- if (F.eq(customDiscardId, msg0.id()))
+ if (F.eq(customDiscardId, msg0.id))
skipCustomMsg = false;
continue;
@@ -2152,14 +2338,17 @@ class ServerImpl extends TcpDiscoveryImpl {
if (skipMsg) {
assert discardId != null;
- if (F.eq(discardId, msg0.id()))
+ if (F.eq(discardId, msg0.id))
skipMsg = false;
continue;
}
}
- next = msg0;
+ if (msg0.msg == null)
+ continue;
+
+ next = msg0.msg;
break;
}
@@ -2985,9 +3174,9 @@ class ServerImpl extends TcpDiscoveryImpl {
if (pendingMsgs.msgs.isEmpty())
return false;
- for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
- if (pendingMsg instanceof TcpDiscoveryNodeAddedMessage) {
- TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg;
+ for (PendingMessage pendingMsg : pendingMsgs.msgs) {
+ if (pendingMsg.msg instanceof TcpDiscoveryNodeAddedMessage) {
+ TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg.msg;
if (addMsg.node().id().equals(nodeId) && addMsg.id().compareTo(pendingMsgs.discardId) > 0)
return true;
@@ -3901,8 +4090,7 @@ class ServerImpl extends TcpDiscoveryImpl {
Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
if (data != null)
- spi.onExchange(node.id(), node.id(), data,
- U.resolveClassLoader(spi.ignite().configuration()));
+ spi.onExchange(node.id(), node.id(), data, U.resolveClassLoader(spi.ignite().configuration()));
msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id()));
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index 1b99a56..80f4565 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -59,6 +59,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
}
/**
+ * Copy constructor. The node ID, client discovery data and client node attributes
+ * are copied by reference from the given message (no deep copy is made here).
+ *
+ * @param msg Message.
+ */
+ public TcpDiscoveryNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
+ super(msg);
+
+ nodeId = msg.nodeId;
+ clientDiscoData = msg.clientDiscoData;
+ clientNodeAttrs = msg.clientNodeAttrs;
+ }
+
+ /**
* Gets ID of the node added.
*
* @return ID of the node added.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 6f8e14e..bd52c04 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -17,7 +17,9 @@
package org.apache.ignite.spi.discovery.tcp.messages;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
@@ -234,14 +236,41 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
}
/**
+ * Replaces the map of discovery data collected from old nodes. The given map
+ * is stored as is, without copying.
+ *
+ * @param oldNodesDiscoData Discovery data from old nodes.
+ */
+ public void oldNodesDiscoveryData(Map<UUID, Map<Integer, byte[]>> oldNodesDiscoData) {
+ this.oldNodesDiscoData = oldNodesDiscoData;
+ }
+
+ /**
+ * Adds discovery data of the given node, skipping entries that duplicate data already
+ * stored for another node: an entry is dropped when some existing node holds byte-equal
+ * data under the same key. Duplicates are removed from {@code discoData} in place, so the
+ * map passed by the caller is modified. Nothing is stored if no entries remain.
+ *
* @param nodeId Node ID.
* @param discoData Discovery data to add.
*/
public void addDiscoveryData(UUID nodeId, Map<Integer, byte[]> discoData) {
// Old nodes disco data may be null if message
// makes more than 1 pass due to stopping of the nodes in topology.
- if (oldNodesDiscoData != null)
- oldNodesDiscoData.put(nodeId, discoData);
+ if (oldNodesDiscoData != null) {
+ for (Map.Entry<UUID, Map<Integer, byte[]>> existingDataEntry : oldNodesDiscoData.entrySet()) {
+ Map<Integer, byte[]> existingData = existingDataEntry.getValue();
+
+ Iterator<Map.Entry<Integer, byte[]>> it = discoData.entrySet().iterator();
+
+ while (it.hasNext()) {
+ Map.Entry<Integer, byte[]> discoDataEntry = it.next();
+
+ byte[] curData = existingData.get(discoDataEntry.getKey());
+
+ // Same key and byte-equal content already stored for another node: drop the duplicate.
+ if (Arrays.equals(curData, discoDataEntry.getValue()))
+ it.remove();
+ }
+
+ // Everything was a duplicate, no need to look at the remaining nodes.
+ if (discoData.isEmpty())
+ break;
+ }
+
+ if (!discoData.isEmpty())
+ oldNodesDiscoData.put(nodeId, discoData);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
new file mode 100644
index 0000000..ed186ac
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final String AFF1_CACHE1 = "a1c1";
+
+ /** */
+ private static final String AFF1_CACHE2 = "a1c2";
+
+ /** */
+ private static final String AFF2_CACHE1 = "a2c1";
+
+ /** */
+ private static final String AFF2_CACHE2 = "a2c2";
+
+ /** */
+ private static final String AFF3_CACHE1 = "a3c1";
+
+ /** */
+ private static final String AFF4_FILTER_CACHE1 = "a4c1";
+
+ /** */
+ private static final String AFF4_FILTER_CACHE2 = "a4c2";
+
+ /** */
+ private static final String AFF5_FILTER_CACHE1 = "a5c1";
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setClientMode(client);
+
+ TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+ commSpi.record(GridDhtPartitionsSingleMessage.class, GridDhtPartitionsFullMessage.class);
+
+ cfg.setCommunicationSpi(commSpi);
+
+ List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+ {
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF1_CACHE1);
+ ccfg.setAffinity(new RendezvousAffinityFunction());
+ ccfgs.add(ccfg);
+ }
+ {
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF1_CACHE2);
+ ccfg.setAffinity(new RendezvousAffinityFunction());
+ ccfgs.add(ccfg);
+ }
+ {
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF2_CACHE1);
+ ccfg.setAffinity(new FairAffinityFunction());
+ ccfgs.add(ccfg);
+ }
+ {
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF2_CACHE2);
+ ccfg.setAffinity(new FairAffinityFunction());
+ ccfgs.add(ccfg);
+ }
+ {
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF3_CACHE1);
+ ccfg.setBackups(3);
+
+ RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 64);
+ ccfg.setAffinity(aff);
+
+ ccfgs.add(ccfg);
+ }
+ {
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF4_FILTER_CACHE1);
+ ccfg.setNodeFilter(new TestNodeFilter());
+ ccfg.setAffinity(new RendezvousAffinityFunction());
+ ccfgs.add(ccfg);
+ }
+ {
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF4_FILTER_CACHE2);
+ ccfg.setNodeFilter(new TestNodeFilter());
+ ccfg.setAffinity(new RendezvousAffinityFunction());
+ ccfgs.add(ccfg);
+ }
+ {
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(AFF5_FILTER_CACHE1);
+ ccfg.setNodeFilter(new TestNodeFilter());
+ ccfg.setAffinity(new FairAffinityFunction());
+ ccfgs.add(ccfg);
+ }
+
+ cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrid(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * Starts two more server nodes and one client node one by one, then stops node 0,
+ * checking the recorded exchange messages after every topology change. Node 0 is
+ * started in {@link #beforeTestsStarted()}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testExchangeMessages() throws Exception {
+ // Return value is not used.
+ ignite(0);
+
+ startGrid(1);
+
+ awaitPartitionMapExchange();
+
+ checkMessages(0, true);
+
+ startGrid(2);
+
+ awaitPartitionMapExchange();
+
+ checkMessages(0, true);
+
+ client = true;
+
+ startGrid(3);
+
+ awaitPartitionMapExchange();
+
+ // After the client node start only full messages are checked.
+ checkMessages(0, false);
+
+ stopGrid(0);
+
+ awaitPartitionMapExchange();
+
+ // Node 0 is stopped, messages are now checked against node 1.
+ checkMessages(1, true);
+ }
+
+ /**
+ * @param crdIdx Coordinator node index.
+ * @param checkSingle {@code True} if need check single messages.
+ */
+ private void checkMessages(int crdIdx, boolean checkSingle) {
+ checkFullMessages(crdIdx);
+
+ if (checkSingle)
+ checkSingleMessages(crdIdx);
+ }
+
+ /**
+ * @param crdIdx Coordinator node index.
+ */
+ private void checkFullMessages(int crdIdx) {
+ TestRecordingCommunicationSpi commSpi0 =
+ (TestRecordingCommunicationSpi)ignite(crdIdx).configuration().getCommunicationSpi();
+
+ List<Object> msgs = commSpi0.recordedMessages(false);
+
+ assertTrue(msgs.size() > 0);
+
+ for (Object msg : msgs) {
+ assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsFullMessage);
+
+ checkFullMessage((GridDhtPartitionsFullMessage)msg);
+ }
+ }
+
+ /**
+ * @param crdIdx Coordinator node index.
+ */
+ private void checkSingleMessages(int crdIdx) {
+ int cnt = 0;
+
+ for (Ignite ignite : Ignition.allGrids()) {
+ if (getTestGridName(crdIdx).equals(ignite.name()) || ignite.configuration().isClientMode())
+ continue;
+
+ TestRecordingCommunicationSpi commSpi0 =
+ (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+ List<Object> msgs = commSpi0.recordedMessages(false);
+
+ assertTrue(msgs.size() > 0);
+
+ for (Object msg : msgs) {
+ assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsSingleMessage);
+
+ checkSingleMessage((GridDhtPartitionsSingleMessage)msg);
+ }
+
+ cnt++;
+ }
+
+ assertTrue(cnt > 0);
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void checkFullMessage(GridDhtPartitionsFullMessage msg) {
+ Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, "dupPartsData");
+
+ assertNotNull(dupPartsData);
+
+ checkFullMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg);
+ checkFullMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg);
+ checkFullMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg);
+
+ assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1)));
+ assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1)));
+
+ Map<Integer, Map<Integer, Long>> partCntrs = GridTestUtils.getFieldValue(msg, "partCntrs");
+
+ if (partCntrs != null) {
+ for (Map<Integer, Long> cntrs : partCntrs.values())
+ assertTrue(cntrs.isEmpty());
+ }
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void checkSingleMessage(GridDhtPartitionsSingleMessage msg) {
+ Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, "dupPartsData");
+
+ assertNotNull(dupPartsData);
+
+ checkSingleMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg);
+ checkSingleMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg);
+ checkSingleMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg);
+
+ assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1)));
+ assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1)));
+
+ Map<Integer, Map<Integer, Long>> partCntrs = GridTestUtils.getFieldValue(msg, "partCntrs");
+
+ if (partCntrs != null) {
+ for (Map<Integer, Long> cntrs : partCntrs.values())
+ assertTrue(cntrs.isEmpty());
+ }
+ }
+
+ /**
+ * Checks that exactly one of the two caches is registered in {@code dupPartsData} as a
+ * duplicate of the other one, that the partition maps sent for the registered cache are
+ * empty and that the partition maps of the cache it refers to are not empty.
+ *
+ * @param cache1 Cache 1.
+ * @param cache2 Cache 2.
+ * @param dupPartsData Duplicated data map.
+ * @param msg Message.
+ */
+ private void checkFullMessage(String cache1,
+ String cache2,
+ Map<Integer, Integer> dupPartsData,
+ GridDhtPartitionsFullMessage msg)
+ {
+ Integer cacheId;
+ Integer dupCacheId;
+
+ // Either cache may be the key; the other one is then expected as the mapped value.
+ if (dupPartsData.containsKey(CU.cacheId(cache1))) {
+ cacheId = CU.cacheId(cache1);
+ dupCacheId = CU.cacheId(cache2);
+ }
+ else {
+ cacheId = CU.cacheId(cache2);
+ dupCacheId = CU.cacheId(cache1);
+ }
+
+ assertTrue(dupPartsData.containsKey(cacheId));
+ assertEquals(dupCacheId, dupPartsData.get(cacheId));
+ assertFalse(dupPartsData.containsKey(dupCacheId));
+
+ Map<Integer, GridDhtPartitionFullMap> parts = msg.partitions();
+
+ GridDhtPartitionFullMap emptyFullMap = parts.get(cacheId);
+
+ for (GridDhtPartitionMap2 map : emptyFullMap.values())
+ assertEquals(0, map.map().size());
+
+ GridDhtPartitionFullMap fullMap = parts.get(dupCacheId);
+
+ for (GridDhtPartitionMap2 map : fullMap.values())
+ assertFalse(map.map().isEmpty());
+ }
+
+ /**
+ * Checks that exactly one of the two caches is registered in {@code dupPartsData} as a
+ * duplicate of the other one, that the partition map sent for the registered cache is
+ * empty and that the partition map of the cache it refers to is not empty.
+ *
+ * @param cache1 Cache 1.
+ * @param cache2 Cache 2.
+ * @param dupPartsData Duplicated data map.
+ * @param msg Message.
+ */
+ private void checkSingleMessage(String cache1,
+ String cache2,
+ Map<Integer, Integer> dupPartsData,
+ GridDhtPartitionsSingleMessage msg)
+ {
+ Integer cacheId;
+ Integer dupCacheId;
+
+ // Either cache may be the key; the other one is then expected as the mapped value.
+ if (dupPartsData.containsKey(CU.cacheId(cache1))) {
+ cacheId = CU.cacheId(cache1);
+ dupCacheId = CU.cacheId(cache2);
+ }
+ else {
+ cacheId = CU.cacheId(cache2);
+ dupCacheId = CU.cacheId(cache1);
+ }
+
+ assertTrue(dupPartsData.containsKey(cacheId));
+ assertEquals(dupCacheId, dupPartsData.get(cacheId));
+ assertFalse(dupPartsData.containsKey(dupCacheId));
+
+ Map<Integer, GridDhtPartitionMap2> parts = msg.partitions();
+
+ GridDhtPartitionMap2 emptyMap = parts.get(cacheId);
+
+ assertEquals(0, emptyMap.map().size());
+
+ GridDhtPartitionMap2 map = parts.get(dupCacheId);
+
+ assertFalse(map.map().isEmpty());
+ }
+
+ /**
+ * Node filter that accepts only nodes whose order is greater than 1.
+ */
+ private static class TestNodeFilter implements IgnitePredicate<ClusterNode> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node) {
+ // Do not start cache on coordinator.
+ return node.order() > 1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
index 5dc059b..6c577c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
@@ -630,7 +630,7 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
if (cacheMode() == LOCAL)
return;
- awaitPartitionMapExchange(true, true);
+ awaitPartitionMapExchange(true, true, null);
checkEmpty();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
index 71d1182..3b0c2fa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -72,6 +73,8 @@ public class IgniteCacheGetRestartTest extends GridCommonAbstractTest {
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
Boolean clientMode = client.get();
if (clientMode != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
index 2c47a1c..7b57d5f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
@@ -115,21 +115,21 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
startGrid(2);
startGrid(3);
- awaitPartitionMapExchange(true, true);
+ awaitPartitionMapExchange(true, true, null);
for (int i = 0; i < 2; i++) {
stopGrid(3);
- awaitPartitionMapExchange(true, true);
+ awaitPartitionMapExchange(true, true, null);
startGrid(3);
- awaitPartitionMapExchange(true, true);
+ awaitPartitionMapExchange(true, true, null);
}
startGrid(4);
- awaitPartitionMapExchange(true, true);
+ awaitPartitionMapExchange(true, true, null);
assert rs.isEmpty();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 5716d59..de38952 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -240,7 +240,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
waitForRebalancing(0, new AffinityTopologyVersion(2, waitMinorVer));
waitForRebalancing(1, new AffinityTopologyVersion(2, waitMinorVer));
- awaitPartitionMapExchange(true, true);
+ awaitPartitionMapExchange(true, true, null);
checkPartitionMapExchangeFinished();
@@ -250,7 +250,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
waitForRebalancing(1, 3);
- awaitPartitionMapExchange(true, true);
+ awaitPartitionMapExchange(true, true, null);
checkPartitionMapExchangeFinished();
@@ -261,7 +261,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
waitForRebalancing(1, new AffinityTopologyVersion(4, waitMinorVer));
waitForRebalancing(2, new AffinityTopologyVersion(4, waitMinorVer));
- awaitPartitionMapExchange(true, true);
+ awaitPartitionMapExchange(true, true, null);
checkPartitionMapExchangeFinished();
@@ -271,7 +271,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
waitForRebalancing(1, 5);
- awaitPartitionMapExchange(true, true);
+ awaitPartitionMapExchange(true, true, null);
checkPartitionMapExchangeFinished();
@@ -339,7 +339,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
concurrentStartFinished = true;
- awaitPartitionMapExchange(true, true);
+ awaitPartitionMapExchange(true, true, null);
checkSupplyContextMapIsEmpty();
@@ -607,7 +607,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
waitForRebalancing(3, 5, 1);
waitForRebalancing(4, 5, 1);
- awaitPartitionMapExchange(true, true);
+ awaitPartitionMapExchange(true, true, null);
checkSupplyContextMapIsEmpty();
@@ -631,7 +631,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
waitForRebalancing(3, 6);
waitForRebalancing(4, 6);
- awaitPartitionMapExchange(true, true);
+ awaitPartitionMapExchange(true, true, null);
checkSupplyContextMapIsEmpty();
@@ -641,7 +641,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
waitForRebalancing(3, 7);
waitForRebalancing(4, 7);
- awaitPartitionMapExchange(true, true);
+ awaitPartitionMapExchange(true, true, null);
checkSupplyContextMapIsEmpty();
@@ -650,7 +650,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
waitForRebalancing(3, 8);
waitForRebalancing(4, 8);
- awaitPartitionMapExchange(true, true);
+ awaitPartitionMapExchange(true, true, null);
checkPartitionMapExchangeFinished();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
index 87d02a5..cde6b8d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
@@ -41,9 +41,6 @@ public class GridCacheSyncReplicatedPreloadSelfTest extends GridCommonAbstractTe
/** */
private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
- /** */
- private static final boolean DISCO_DEBUG_MODE = false;
-
/**
* Constructs test.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
index 9b0637e..f3942d5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
@@ -34,7 +34,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
public class IgniteCacheSyncRebalanceModeSelfTest extends GridCommonAbstractTest {
/** Entry count. */
public static final int CNT = 100_000;
- public static final String STATIC_CACHE_NAME = "static";
+
+ /** */
+ private static final String STATIC_CACHE_NAME = "static";
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 1b7fe2b..d2cb710 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -537,7 +537,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
Affinity<Object> aff = grid(i).affinity(null);
- Map<Integer, Long> act = grid(i).cachex(null).context().topology().updateCounters();
+ Map<Integer, Long> act = grid(i).cachex(null).context().topology().updateCounters(false);
for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) {
if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode()))
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
index 5ecc27a..1259faf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
@@ -68,6 +68,13 @@ public class IgniteNoCustomEventsOnNodeStart extends GridCommonAbstractTest {
assertFalse(failed);
}
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
/**
*
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 1ce98a5..043208c 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
@@ -114,6 +115,12 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
/** */
private GridStringLogger strLog;
+ /** Cache configurations applied to started nodes; when {@code null} no caches are configured. */
+ private CacheConfiguration[] ccfgs;
+
+ /** Client mode flag applied to the configuration of started nodes. */
+ private boolean client;
+
/**
* @throws Exception If fails.
*/
@@ -152,7 +159,10 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
cfg.setDiscoverySpi(spi);
- cfg.setCacheConfiguration();
+ if (ccfgs != null)
+ cfg.setCacheConfiguration(ccfgs);
+ else
+ cfg.setCacheConfiguration();
cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
@@ -194,9 +204,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
else if (gridName.contains("testPingInterruptedOnNodeFailedPingingNode"))
cfg.setFailureDetectionTimeout(30_000);
- else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureNormalNode")) {
+ else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureNormalNode"))
cfg.setFailureDetectionTimeout(3_000);
- }
else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureSegmentedNode")) {
cfg.setFailureDetectionTimeout(6_000);
@@ -205,6 +214,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
else if (gridName.contains("testNodeShutdownOnRingMessageWorkerFailureFailedNode"))
cfg.setGridLogger(strLog = new GridStringLogger());
+ cfg.setClientMode(client);
+
return cfg;
}
@@ -1961,6 +1972,63 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testDuplicatedDiscoveryDataRemoved() throws Exception {
+ try {
+ TestDiscoveryDataDuplicateSpi.checkNodeAdded = false;
+ TestDiscoveryDataDuplicateSpi.checkClientNodeAddFinished = false;
+ TestDiscoveryDataDuplicateSpi.fail = false;
+
+ ccfgs = new CacheConfiguration[5];
+
+ for (int i = 0; i < ccfgs.length; i++) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(i == 0 ? null : ("static-cache-" + i));
+
+ ccfgs[i] = ccfg;
+ }
+
+ TestDiscoveryDataDuplicateSpi spi = new TestDiscoveryDataDuplicateSpi();
+
+ nodeSpi.set(spi);
+
+ startGrid(0);
+
+ for (int i = 0; i < 5; i++) {
+ nodeSpi.set(new TestDiscoveryDataDuplicateSpi());
+
+ startGrid(i + 1);
+ }
+
+ client = true;
+
+ Ignite clientNode = startGrid(6);
+
+ assertTrue(clientNode.configuration().isClientMode());
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName("c1");
+
+ clientNode.createCache(ccfg);
+
+ client = false;
+
+ nodeSpi.set(new TestDiscoveryDataDuplicateSpi());
+
+ startGrid(7);
+
+ assertTrue(TestDiscoveryDataDuplicateSpi.checkNodeAdded);
+ assertTrue(TestDiscoveryDataDuplicateSpi.checkClientNodeAddFinished);
+ assertFalse(TestDiscoveryDataDuplicateSpi.fail);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
* @param nodeName Node name.
* @throws Exception If failed.
*/
@@ -2015,6 +2083,66 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
}
+ /**
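+ * Test SPI that checks node-added and node-add-finished messages carry cache discovery data for at most one node.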
+ */
+ private static class TestDiscoveryDataDuplicateSpi extends TcpDiscoverySpi {
+ /** Set to {@code true} when a checked message holds cache discovery data for more than one node. */
+ static volatile boolean fail;
+
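+ /** Set to {@code true} once a node-added message with more than one discovery data entry has been checked. */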
+ static volatile boolean checkNodeAdded;
+
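+ /** Set to {@code true} once a node-add-finished message with more than one discovery data entry has been checked. */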
+ static volatile boolean checkClientNodeAddFinished;
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, OutputStream out,
+ TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+ Map<UUID, Map<Integer, byte[]>> discoData = ((TcpDiscoveryNodeAddedMessage)msg).oldNodesDiscoveryData();
+
+ checkDiscoData(discoData, msg);
+ }
+ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
+ Map<UUID, Map<Integer, byte[]>> discoData = ((TcpDiscoveryNodeAddFinishedMessage)msg).clientDiscoData();
+
+ checkDiscoData(discoData, msg);
+ }
+
+ super.writeToSocket(sock, out, msg, timeout);
+ }
+
+ /**
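+ * @param discoData Discovery data to check; ignored if {@code null} or if it has fewer than two entries.
+ * @param msg Message the discovery data was taken from.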
+ */
+ private void checkDiscoData(Map<UUID, Map<Integer, byte[]>> discoData, TcpDiscoveryAbstractMessage msg) {
+ if (discoData != null && discoData.size() > 1) {
+ int cnt = 0;
+
+ for (Map.Entry<UUID, Map<Integer, byte[]>> e : discoData.entrySet()) {
+ Map<Integer, byte[]> map = e.getValue();
+
+ if (map.containsKey(GridComponent.DiscoveryDataExchangeType.CACHE_PROC.ordinal()))
+ cnt++;
+ }
+
+ if (cnt > 1) {
+ fail = true;
+
+ log.error("Expect cache data only from one node, but actually: " + cnt);
+ }
+
+ if (msg instanceof TcpDiscoveryNodeAddedMessage)
+ checkNodeAdded = true;
+ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+ checkClientNodeAddFinished = true;
+ }
+ }
+ }
+
/**
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 87509a4..22fa36d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -428,21 +428,42 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
*/
@SuppressWarnings("BusyWait")
protected void awaitPartitionMapExchange() throws InterruptedException {
- awaitPartitionMapExchange(false, false);
+ awaitPartitionMapExchange(false, false, null);
}
/**
* @param waitEvicts If {@code true} will wait for evictions finished.
* @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update finished.
+ * @param nodes Optional nodes to check; if {@code null}, all grids returned by {@code G.allGrids()} are checked.
* @throws InterruptedException If interrupted.
*/
@SuppressWarnings("BusyWait")
- protected void awaitPartitionMapExchange(boolean waitEvicts, boolean waitNode2PartUpdate) throws InterruptedException {
+ protected void awaitPartitionMapExchange(boolean waitEvicts,
+ boolean waitNode2PartUpdate,
+ @Nullable Collection<ClusterNode> nodes)
+ throws InterruptedException {
long timeout = 30_000;
+ long startTime = -1;
+
+ Set<String> names = new HashSet<>();
+
for (Ignite g : G.allGrids()) {
+ if (nodes != null && !nodes.contains(g.cluster().localNode()))
+ continue;
+
IgniteKernal g0 = (IgniteKernal)g;
+ names.add(g0.configuration().getGridName());
+
+ if (startTime != -1) {
+ if (startTime != g0.context().discovery().gridStartTime())
+ fail("Found nodes from different clusters, probable some test does not stop nodes " +
+ "[allNodes=" + names + ']');
+ }
+ else
+ startTime = g0.context().discovery().gridStartTime();
+
for (IgniteCacheProxy<?, ?> c : g0.context().cache().jcaches()) {
CacheConfiguration cfg = c.context().config();