You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/11 10:15:54 UTC
[1/2] ignite git commit: IGNITE-4941: Removed old GridDhtPartitionSupplyMessage.
Repository: ignite
Updated Branches:
refs/heads/ignite-3477-master 8122099f0 -> 3eb05de5e
IGNITE-4941: Removed old GridDhtPartitionSupplyMessage.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aeacad6b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aeacad6b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aeacad6b
Branch: refs/heads/ignite-3477-master
Commit: aeacad6b87ac95dd2f5da525573d6fa58f4e51db
Parents: edfa353
Author: devozerov <vo...@gridgain.com>
Authored: Tue Apr 11 12:18:52 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Apr 11 12:18:52 2017 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 8 +-
.../GridCachePartitionExchangeManager.java | 7 +-
.../processors/cache/GridCachePreloader.java | 4 +-
.../cache/GridCachePreloaderAdapter.java | 4 +-
.../dht/preloader/GridDhtPartitionDemander.java | 2 +-
.../dht/preloader/GridDhtPartitionSupplier.java | 310 +--------------
.../GridDhtPartitionSupplyMessage.java | 99 ++---
.../GridDhtPartitionSupplyMessageV2.java | 384 -------------------
.../dht/preloader/GridDhtPreloader.java | 3 +-
.../resources/META-INF/classnames.properties | 1 -
.../CacheLateAffinityAssignmentTest.java | 6 +-
.../IgniteCacheReadFromBackupTest.java | 6 +-
.../atomic/IgniteCacheAtomicProtocolTest.java | 3 +-
13 files changed, 60 insertions(+), 777 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 737d047..8aac56d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -86,7 +86,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
@@ -498,11 +497,6 @@ public class GridIoMessageFactory implements MessageFactory {
break;
- case 45:
- msg = new GridDhtPartitionSupplyMessage();
-
- break;
-
case 46:
msg = new GridDhtPartitionsFullMessage();
@@ -824,7 +818,7 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case 114:
- msg = new GridDhtPartitionSupplyMessageV2();
+ msg = new GridDhtPartitionSupplyMessage();
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 231dff8..885106d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -67,8 +67,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@ -394,9 +393,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridCacheContext cacheCtx = cctx.cacheContext(m.cacheId);
if (cacheCtx != null) {
- if (m instanceof GridDhtPartitionSupplyMessageV2)
+ if (m instanceof GridDhtPartitionSupplyMessage)
cacheCtx.preloader().handleSupplyMessage(
- idx, id, (GridDhtPartitionSupplyMessageV2)m);
+ idx, id, (GridDhtPartitionSupplyMessage)m);
else if (m instanceof GridDhtPartitionDemandMessage)
cacheCtx.preloader().handleDemandMessage(
idx, id, (GridDhtPartitionDemandMessage)m);
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 0c28691..df0d71d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -25,7 +25,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -168,7 +168,7 @@ public interface GridCachePreloader {
* @param id Node Id.
* @param s Supply message.
*/
- public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s);
+ public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessage s);
/**
* Handles Demand message.
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index d7ec288..ac3b6cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -134,7 +134,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
- @Override public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessageV2 s) {
+ @Override public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessage s) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index d5f2246..f8114cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -541,7 +541,7 @@ public class GridDhtPartitionDemander {
public void handleSupplyMessage(
int idx,
final UUID id,
- final GridDhtPartitionSupplyMessageV2 supply
+ final GridDhtPartitionSupplyMessage supply
) {
AffinityTopologyVersion topVer = supply.topologyVersion();
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 9942423..7c2599a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -87,13 +86,6 @@ class GridDhtPartitionSupplier {
/**
*
*/
- void start() {
- startOldListeners();
- }
-
- /**
- *
- */
void stop() {
synchronized (scMap) {
Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator();
@@ -106,8 +98,6 @@ class GridDhtPartitionSupplier {
it.remove();
}
}
-
- stopOldListeners();
}
/**
@@ -146,6 +136,7 @@ class GridDhtPartitionSupplier {
*
* @param topVer Topology version.
*/
+ @SuppressWarnings("ConstantConditions")
public void onTopologyChanged(AffinityTopologyVersion topVer) {
synchronized (scMap) {
Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator();
@@ -179,6 +170,7 @@ class GridDhtPartitionSupplier {
* @param idx Index.
* @param id Node uuid.
*/
+ @SuppressWarnings("unchecked")
public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
assert d != null;
assert id != null;
@@ -208,7 +200,7 @@ class GridDhtPartitionSupplier {
log.debug("Demand request accepted [current=" + cutTop + ", demanded=" + demTop +
", from=" + id + ", idx=" + idx + "]");
- GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(
+ GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(
d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
ClusterNode node = cctx.discovery().node(id);
@@ -338,7 +330,7 @@ class GridDhtPartitionSupplier {
if (!reply(node, d, s, scId))
return;
- s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+ s = new GridDhtPartitionSupplyMessage(d.updateSequence(),
cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
}
}
@@ -424,7 +416,7 @@ class GridDhtPartitionSupplier {
if (!reply(node, d, s, scId))
return;
- s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+ s = new GridDhtPartitionSupplyMessage(d.updateSequence(),
cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
}
}
@@ -545,7 +537,7 @@ class GridDhtPartitionSupplier {
if (!reply(node, d, s, scId))
return;
- s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+ s = new GridDhtPartitionSupplyMessage(d.updateSequence(),
cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
}
}
@@ -605,7 +597,7 @@ class GridDhtPartitionSupplier {
*/
private boolean reply(ClusterNode n,
GridDhtPartitionDemandMessage d,
- GridDhtPartitionSupplyMessageV2 s,
+ GridDhtPartitionSupplyMessage s,
T3<UUID, Integer, AffinityTopologyVersion> scId)
throws IgniteCheckedException {
@@ -744,294 +736,6 @@ class GridDhtPartitionSupplier {
}
}
- @Deprecated//Backward compatibility. To be removed in future.
- public void startOldListeners() {
- if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) {
- cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
- @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
- processOldDemandMessage(m, id);
- }
- });
- }
- }
-
- @Deprecated//Backward compatibility. To be removed in future.
- public void stopOldListeners() {
- if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled())
- cctx.io().removeHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class);
- }
-
- /**
- * @param d D.
- * @param id Id.
- */
- @Deprecated//Backward compatibility. To be removed in future.
- private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) {
- GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled());
-
- ClusterNode node = cctx.node(id);
-
- if (node == null)
- return;
-
- long preloadThrottle = cctx.config().getRebalanceThrottle();
-
- boolean ack = false;
-
- try {
- for (int part : d.partitions()) {
- GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
-
- if (loc == null || loc.state() != OWNING || !loc.reserve()) {
- // Reply with partition of "-1" to let sender know that
- // this node is no longer an owner.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Requested partition is not owned by local node [part=" + part +
- ", demander=" + id + ']');
-
- continue;
- }
-
- GridCacheEntryInfoCollectSwapListener swapLsnr = null;
-
- try {
- if (cctx.isSwapOrOffheapEnabled()) {
- swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
-
- cctx.swap().addOffHeapListener(part, swapLsnr);
- cctx.swap().addSwapListener(part, swapLsnr);
- }
-
- boolean partMissing = false;
-
- for (GridCacheEntryEx e : loc.allEntries()) {
- if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition, so we send '-1' partition and move on.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition [part=" + part +
- ", nodeId=" + id + ']');
-
- partMissing = true;
-
- break;
- }
-
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
-
- if (!replyOld(node, d, s))
- return;
-
- // Throttle preloading.
- if (preloadThrottle > 0)
- U.sleep(preloadThrottle);
-
- s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
- cctx.cacheId(), cctx.deploymentEnabled());
- }
-
- GridCacheEntryInfo info = e.info();
-
- if (info != null && !info.isNew()) {
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry(part, info, cctx);
- else if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
- info);
- }
- }
-
- if (partMissing)
- continue;
-
- if (cctx.isSwapOrOffheapEnabled()) {
- GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
- cctx.swap().iterator(part);
-
- // Iterator may be null if space does not exist.
- if (iter != null) {
- try {
- boolean prepared = false;
-
- for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
- if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition,
- // so we send '-1' partition and move on.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition " +
- "[part=" + part + ", nodeId=" + id + ']');
-
- partMissing = true;
-
- break; // For.
- }
-
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
-
- if (!replyOld(node, d, s))
- return;
-
- // Throttle preloading.
- if (preloadThrottle > 0)
- U.sleep(preloadThrottle);
-
- s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled());
- }
-
- GridCacheSwapEntry swapEntry = e.getValue();
-
- GridCacheEntryInfo info = new GridCacheEntryInfo();
-
- info.keyBytes(e.getKey());
- info.ttl(swapEntry.ttl());
- info.expireTime(swapEntry.expireTime());
- info.version(swapEntry.version());
- info.value(swapEntry.value());
-
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry0(part, info, cctx);
- else {
- if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not send " +
- "cache entry): " + info);
-
- continue;
- }
-
- // Need to manually prepare cache message.
- if (depEnabled && !prepared) {
- ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
- cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
- swapEntry.valueClassLoaderId() != null ?
- cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
- null;
-
- if (ldr == null)
- continue;
-
- if (ldr instanceof GridDeploymentInfo) {
- s.prepare((GridDeploymentInfo)ldr);
-
- prepared = true;
- }
- }
- }
-
- if (partMissing)
- continue;
- }
- finally {
- iter.close();
- }
- }
- }
-
- // Stop receiving promote notifications.
- if (swapLsnr != null) {
- cctx.swap().removeOffHeapListener(part, swapLsnr);
- cctx.swap().removeSwapListener(part, swapLsnr);
- }
-
- if (swapLsnr != null) {
- Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
-
- swapLsnr = null;
-
- for (GridCacheEntryInfo info : entries) {
- if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition,
- // so we send '-1' partition and move on.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition " +
- "[part=" + part + ", nodeId=" + id + ']');
-
- // No need to continue iteration over swap entries.
- break;
- }
-
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
-
- if (!replyOld(node, d, s))
- return;
-
- s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(),
- cctx.cacheId(),
- cctx.deploymentEnabled());
- }
-
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry(part, info, cctx);
- else if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
- info);
- }
- }
-
- // Mark as last supply message.
- s.last(part);
-
- if (ack) {
- s.markAck();
-
- break; // Partition for loop.
- }
- }
- finally {
- loc.release();
-
- if (swapLsnr != null) {
- cctx.swap().removeOffHeapListener(part, swapLsnr);
- cctx.swap().removeSwapListener(part, swapLsnr);
- }
- }
- }
-
- replyOld(node, d, s);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
- }
- }
-
- /**
- * @param n Node.
- * @param d Demand message.
- * @param s Supply message.
- * @return {@code True} if message was sent, {@code false} if recipient left grid.
- * @throws IgniteCheckedException If failed.
- */
- @Deprecated//Backward compatibility. To be removed in future.
- private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
- throws IgniteCheckedException {
- try {
- if (log.isDebugEnabled())
- log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
-
- cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
-
- return true;
- }
- catch (ClusterTopologyCheckedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Failed to send partition supply message because node left grid: " + n.id());
-
- return false;
- }
- }
-
/**
* Dumps debug information.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index cc30321..a01be28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
@@ -48,14 +49,11 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
/** */
private static final long serialVersionUID = 0L;
- /** Worker ID. */
- private int workerId = -1;
-
/** Update sequence. */
private long updateSeq;
- /** Acknowledgement flag. */
- private boolean ack;
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
/** Partitions that have been fully sent. */
@GridDirectCollection(int.class)
@@ -68,27 +66,26 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
/** Entries. */
@GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
- private Map<Integer, CacheEntryInfoCollection> infos = new HashMap<>();
+ private Map<Integer, CacheEntryInfoCollection> infos;
/** Message size. */
@GridDirectTransient
private int msgSize;
/**
- * @param workerId Worker ID.
* @param updateSeq Update sequence for this node.
* @param cacheId Cache ID.
+ * @param topVer Topology version.
* @param addDepInfo Deployment info flag.
*/
- GridDhtPartitionSupplyMessage(int workerId, long updateSeq, int cacheId, boolean addDepInfo) {
- assert workerId >= 0;
- assert updateSeq > 0;
-
+ GridDhtPartitionSupplyMessage(long updateSeq,
+ int cacheId,
+ AffinityTopologyVersion topVer,
+ boolean addDepInfo) {
this.cacheId = cacheId;
this.updateSeq = updateSeq;
- this.workerId = workerId;
- this.addDepInfo = addDepInfo;
- }
+ this.topVer = topVer;
+ this.addDepInfo = addDepInfo; }
/**
* Empty constructor required for {@link Externalizable}.
@@ -103,13 +100,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
}
/**
- * @return Worker ID.
- */
- int workerId() {
- return workerId;
- }
-
- /**
* @return Update sequence.
*/
long updateSequence() {
@@ -117,17 +107,10 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
}
/**
- * Marks this message for acknowledgment.
+ * @return Topology version for which demand message is sent.
*/
- void markAck() {
- ack = true;
- }
-
- /**
- * @return Acknowledgement flag.
- */
- boolean ack() {
- return ack;
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
}
/**
@@ -148,12 +131,12 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
msgSize += 4;
// If partition is empty, we need to add it.
- if (!infos.containsKey(p)) {
+ if (!infos().containsKey(p)) {
CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection();
infoCol.init();
- infos.put(p, infoCol);
+ infos().put(p, infoCol);
}
}
}
@@ -180,6 +163,9 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
* @return Entries.
*/
Map<Integer, CacheEntryInfoCollection> infos() {
+ if (infos == null)
+ infos = new HashMap<>();
+
return infos;
}
@@ -203,12 +189,12 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
msgSize += info.marshalledSize(ctx);
- CacheEntryInfoCollection infoCol = infos.get(p);
+ CacheEntryInfoCollection infoCol = infos().get(p);
if (infoCol == null) {
msgSize += 4;
- infos.put(p, infoCol = new CacheEntryInfoCollection());
+ infos().put(p, infoCol = new CacheEntryInfoCollection());
infoCol.init();
}
@@ -232,12 +218,12 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
msgSize += info.marshalledSize(ctx);
- CacheEntryInfoCollection infoCol = infos.get(p);
+ CacheEntryInfoCollection infoCol = infos().get(p);
if (infoCol == null) {
msgSize += 4;
- infos.put(p, infoCol = new CacheEntryInfoCollection());
+ infos().put(p, infoCol = new CacheEntryInfoCollection());
infoCol.init();
}
@@ -253,7 +239,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
for (CacheEntryInfoCollection col : infos().values()) {
- List<GridCacheEntryInfo> entries = col.infos();
+ List<GridCacheEntryInfo> entries = col.infos();
for (int i = 0; i < entries.size(); i++)
entries.get(i).unmarshal(cacheCtx, ldr);
@@ -269,7 +255,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
* @return Number of entries in message.
*/
public int size() {
- return infos.size();
+ return infos().size();
}
/** {@inheritDoc} */
@@ -288,25 +274,25 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
switch (writer.state()) {
case 3:
- if (!writer.writeBoolean("ack", ack))
+ if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 4:
- if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
+ if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
return false;
writer.incrementState();
case 5:
- if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
+ if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
return false;
writer.incrementState();
case 6:
- if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
+ if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
@@ -317,12 +303,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
writer.incrementState();
- case 8:
- if (!writer.writeInt("workerId", workerId))
- return false;
-
- writer.incrementState();
-
}
return true;
@@ -340,7 +320,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
switch (reader.state()) {
case 3:
- ack = reader.readBoolean("ack");
+ infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
if (!reader.isLastRead())
return false;
@@ -348,7 +328,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
reader.incrementState();
case 4:
- infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
+ last = reader.readCollection("last", MessageCollectionItemType.INT);
if (!reader.isLastRead())
return false;
@@ -356,7 +336,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
reader.incrementState();
case 5:
- last = reader.readCollection("last", MessageCollectionItemType.INT);
+ missed = reader.readCollection("missed", MessageCollectionItemType.INT);
if (!reader.isLastRead())
return false;
@@ -364,7 +344,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
reader.incrementState();
case 6:
- missed = reader.readCollection("missed", MessageCollectionItemType.INT);
+ topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
return false;
@@ -379,14 +359,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
reader.incrementState();
- case 8:
- workerId = reader.readInt("workerId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
}
return reader.afterMessageRead(GridDhtPartitionSupplyMessage.class);
@@ -394,18 +366,19 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
/** {@inheritDoc} */
@Override public short directType() {
- return 45;
+ return 114;
}
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 9;
+ return 8;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtPartitionSupplyMessage.class, this,
"size", size(),
+ "parts", infos().keySet(),
"super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
deleted file mode 100644
index 2294582..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.io.Externalizable;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectMap;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Partition supply message.
- */
-public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements GridCacheDeployable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Update sequence. */
- private long updateSeq;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
- /** Partitions that have been fully sent. */
- @GridDirectCollection(int.class)
- private Collection<Integer> last;
-
- /** Partitions which were not found. */
- @GridToStringInclude
- @GridDirectCollection(int.class)
- private Collection<Integer> missed;
-
- /** Entries. */
- @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
- private Map<Integer, CacheEntryInfoCollection> infos;
-
- /** Message size. */
- @GridDirectTransient
- private int msgSize;
-
- /**
- * @param updateSeq Update sequence for this node.
- * @param cacheId Cache ID.
- * @param topVer Topology version.
- * @param addDepInfo Deployment info flag.
- */
- GridDhtPartitionSupplyMessageV2(long updateSeq,
- int cacheId,
- AffinityTopologyVersion topVer,
- boolean addDepInfo) {
- this.cacheId = cacheId;
- this.updateSeq = updateSeq;
- this.topVer = topVer;
- this.addDepInfo = addDepInfo; }
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public GridDhtPartitionSupplyMessageV2() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean ignoreClassErrors() {
- return true;
- }
-
- /**
- * @return Update sequence.
- */
- long updateSequence() {
- return updateSeq;
- }
-
- /**
- * @return Topology version for which demand message is sent.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
- * @return Flag to indicate last message for partition.
- */
- Collection<Integer> last() {
- return last == null ? Collections.<Integer>emptySet() : last;
- }
-
- /**
- * @param p Partition which was fully sent.
- */
- void last(int p) {
- if (last == null)
- last = new HashSet<>();
-
- if (last.add(p)) {
- msgSize += 4;
-
- // If partition is empty, we need to add it.
- if (!infos().containsKey(p)) {
- CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection();
-
- infoCol.init();
-
- infos().put(p, infoCol);
- }
- }
- }
-
- /**
- * @param p Missed partition.
- */
- void missed(int p) {
- if (missed == null)
- missed = new HashSet<>();
-
- if (missed.add(p))
- msgSize += 4;
- }
-
- /**
- * @return Missed partitions.
- */
- Collection<Integer> missed() {
- return missed == null ? Collections.<Integer>emptySet() : missed;
- }
-
- /**
- * @return Entries.
- */
- Map<Integer, CacheEntryInfoCollection> infos() {
- if (infos == null)
- infos = new HashMap<>();
-
- return infos;
- }
-
- /**
- * @return Message size.
- */
- int messageSize() {
- return msgSize;
- }
-
- /**
- * @param p Partition.
- * @param info Entry to add.
- * @param ctx Cache context.
- * @throws IgniteCheckedException If failed.
- */
- void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
- assert info != null;
-
- marshalInfo(info, ctx);
-
- msgSize += info.marshalledSize(ctx);
-
- CacheEntryInfoCollection infoCol = infos().get(p);
-
- if (infoCol == null) {
- msgSize += 4;
-
- infos().put(p, infoCol = new CacheEntryInfoCollection());
-
- infoCol.init();
- }
-
- infoCol.add(info);
- }
-
- /**
- * @param p Partition.
- * @param info Entry to add.
- * @param ctx Cache context.
- * @throws IgniteCheckedException If failed.
- */
- void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
- assert info != null;
- assert (info.key() != null || info.keyBytes() != null);
- assert info.value() != null;
-
- // Need to call this method to initialize info properly.
- marshalInfo(info, ctx);
-
- msgSize += info.marshalledSize(ctx);
-
- CacheEntryInfoCollection infoCol = infos().get(p);
-
- if (infoCol == null) {
- msgSize += 4;
-
- infos().put(p, infoCol = new CacheEntryInfoCollection());
-
- infoCol.init();
- }
-
- infoCol.add(info);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
-
- for (CacheEntryInfoCollection col : infos().values()) {
- List<GridCacheEntryInfo> entries = col.infos();
-
- for (int i = 0; i < entries.size(); i++)
- entries.get(i).unmarshal(cacheCtx, ldr);
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
-
- /**
- * @return Number of entries in message.
- */
- public int size() {
- return infos().size();
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 3:
- if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeMessage("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeLong("updateSeq", updateSeq))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 3:
- infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- last = reader.readCollection("last", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- missed = reader.readCollection("missed", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- topVer = reader.readMessage("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- updateSeq = reader.readLong("updateSeq");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridDhtPartitionSupplyMessageV2.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 114;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 8;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtPartitionSupplyMessageV2.class, this,
- "size", size(),
- "parts", infos().keySet(),
- "super", super.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index dc988bd..a5dcd8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -187,7 +187,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
supplier = new GridDhtPartitionSupplier(cctx);
demander = new GridDhtPartitionDemander(cctx);
- supplier.start();
demander.start();
cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
@@ -380,7 +379,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) {
+ public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessage s) {
if (!enterBusy())
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 9cce826..8c5a72e 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -743,7 +743,6 @@ org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPar
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$1
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$SupplyContextPhase
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage
-org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$1
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$2
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index e482a93..5582fdd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -65,7 +65,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@ -2016,10 +2016,10 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
private void blockSupplySend(TestRecordingCommunicationSpi spi, final String cacheName) {
spi.blockMessages(new IgnitePredicate<GridIoMessage>() {
@Override public boolean apply(GridIoMessage ioMsg) {
- if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessageV2.class))
+ if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessage.class))
return false;
- GridDhtPartitionSupplyMessageV2 msg = (GridDhtPartitionSupplyMessageV2)ioMsg.message();
+ GridDhtPartitionSupplyMessage msg = (GridDhtPartitionSupplyMessage)ioMsg.message();
return msg.cacheId() == CU.cacheId(cacheName);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
index 89fcf6b..29c2af6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
@@ -38,7 +38,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -197,10 +197,10 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest {
spi.blockMessages(new IgnitePredicate<GridIoMessage>() {
@Override public boolean apply(GridIoMessage ioMsg) {
- if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessageV2.class))
+ if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessage.class))
return false;
- GridDhtPartitionSupplyMessageV2 msg = (GridDhtPartitionSupplyMessageV2)ioMsg.message();
+ GridDhtPartitionSupplyMessage msg = (GridDhtPartitionSupplyMessage)ioMsg.message();
return msg.cacheId() == CU.cacheId(ccfg.getName());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index eda030c..dfb3f65 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
@@ -106,7 +105,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
@Override public boolean apply(GridIoMessage msg) {
Object msg0 = msg.message();
- return (msg0 instanceof GridDhtPartitionSupplyMessage || msg0 instanceof GridDhtPartitionSupplyMessageV2)
+ return (msg0 instanceof GridDhtPartitionSupplyMessage)
&& ((GridCacheMessage)msg0).cacheId() == CU.cacheId(TEST_CACHE);
}
});
[2/2] ignite git commit: Merge branch master into ignite-3477-master
Posted by ag...@apache.org.
Merge branch master into ignite-3477-master
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3eb05de5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3eb05de5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3eb05de5
Branch: refs/heads/ignite-3477-master
Commit: 3eb05de5e2ee0d14567167bfe8547441cae69523
Parents: 8122099 aeacad6
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Apr 11 13:16:03 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Apr 11 13:16:03 2017 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 8 +-
.../GridCachePartitionExchangeManager.java | 6 +-
.../processors/cache/GridCachePreloader.java | 4 +-
.../cache/GridCachePreloaderAdapter.java | 4 +-
.../dht/preloader/GridDhtPartitionDemander.java | 2 +-
.../dht/preloader/GridDhtPartitionSupplier.java | 8 +-
.../GridDhtPartitionSupplyMessage.java | 103 +++--
.../GridDhtPartitionSupplyMessageV2.java | 422 -------------------
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../resources/META-INF/classnames.properties | 1 -
.../cache/ClusterStateAbstractTest.java | 10 +-
.../CacheLateAffinityAssignmentTest.java | 6 +-
.../IgniteCacheReadFromBackupTest.java | 6 +-
.../atomic/IgniteCacheAtomicProtocolTest.java | 3 +-
14 files changed, 84 insertions(+), 501 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index b80ad04,7c2599a..f7f0aff
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@@ -332,10 -537,8 +334,10 @@@ class GridDhtPartitionSupplier
if (!reply(node, d, s, scId))
return;
- s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+ s = new GridDhtPartitionSupplyMessage(d.updateSequence(),
- cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
+ cctx.cacheId(),
+ d.topologyVersion(),
+ cctx.deploymentEnabled());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index cc30321,a01be28..ee461ab
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@@ -66,9 -64,9 +64,14 @@@ public class GridDhtPartitionSupplyMess
@GridDirectCollection(int.class)
private Collection<Integer> missed;
++ /** Partitions for which we were able to get historical iterator. */
++ @GridToStringInclude
++ @GridDirectCollection(int.class)
++ private Collection<Integer> clean;
++
/** Entries. */
@GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
- private Map<Integer, CacheEntryInfoCollection> infos = new HashMap<>();
+ private Map<Integer, CacheEntryInfoCollection> infos;
/** Message size. */
@GridDirectTransient
@@@ -159,6 -142,6 +147,25 @@@
}
/**
++ * @param p Partition to clean.
++ */
++ void clean(int p) {
++ if (clean == null)
++ clean = new HashSet<>();
++
++ if (clean.add(p))
++ msgSize += 4;
++ }
++
++ /**
++ * @param p Partition to check.
++ * @return Check result.
++ */
++ boolean isClean(int p) {
++ return clean != null && clean.contains(p);
++ }
++
++ /**
* @param p Missed partition.
*/
void missed(int p) {
@@@ -288,7 -274,7 +298,7 @@@
switch (writer.state()) {
case 3:
- if (!writer.writeBoolean("ack", ack))
- if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
++ if (!writer.writeCollection("clean", clean, MessageCollectionItemType.INT))
return false;
writer.incrementState();
@@@ -312,13 -298,7 +322,13 @@@
writer.incrementState();
case 7:
- if (!writer.writeLong("updateSeq", updateSeq))
++ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
- if (!writer.writeInt("workerId", workerId))
+ if (!writer.writeLong("updateSeq", updateSeq))
return false;
writer.incrementState();
@@@ -340,7 -320,7 +350,7 @@@
switch (reader.state()) {
case 3:
- ack = reader.readBoolean("ack");
- infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
++ clean = reader.readCollection("clean", MessageCollectionItemType.INT);
if (!reader.isLastRead())
return false;
@@@ -372,15 -352,7 +382,15 @@@
reader.incrementState();
case 7:
- updateSeq = reader.readLong("updateSeq");
++ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
- workerId = reader.readInt("workerId");
+ updateSeq = reader.readLong("updateSeq");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --cc modules/core/src/main/resources/META-INF/classnames.properties
index 473f176,8c5a72e..335a33f
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@@ -763,9 -740,9 +763,8 @@@ org.apache.ignite.internal.processors.c
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap
-org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$1
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$SupplyContextPhase
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage
- org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$1
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$2
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
index f095e79,0000000..ce7829a
mode 100644,000000..100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
@@@ -1,439 -1,0 +1,439 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
- import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
++import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Base tests for cluster activation and deactivation; subclasses supply the cache configuration.
+ */
+@SuppressWarnings("TooBroadScope")
+public abstract class ClusterStateAbstractTest extends GridCommonAbstractTest {
+ /** Number of keys the tests put into a cache. */
+ public static final int ENTRY_CNT = 5000;
+
+ /** Number of grids each test starts initially. */
+ public static final int GRID_CNT = 4;
+
+ /** Name of the cache whose configuration is installed on every configured node. */
+ private static final String CACHE_NAME = "cache1";
+
+ /** Message classes that {@code TestCommunicationSpi} refuses to send; static, so shared by all nodes. */
+ private static final Collection<Class> forbidden = new GridConcurrentHashSet<>();
+
+ /** First forbidden-message error recorded by the SPI; rethrown and reset in {@code afterTest}. */
+ private static AtomicReference<Exception> errEncountered = new AtomicReference<>();
+
+ /** Value handed to {@code setActiveOnStart} for nodes configured afterwards. */
+ private boolean activeOnStart = true;
+
+ /** When {@code true}, nodes configured afterwards are put in client mode. */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setActiveOnStart(activeOnStart);
+
+ cfg.setCacheConfiguration(cacheConfiguration(CACHE_NAME));
+
+ if (client)
+ cfg.setClientMode(true);
+
+ cfg.setCommunicationSpi(new TestCommunicationSpi()); // Lets tests reject forbidden message classes.
+
+ return cfg;
+ }
+
+ /** Builds the configuration of the cache installed on every node; implemented by subclasses.
+ * @param cacheName Cache name.
+ * @return Cache configuration.
+ */
+ protected abstract CacheConfiguration cacheConfiguration(String cacheName);
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ forbidden.clear();
+
+ Exception err = errEncountered.getAndSet(null); // Read and reset the recorded error in one step.
+
+ if (err != null)
+ throw err;
+ }
+
+ /** Starts inactive grids with partition supply/demand messages forbidden, then activates, fills a new cache and deactivates.
+ * @throws Exception if failed.
+ */
+ public void testDynamicCacheStart() throws Exception {
+ activeOnStart = false;
+
- forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++ forbidden.add(GridDhtPartitionSupplyMessage.class);
+ forbidden.add(GridDhtPartitionDemandMessage.class);
+
+ startGrids(GRID_CNT);
+
+ checkInactive(GRID_CNT);
+
+ forbidden.clear(); // Supply/demand messages are allowed again from here on.
+
+ grid(0).active(true);
+
+ IgniteCache<Object, Object> cache2 = grid(0).createCache(new CacheConfiguration<>("cache2"));
+
+ for (int k = 0; k < ENTRY_CNT; k++)
+ cache2.put(k, k);
+
+ grid(0).active(false);
+
+ checkInactive(GRID_CNT);
+
+ stopAllGrids();
+ }
+
+ /** Forbids partition supply/demand messages while inactive and checks data across activation, node join/leave and deactivation.
+ * @throws Exception if failed.
+ */
+ public void testNoRebalancing() throws Exception {
+ activeOnStart = false;
+
- forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++ forbidden.add(GridDhtPartitionSupplyMessage.class);
+ forbidden.add(GridDhtPartitionDemandMessage.class);
+
+ startGrids(GRID_CNT);
+
+ checkInactive(GRID_CNT);
+
+ forbidden.clear(); // Supply/demand messages are allowed again from here on.
+
+ grid(0).active(true);
+
+ awaitPartitionMapExchange();
+
+ final IgniteCache<Object, Object> cache = grid(0).cache(CACHE_NAME);
+
+ for (int k = 0; k < ENTRY_CNT; k++)
+ cache.put(k, k);
+
+ for (int g = 0; g < GRID_CNT; g++) {
+ // Tests that state changes are propagated to existing and new nodes.
+ assertTrue(grid(g).active());
+
+ IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
+
+ for (int k = 0; k < ENTRY_CNT; k++)
+ assertEquals(k, cache0.get(k));
+ }
+
+ // Check that new node startup and shutdown works fine after activation.
+ startGrid(GRID_CNT);
+ startGrid(GRID_CNT + 1);
+
+ for (int g = 0; g < GRID_CNT + 2; g++) {
+ IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
+
+ for (int k = 0; k < ENTRY_CNT; k++)
+ assertEquals("Failed for [grid=" + g + ", key=" + k + ']', k, cache0.get(k));
+ }
+
+ stopGrid(GRID_CNT + 1);
+
+ for (int g = 0; g < GRID_CNT + 1; g++)
+ grid(g).cache(CACHE_NAME).rebalance().get();
+
+ stopGrid(GRID_CNT);
+
+ for (int g = 0; g < GRID_CNT; g++) {
+ IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
+
+ for (int k = 0; k < ENTRY_CNT; k++)
+ assertEquals(k, cache0.get(k));
+ }
+
+ grid(0).active(false);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ for (int g = 0; g < GRID_CNT; g++) {
+ if (grid(g).active())
+ return false;
+ }
+
+ return true;
+ }
+ }, 5000); // NOTE(review): the call's result is not used; checkInactive below does the asserting.
+
+ checkInactive(GRID_CNT);
+
- forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++ forbidden.add(GridDhtPartitionSupplyMessage.class);
+ forbidden.add(GridDhtPartitionDemandMessage.class);
+
+ // Should stop without exchange.
+ stopAllGrids();
+ }
+
+ /** Activates and deactivates the cluster through a client node and checks state and data on every node.
+ * @throws Exception if failed.
+ */
+ public void testActivationFromClient() throws Exception {
- forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++ forbidden.add(GridDhtPartitionSupplyMessage.class);
+ forbidden.add(GridDhtPartitionDemandMessage.class);
+
+ activeOnStart = false;
+
+ startGrids(GRID_CNT);
+
+ client = true; // The next started node, index GRID_CNT, is configured as a client.
+
+ startGrid(GRID_CNT);
+
+ checkInactive(GRID_CNT + 1);
+
+ Ignite cl = grid(GRID_CNT);
+
+ forbidden.clear(); // Supply/demand messages are allowed again from here on.
+
+ cl.active(true);
+
+ awaitPartitionMapExchange();
+
+ IgniteCache<Object, Object> cache = cl.cache(CACHE_NAME);
+
+ for (int k = 0; k < ENTRY_CNT; k++)
+ cache.put(k, k);
+
+ for (int g = 0; g < GRID_CNT + 1; g++) {
+ // Tests that state changes are propagated to existing and new nodes.
+ assertTrue(grid(g).active());
+
+ IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
+
+ for (int k = 0; k < ENTRY_CNT; k++)
+ assertEquals(k, cache0.get(k));
+ }
+
+ cl.active(false);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ for (int g = 0; g < GRID_CNT + 1; g++) {
+ if (grid(g).active())
+ return false;
+ }
+
+ return true;
+ }
+ }, 5000); // NOTE(review): the call's result is not used; checkInactive below does the asserting.
+
+ checkInactive(GRID_CNT + 1);
+ }
+
+ /**
+ * Tests that state doesn't change until all acquired locks are released.
+ * NOTE(review): begins with fail(...) citing IGNITE-4931, so the rest of the body is presumably never reached.
+ * @throws Exception If fails.
+ */
+ public void testDeactivationWithPendingLock() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-4931");
+
+ startGrids(GRID_CNT);
+
+ final CountDownLatch finishedLatch = new CountDownLatch(1);
+
+ Lock lock = grid(0).cache(CACHE_NAME).lock(1);
+
+ IgniteInternalFuture<?> fut;
+
+ lock.lock();
+
+ try {
+ fut = multithreadedAsync(new Runnable() {
+ @Override public void run() {
+ grid(1).active(false);
+
+ finishedLatch.countDown();
+ }
+ }, 1);
+
+ U.sleep(2000); // Wait before checking that the deactivation request has not completed.
+
+ assert !fut.isDone(); // NOTE(review): plain Java assert; only evaluated when the JVM runs with -ea.
+
+ boolean hasActive = false;
+
+ for (int g = 0; g < GRID_CNT; g++) {
+ IgniteEx grid = grid(g);
+
+ if (grid.active()) {
+ hasActive = true;
+
+ break;
+ }
+
+ }
+
+ assertTrue(hasActive);
+ }
+ finally {
+ lock.unlock();
+ }
+
+ fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
+
+ checkInactive(GRID_CNT);
+
+ finishedLatch.await(); // NOTE(review): no timeout here; presumably already counted down once fut has completed.
+ }
+
+ /**
+ * Tests that state doesn't change until all pending transactions are finished.
+ * NOTE(review): begins with fail(...) citing IGNITE-4931, so the rest of the body is presumably never reached.
+ * @throws Exception If fails.
+ */
+ public void testDeactivationWithPendingTransaction() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-4931");
+
+ startGrids(GRID_CNT);
+
+ final CountDownLatch finishedLatch = new CountDownLatch(1);
+
+ final Ignite ignite0 = grid(0);
+
+ final IgniteCache<Object, Object> cache0 = ignite0.cache(CACHE_NAME);
+
+ IgniteInternalFuture<?> fut;
+
+ try (Transaction tx = ignite0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache0.get(1);
+
+ fut = multithreadedAsync(new Runnable() {
+ @Override public void run() {
+ ignite0.active(false);
+
+ finishedLatch.countDown();
+ }
+ }, 1);
+
+ U.sleep(2000); // Wait before checking that the deactivation request has not completed.
+
+ assert !fut.isDone(); // NOTE(review): plain Java assert; only evaluated when the JVM runs with -ea.
+
+ boolean hasActive = false;
+
+ for (int g = 0; g < GRID_CNT; g++) {
+ IgniteEx grid = grid(g);
+
+ if (grid.active()) {
+ hasActive = true;
+
+ break;
+ }
+
+ }
+
+ assertTrue(hasActive);
+
+ cache0.put(1, 2);
+
+ tx.commit();
+ }
+
+ fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
+
+ checkInactive(GRID_CNT);
+
+ ignite0.active(true);
+
+ for (int g = 0; g < GRID_CNT; g++)
+ assertEquals(2, grid(g).cache(CACHE_NAME).get(1));
+
+ finishedLatch.await(); // NOTE(review): no timeout here; presumably already counted down once fut has completed.
+ }
+
+ /** Asserts that every checked grid reports {@code active() == false}.
+ * @param cnt Number of grids to check, starting from index 0.
+ */
+ private void checkInactive(int cnt) {
+ for (int g = 0; g < cnt; g++)
+ assertFalse(grid(g).active());
+ }
+
+ /**
+ * Communication SPI that throws instead of sending a message whose payload class is listed in {@code forbidden}.
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+ checkForbidden((GridIoMessage)msg); // NOTE(review): unchecked cast; assumes every message sent is a GridIoMessage.
+
+ super.sendMessage(node, msg, ackC);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ checkForbidden((GridIoMessage)msg); // NOTE(review): unchecked cast; assumes every message sent is a GridIoMessage.
+
+ super.sendMessage(node, msg);
+ }
+
+ /** Throws an {@code IgniteSpiException}, recording it first, if the wrapped message's class is forbidden.
+ * @param msg Message to check.
+ */
+ private void checkForbidden(GridIoMessage msg) {
+ if (forbidden.contains(msg.message().getClass())) {
+ IgniteSpiException err = new IgniteSpiException("Message is forbidden for this test: " + msg.message());
+
+ // Record the error in case this exception is not visible to the user code; only the first one is kept.
+ errEncountered.compareAndSet(null, err);
+
+ throw err;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------