You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/24 08:58:21 UTC
[13/14] ignite git commit: Test for cache partitions state,
fix for client cache start.
Test for cache partitions state, fix for client cache start.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aeb9336b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aeb9336b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aeb9336b
Branch: refs/heads/ignite-5569-debug
Commit: aeb9336b3b161ddfff73f17e41cd453409b84a16
Parents: ca496f6
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 24 11:47:16 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 24 11:47:16 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 51 ++-
.../dht/GridClientPartitionTopology.java | 7 +-
.../dht/GridDhtPartitionTopology.java | 12 +-
.../dht/GridDhtPartitionTopologyImpl.java | 45 +-
.../GridDhtPartitionsExchangeFuture.java | 120 +++---
.../GridCacheDatabaseSharedManager.java | 6 +-
.../CacheLateAffinityAssignmentTest.java | 36 +-
.../distributed/CachePartitionStateTest.java | 410 +++++++++++++++++++
.../TestCacheNodeExcludingFilter.java | 53 +++
.../db/IgnitePdsCacheRestoreTest.java | 208 ++++++++++
.../testsuites/IgniteCacheTestSuite6.java | 38 ++
.../ignite/testsuites/IgnitePdsTestSuite.java | 3 +
12 files changed, 863 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 79ab183..f519b4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -517,6 +517,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
}
+ for (DynamicCacheDescriptor desc : startDescs) {
+ if (desc.cacheConfiguration().getCacheMode() != LOCAL) {
+ CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
+
+ assert grp != null;
+
+ grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topVer), true);
+ }
+ }
+
cctx.cache().initCacheProxies(topVer, null);
cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null);
@@ -1299,6 +1309,19 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
+ * @param grpId Group ID.
+ * @return Group name for debug purpose.
+ */
+ private String debugGroupName(int grpId) {
+ CacheGroupDescriptor desc = caches.group(grpId);
+
+ if (desc != null)
+ return desc.cacheOrGroupName();
+ else
+ return "Unknown group: " + grpId;
+ }
+
+ /**
* @param fut Exchange future.
* @throws IgniteCheckedException If failed.
*/
@@ -1396,19 +1419,31 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* Called on exchange initiated by server node leave.
*
* @param fut Exchange future.
+ * @param crd Coordinator flag.
* @throws IgniteCheckedException If failed.
* @return {@code True} if affinity should be assigned by coordinator.
*/
- public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
+ public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
ClusterNode leftNode = fut.discoveryEvent().eventNode();
assert !leftNode.isClient() : leftNode;
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (grp.isLocal())
- continue;
+ if (crd) {
+ // Need to initialize CacheGroupHolders if this node becomes coordinator on this exchange.
+ forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
+ @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
+ CacheGroupHolder cache = groupHolder(fut.topologyVersion(), desc);
- grp.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+ cache.aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+ }
+ });
+ }
+ else {
+ forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+ @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
+ aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+ }
+ });
}
synchronized (mux) {
@@ -1433,12 +1468,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
@Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
CacheGroupHolder grpHolder = grpHolders.get(desc.groupId());
- if (grpHolder != null) {
- if (grpHolder.client()) // Affinity for non-client holders calculated in {@link #onServerLeft}.
- grpHolder.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
-
+ if (grpHolder != null)
return;
- }
// Need initialize holders and affinity if this node became coordinator during this exchange.
final Integer grpId = desc.groupId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index f4ed517..232ce38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -367,6 +367,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public GridDhtLocalPartition localPartition(int p) {
return localPartition(p, AffinityTopologyVersion.NONE, false);
}
@@ -830,7 +835,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public void onExchangeDone(AffinityAssignment assignment) {
+ @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 5f76d12..d9e04a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -130,6 +130,15 @@ public interface GridDhtPartitionTopology {
throws GridDhtInvalidPartitionException;
/**
+ * Unconditionally creates partition during restore of persisted partition state.
+ *
+ * @param p Partition ID.
+ * @return Partition.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException;
+
+ /**
* @param topVer Topology version at the time of creation.
* @param p Partition ID.
* @param create If {@code true}, then partition will be created if it's not there.
@@ -331,6 +340,7 @@ public interface GridDhtPartitionTopology {
* Callback on exchange done.
*
* @param assignment New affinity assignment.
+ * @param updateRebalanceVer {@code True} if the rebalance version should also be updated.
*/
- public void onExchangeDone(AffinityAssignment assignment);
+ public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 601da1b..5ef499c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -135,9 +135,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** */
private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
- /** */
- private volatile boolean treatAllPartAsLoc;
-
/**
* @param ctx Cache shared context.
* @param grp Cache group.
@@ -421,14 +418,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady)
throws IgniteCheckedException {
- DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
- treatAllPartAsLoc = exchFut.activateCluster()
- || (discoEvt.type() == EventType.EVT_NODE_JOINED
- && discoEvt.eventNode().isLocal()
- && !ctx.kernalContext().clientNode()
- );
-
ClusterNode loc = ctx.localNode();
ctx.database().checkpointReadLock();
@@ -540,8 +529,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
- treatAllPartAsLoc = false;
-
boolean changed = false;
int num = grp.affinity().partitions();
@@ -692,6 +679,29 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
return loc;
}
+ /** {@inheritDoc} */
+ @Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException {
+ lock.writeLock().lock();
+
+ try {
+ GridDhtLocalPartition part = locParts.get(p);
+
+ if (part != null)
+ return part;
+
+ part = new GridDhtLocalPartition(ctx, grp, p);
+
+ locParts.set(p, part);
+
+ ctx.pageStore().onPartitionCreated(grp.groupId(), p);
+
+ return part;
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
/**
* @param p Partition number.
* @param topVer Topology version.
@@ -731,7 +741,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (loc != null && state == EVICTED) {
locParts.set(p, loc = null);
- if (!treatAllPartAsLoc && !belongs)
+ if (!belongs)
throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " +
"(often may be caused by inconsistent 'key.hashCode()' implementation) " +
"[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
@@ -741,7 +751,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
"[part=" + p + ", shouldBeMoving=" + loc.reload() + "]");
if (loc == null) {
- if (!treatAllPartAsLoc && !belongs)
+ if (!belongs)
throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " +
"local node (often may be caused by inconsistent 'key.hashCode()' implementation) " +
"[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
@@ -1499,12 +1509,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public void onExchangeDone(AffinityAssignment assignment) {
+ @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) {
lock.writeLock().lock();
try {
if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0)
rebuildDiff(assignment);
+
+ if (updateRebalanceVer)
+ updateRebalanceVersion(assignment.assignment());
}
finally {
lock.writeLock().unlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c4a4f83..cdb4bb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -193,9 +193,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** */
private CacheAffinityChangeMessage affChangeMsg;
- /** */
- private boolean clientOnlyExchange;
-
/** Init timestamp. Used to track the amount of time spent to complete the future. */
private long initTs;
@@ -485,26 +482,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
}
- else {
- cctx.activate();
-
- List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches =
- cctx.cache().cachesToStartOnLocalJoin();
-
- if (cctx.database().persistenceEnabled() &&
- !cctx.kernalContext().clientNode()) {
- List<DynamicCacheDescriptor> startDescs = new ArrayList<>();
-
- if (caches != null) {
- for (T2<DynamicCacheDescriptor, NearCacheConfiguration> c : caches)
- startDescs.add(c.get1());
- }
-
- cctx.database().readCheckpointAndRestoreMemory(startDescs);
- }
-
- cctx.cache().startCachesOnLocalJoin(caches, topVer);
- }
+ else
+ initCachesOnLocalJoin();
}
exchange = CU.clientNode(discoEvt.eventNode()) ?
@@ -571,6 +550,29 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @throws IgniteCheckedException If failed.
*/
+ private void initCachesOnLocalJoin() throws IgniteCheckedException {
+ cctx.activate();
+
+ List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches =
+ cctx.cache().cachesToStartOnLocalJoin();
+
+ if (cctx.database().persistenceEnabled() && !cctx.kernalContext().clientNode()) {
+ List<DynamicCacheDescriptor> startDescs = new ArrayList<>();
+
+ if (caches != null) {
+ for (T2<DynamicCacheDescriptor, NearCacheConfiguration> c : caches)
+ startDescs.add(c.get1());
+ }
+
+ cctx.database().readCheckpointAndRestoreMemory(startDescs);
+ }
+
+ cctx.cache().startCachesOnLocalJoin(caches, topologyVersion());
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
private void initTopologies() throws IgniteCheckedException {
cctx.database().checkpointReadLock();
@@ -776,7 +778,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
warnNoAffinityNodes();
- centralizedAff = cctx.affinity().onServerLeft(this);
+ centralizedAff = cctx.affinity().onServerLeft(this, crd);
}
else
cctx.affinity().onServerJoin(this, crd);
@@ -788,40 +790,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @throws IgniteCheckedException If failed.
*/
private void clientOnlyExchange() throws IgniteCheckedException {
- clientOnlyExchange = true;
-
if (crd != null) {
- if (crd.isLocal()) {
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- boolean updateTop = !grp.isLocal() &&
- exchId.topologyVersion().equals(grp.localStartVersion());
-
- if (updateTop) {
- for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
- if (top.groupId() == grp.groupId()) {
- GridDhtPartitionFullMap fullMap = top.partitionMap(true);
-
- assert fullMap != null;
-
- grp.topology().update(topologyVersion(),
- fullMap,
- top.updateCounters(false),
- Collections.<Integer>emptySet());
+ assert !crd.isLocal() : crd;
- break;
- }
- }
- }
- }
- }
- else {
- if (!centralizedAff)
- sendLocalPartitions(crd);
+ if (!centralizedAff)
+ sendLocalPartitions(crd);
- initDone();
+ initDone();
- return;
- }
+ return;
}
else {
if (centralizedAff) { // Last server node failed.
@@ -896,8 +873,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
try {
long start = U.currentTimeMillis();
- IgniteInternalFuture fut = cctx.snapshot()
- .tryStartLocalSnapshotOperation(discoEvt);
+ IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(discoEvt);
if (fut != null) {
fut.get();
@@ -1122,6 +1098,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException {
assert node != null;
+ GridDhtPartitionsSingleMessage msg;
+
// Reset lost partition before send local partition to coordinator.
if (exchActions != null) {
Set<String> caches = exchActions.cachesToResetLostPartitions();
@@ -1130,22 +1108,32 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
resetLostPartitions(caches);
}
- GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(
- node, exchangeId(), clientOnlyExchange, true);
+ if (cctx.kernalContext().clientNode()) {
+ msg = new GridDhtPartitionsSingleMessage(exchangeId(),
+ true,
+ null,
+ true);
+ }
+ else {
+ msg = cctx.exchange().createPartitionsSingleMessage(node,
+ exchangeId(),
+ false,
+ true);
+ }
Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
if (partHistReserved0 != null)
- m.partitionHistoryCounters(partHistReserved0);
+ msg.partitionHistoryCounters(partHistReserved0);
if (stateChangeExchange() && changeGlobalStateE != null)
- m.setError(changeGlobalStateE);
+ msg.setError(changeGlobalStateE);
if (log.isDebugEnabled())
- log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
+ log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']');
try {
- cctx.io().send(node, m, SYSTEM_POOL);
+ cctx.io().send(node, msg, SYSTEM_POOL);
}
catch (ClusterTopologyCheckedException ignored) {
if (log.isDebugEnabled())
@@ -1318,7 +1306,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (err == null) {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (!grp.isLocal())
- grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()));
+ grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()), false);
}
}
@@ -1386,10 +1374,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
public void onReceive(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
assert msg != null;
assert msg.exchangeId().equals(exchId) : msg;
- assert msg.lastVersion() != null : msg;
- if (!msg.client())
+ if (!msg.client()) {
+ assert msg.lastVersion() != null : msg;
+
updateLastVersion(msg.lastVersion());
+ }
if (isDone()) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 39038ba..1797d64 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1560,8 +1560,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
T2<Integer, Long> fromWal = partStates.get(new T2<>(grpId, i));
- GridDhtLocalPartition part = grp.topology()
- .localPartition(i, AffinityTopologyVersion.NONE, true);
+ GridDhtLocalPartition part = grp.topology().forceCreatePartition(i);
assert part != null;
@@ -1621,8 +1620,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @param dataEntry Data entry to apply.
*/
private void applyUpdate(GridCacheContext cacheCtx, DataEntry dataEntry) throws IgniteCheckedException {
- GridDhtLocalPartition locPart = cacheCtx.topology()
- .localPartition(dataEntry.partitionId(), AffinityTopologyVersion.NONE, true);
+ GridDhtLocalPartition locPart = cacheCtx.topology().forceCreatePartition(dataEntry.partitionId());
switch (dataEntry.op()) {
case CREATE:
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 23043d1..7d8620a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -331,7 +331,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
};
- cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0)));
testAffinitySimpleSequentialStart();
@@ -351,7 +351,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
};
- cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(1)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1)));
startServer(0, 1);
@@ -391,7 +391,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
};
- cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2)));
startServer(0, 1);
startServer(1, 2);
@@ -439,7 +439,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
};
- cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0)));
Ignite ignite0 = startServer(0, 1);
@@ -467,7 +467,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
};
- cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0)));
Ignite ignite0 = startServer(0, 1);
@@ -520,7 +520,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
*/
private void cacheDestroyAndCreate(boolean cacheOnCrd) throws Exception {
if (!cacheOnCrd)
- cacheNodeFilter = new CacheNodeFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
startServer(0, 1);
@@ -2069,7 +2069,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
exclude.add("server-" + (srvIdx + rnd.nextInt(10)));
}
- ccfg.setNodeFilter(new CacheNodeFilter(exclude));
+ ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(exclude));
}
ccfg.setName(name);
@@ -2645,28 +2645,6 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
/**
*
*/
- static class CacheNodeFilter implements IgnitePredicate<ClusterNode> {
- /** */
- private Collection<String> excludeNodes;
-
- /**
- * @param excludeNodes Nodes names.
- */
- public CacheNodeFilter(Collection<String> excludeNodes) {
- this.excludeNodes = excludeNodes;
- }
-
- /** {@inheritDoc} */
- @Override public boolean apply(ClusterNode clusterNode) {
- String name = clusterNode.attribute(ATTR_IGNITE_INSTANCE_NAME).toString();
-
- return !excludeNodes.contains(name);
- }
- }
-
- /**
- *
- */
static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
/** */
private boolean blockCustomEvt;
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
new file mode 100644
index 0000000..c64ed0b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+
+/**
+ *
+ */
+public class CachePartitionStateTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private boolean client;
+
+ /** */
+ private CacheConfiguration ccfg;
+
+ /** {@inheritDoc} */
+ protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ cfg.setClientMode(client);
+
+ if (ccfg != null) {
+ cfg.setCacheConfiguration(ccfg);
+
+ ccfg = null;
+ }
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState1_1() throws Exception {
+ partitionState1(0, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState1_2() throws Exception {
+ partitionState1(1, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState1_2_NoCacheOnCoordinator() throws Exception {
+ partitionState1(1, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState1_3() throws Exception {
+ partitionState1(100, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState2_1() throws Exception {
+ partitionState2(0, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState2_2() throws Exception {
+ partitionState2(1, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState2_2_NoCacheOnCoordinator() throws Exception {
+ partitionState2(1, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState2_3() throws Exception {
+ partitionState2(100, true);
+ }
+
+ /**
+ * Scenario for a cache created through the API on a running cluster while supply messages are blocked.
+ * <p>
+ * Steps, in the order they are executed:
+ * <ol>
+ * <li>Start three nodes, block supply messages for the default cache on them and create the cache from node 1
+ * (optionally filtered out from node 0). After exchange, partitions of the assignment taken at topology
+ * version (3, 1) must be {@code OWNING} and rebalance must be reported as finished.</li>
+ * <li>Start node 4 with the {@code client} flag set (presumably a client node, the flag is consumed outside
+ * this method) and get the cache on it; the same checks must still pass.</li>
+ * <li>Start node 5 with the {@code client} flag cleared. Rebalance must be reported as not finished, the first
+ * three nodes must keep their partitions {@code OWNING}, and partitions assigned to node 5 at version (5, 0)
+ * must be {@code MOVING}.</li>
+ * <li>Unblock messages and wait for exchange; the assignment at version (5, 1) must be {@code OWNING} and
+ * rebalance finished.</li>
+ * <li>If the cache was filtered out from node 0, get the cache on node 0; the same checks must still pass.</li>
+ * <li>Start node 6 and wait for exchange; the assignment at version (6, 1) must be {@code OWNING} and
+ * rebalance finished.</li>
+ * </ol>
+ *
+ * @param backups Number of backups.
+ * @param crdAffNode If {@code false} cache is not created on coordinator.
+ * @throws Exception If failed.
+ */
+ private void partitionState1(int backups, boolean crdAffNode) throws Exception {
+ startGrids(3);
+
+ blockSupplySend(DEFAULT_CACHE_NAME);
+
+ CacheConfiguration ccfg = cacheConfiguration(DEFAULT_CACHE_NAME, backups);
+
+ if (!crdAffNode)
+ ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0)));
+
+ ignite(1).createCache(ccfg);
+
+ AffinityAssignment assign0 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(3, 1));
+
+ awaitPartitionMapExchange();
+
+ checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ client = true;
+
+ Ignite clientNode = startGrid(4);
+
+ checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+ clientNode.cache(DEFAULT_CACHE_NAME);
+
+ checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ client = false;
+
+ startGrid(5);
+
+ checkRebalance(DEFAULT_CACHE_NAME, false);
+
+ for (int i = 0; i < 3; i++)
+ checkNodePartitions(assign0, ignite(i).cluster().localNode(), DEFAULT_CACHE_NAME, OWNING);
+
+ AffinityAssignment assign1 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(5, 0));
+
+ checkNodePartitions(assign1, ignite(5).cluster().localNode(), DEFAULT_CACHE_NAME, MOVING);
+
+ stopBlock();
+
+ awaitPartitionMapExchange();
+
+ AffinityAssignment assign2 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(5, 1));
+
+ checkPartitionsState(assign2, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ if (!crdAffNode)
+ ignite(0).cache(DEFAULT_CACHE_NAME);
+
+ checkPartitionsState(assign2, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ startGrid(6);
+
+ awaitPartitionMapExchange();
+
+ AffinityAssignment assign3 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(6, 1));
+
+ checkPartitionsState(assign3, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+ }
+
+ /**
+ * Scenario for a cache whose configuration is stored in the {@code ccfg} variable (declared outside this
+ * method) before a new node is started, while supply messages are blocked on the first three nodes.
+ * Presumably the stored configuration is applied to the configuration of the node started next; the code that
+ * consumes it is outside this method.
+ * <p>
+ * Steps, in the order they are executed:
+ * <ol>
+ * <li>Start three nodes, block supply messages for the default cache on them, store the cache configuration
+ * (optionally filtered out from node 0) and start node 4. Partitions of the assignment taken at topology
+ * version (4, 0) must be {@code OWNING} and rebalance must be reported as finished.</li>
+ * <li>If the cache was filtered out from node 0, get the cache on node 0; the same checks must still pass.</li>
+ * <li>Unblock messages and start node 5. After exchange, the assignment taken at version (5, 1) must be
+ * {@code OWNING} and rebalance finished.</li>
+ * </ol>
+ *
+ * @param backups Number of backups.
+ * @param crdAffNode If {@code false} cache is not created on coordinator.
+ * @throws Exception If failed.
+ */
+ private void partitionState2(int backups, boolean crdAffNode) throws Exception {
+ startGrids(3);
+
+ blockSupplySend(DEFAULT_CACHE_NAME);
+
+ ccfg = cacheConfiguration(DEFAULT_CACHE_NAME, backups);
+
+ if (!crdAffNode)
+ ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0)));
+
+ startGrid(4);
+
+ AffinityAssignment assign0 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(4, 0));
+
+ checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ if (!crdAffNode)
+ ignite(0).cache(DEFAULT_CACHE_NAME);
+
+ checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ stopBlock();
+
+ startGrid(5);
+
+ AffinityAssignment assign1 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(5, 1));
+
+ awaitPartitionMapExchange();
+
+ checkPartitionsState(assign1, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+ }
+
+    /**
+     * Runs {@link #checkNodePartitions} for the local node of every instance returned by {@code G.allGrids()}.
+     *
+     * @param assign Assignments.
+     * @param cacheName Cache name.
+     * @param expState Expected state.
+     */
+    private void checkPartitionsState(AffinityAssignment assign, String cacheName, GridDhtPartitionState expState) {
+        for (Ignite ignite : G.allGrids()) {
+            ClusterNode locNode = ignite.cluster().localNode();
+
+            checkNodePartitions(assign, locNode, cacheName, expState);
+        }
+    }
+
+ /**
+ * Checks the partition states recorded for {@code clusterNode} in the topology of every started node.
+ * <p>
+ * The expected partition set is the union of primary and backup partitions of {@code clusterNode} in
+ * {@code assign}. If discovery on grid 0 reports {@code clusterNode} as an affinity node of the cache, this set
+ * must not be empty.
+ * <p>
+ * For every started node that has the cache started locally, the partition map it holds for {@code clusterNode}
+ * is inspected: expected partitions must be in {@code expState}; any other partition must be either absent from
+ * the map or {@code EVICTED}. A started node without the cache must have no primary and no backup partitions
+ * according to the affinity obtained from grid 0. At least one started node must have the cache.
+ *
+ * @param assign Assignments.
+ * @param clusterNode Node.
+ * @param cacheName Cache name.
+ * @param expState Expected partitions state.
+ */
+ private void checkNodePartitions(AffinityAssignment assign,
+ ClusterNode clusterNode,
+ String cacheName,
+ GridDhtPartitionState expState)
+ {
+ Affinity<Object> aff = ignite(0).affinity(cacheName);
+
+ Set<Integer> nodeParts = new HashSet<>();
+
+ nodeParts.addAll(assign.primaryPartitions(clusterNode.id()));
+ nodeParts.addAll(assign.backupPartitions(clusterNode.id()));
+
+ log.info("Test state [node=" + clusterNode.id() + ", parts=" + nodeParts.size() + ", state=" + expState + ']');
+
+ if (grid(0).context().discovery().cacheAffinityNode(clusterNode, cacheName))
+ assertFalse(nodeParts.isEmpty());
+
+ boolean check = false;
+
+ for (Ignite node : G.allGrids()) {
+ GridCacheAdapter cache =
+ ((IgniteKernal)node).context().cache().internalCache(cacheName);
+
+ if (cache != null) {
+ check = true;
+
+ GridDhtPartitionTopology top = cache.context().topology();
+
+ GridDhtPartitionMap partsMap = top.partitions(clusterNode.id());
+
+ for (int p = 0; p < aff.partitions(); p++) {
+ if (nodeParts.contains(p)) {
+ assertNotNull(partsMap);
+ assertEquals(expState, partsMap.get(p));
+ }
+ else {
+ if (partsMap != null) {
+ GridDhtPartitionState state = partsMap.get(p);
+
+ assertTrue("Unexpected state: " + state, state == null || state == EVICTED);
+ }
+ }
+ }
+ }
+ else {
+ assertEquals(0, aff.primaryPartitions(((IgniteKernal)node).localNode()).length);
+ assertEquals(0, aff.backupPartitions(((IgniteKernal)node).localNode()).length);
+ }
+ }
+
+ assertTrue(check);
+ }
+
+    /**
+     * Checks the rebalance state on every instance returned by {@code G.allGrids()}.
+     * <p>
+     * A node that has the cache started must report {@code expDone} from {@code rebalanceFinished()} for its
+     * current ready affinity version. A node that does not have the cache started must not be an affinity node
+     * for it.
+     *
+     * @param cacheName Cache name.
+     * @param expDone Expected rebalance finish flag.
+     */
+    private void checkRebalance(String cacheName, boolean expDone) {
+        for (Ignite node : G.allGrids()) {
+            IgniteKernal node0 = (IgniteKernal)node;
+
+            GridCacheAdapter cache = node0.context().cache().internalCache(cacheName);
+
+            AffinityTopologyVersion topVer = node0.context().cache().context().exchange().readyAffinityVersion();
+
+            if (cache != null) {
+                assertEquals("Unexpected rebalance state [node=" + node.name() + ", topVer=" + topVer + ']',
+                    expDone,
+                    cache.context().topology().rebalanceFinished(topVer));
+            }
+            else {
+                // The result of this call used to be discarded, so nothing was verified for nodes
+                // without the cache. Such nodes are expected not to be affinity nodes for it.
+                assertFalse("Affinity node without started cache [node=" + node.name() + ']',
+                    node0.context().discovery().cacheAffinityNode(node0.localNode(), cacheName));
+            }
+        }
+    }
+
+    /**
+     * Applies {@link #blockSupplySend(TestRecordingCommunicationSpi, String)} to the communication SPI of every
+     * instance returned by {@code G.allGrids()}.
+     *
+     * @param cacheName Cache name.
+     */
+    private void blockSupplySend(String cacheName) {
+        for (Ignite ignite : G.allGrids()) {
+            TestRecordingCommunicationSpi commSpi = TestRecordingCommunicationSpi.spi(ignite);
+
+            blockSupplySend(commSpi, cacheName);
+        }
+    }
+
+    /**
+     * Registers a blocking predicate on the given SPI. The predicate matches messages whose class is exactly
+     * {@code GridDhtPartitionSupplyMessage} and whose group id equals {@code CU.cacheId(cacheName)}.
+     *
+     * @param spi SPI.
+     * @param cacheName Cache name.
+     */
+    private void blockSupplySend(TestRecordingCommunicationSpi spi, final String cacheName) {
+        final int expGrpId = CU.cacheId(cacheName);
+
+        IgniteBiPredicate<ClusterNode, Message> supplyMsgFilter = new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                if (!msg.getClass().equals(GridDhtPartitionSupplyMessage.class))
+                    return false;
+
+                return ((GridDhtPartitionSupplyMessage)msg).groupId() == expGrpId;
+            }
+        };
+
+        spi.blockMessages(supplyMsgFilter);
+    }
+
+    /**
+     * Calls {@code stopBlock()} on the communication SPI of every instance returned by {@code G.allGrids()}.
+     */
+    private void stopBlock() {
+        for (Ignite ignite : G.allGrids()) {
+            TestRecordingCommunicationSpi commSpi = TestRecordingCommunicationSpi.spi(ignite);
+
+            commSpi.stopBlock();
+        }
+    }
+
+    /**
+     * Builds a cache configuration with the given name and backups number and {@code FULL_SYNC} write
+     * synchronization mode.
+     *
+     * @param name Cache name.
+     * @param backups Backups number.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String name, int backups) {
+        CacheConfiguration cfg = new CacheConfiguration(name);
+
+        cfg.setBackups(backups);
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return cfg;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java
new file mode 100644
index 0000000..a3f7d27
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgnitePredicate;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+
+/**
+ * Node filter that accepts every node except those whose Ignite instance name is contained in the configured
+ * collection of excluded names.
+ */
+public class TestCacheNodeExcludingFilter implements IgnitePredicate<ClusterNode> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Instance names of the nodes rejected by this filter. */
+    private final Collection<String> excludeNodes;
+
+    /**
+     * @param excludeNodes Nodes names.
+     */
+    public TestCacheNodeExcludingFilter(Collection<String> excludeNodes) {
+        this.excludeNodes = excludeNodes;
+    }
+
+    /**
+     * @param excludeNodes Nodes names.
+     */
+    public TestCacheNodeExcludingFilter(String... excludeNodes) {
+        this.excludeNodes = Arrays.asList(excludeNodes);
+    }
+
+    /**
+     * @param clusterNode Node to test.
+     * @return {@code True} if the node's instance name is not in the excluded names, or if the node has no
+     *      instance name attribute at all.
+     */
+    @Override public boolean apply(ClusterNode clusterNode) {
+        Object name = clusterNode.attribute(ATTR_IGNITE_INSTANCE_NAME);
+
+        // A node without the instance name attribute cannot match an excluded name; previously this case
+        // failed with a NullPointerException on toString().
+        return name == null || !excludeNodes.contains(name.toString());
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
new file mode 100644
index 0000000..25626f4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests that an existing cache keeps its data on a restarted node configured with a persistent store when a
+ * new cache is added either while the node is stopped or through the configuration of the restarting node.
+ */
+public class IgnitePdsCacheRestoreTest extends GridCommonAbstractTest {
+ /** IP finder set on the discovery SPI of every node started by this test. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Cache configurations applied to the next node configuration; reset to {@code null} once applied. */
+ private CacheConfiguration[] ccfgs;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ if (ccfgs != null) {
+ cfg.setCacheConfiguration(ccfgs);
+
+ ccfgs = null;
+ }
+
+ MemoryConfiguration memCfg = new MemoryConfiguration();
+ memCfg.setPageSize(1024);
+ memCfg.setDefaultMemoryPolicySize(10 * 1024 * 1024);
+
+ cfg.setMemoryConfiguration(memCfg);
+
+ PersistentStoreConfiguration pCfg = new PersistentStoreConfiguration();
+
+ pCfg.setWalMode(WALMode.LOG_ONLY);
+
+ cfg.setPersistentStoreConfiguration(pCfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ GridTestUtils.deleteDbFiles();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ GridTestUtils.deleteDbFiles();
+
+ super.afterTest();
+ }
+
+ /**
+ * Runs {@link #restoreAndNewCache(boolean)} with the new cache added through the configuration of the
+ * restarting node.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRestoreAndNewCache1() throws Exception {
+ restoreAndNewCache(false);
+ }
+
+ /**
+ * Runs {@link #restoreAndNewCache(boolean)} with the new cache created while the node is stopped.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRestoreAndNewCache2() throws Exception {
+ restoreAndNewCache(true);
+ }
+
+ /**
+ * Starts three nodes with cache {@code c1}, activates the cluster, puts keys that are primary on node 2 and
+ * stops node 2. Cache {@code c2} is then added and node 2 is restarted. After the restart the values put into
+ * {@code c1} must be readable from node 2, {@code c2} must be empty for these keys and accept updates, and both
+ * caches must accept and return updates for another set of keys, including after one more node is started.
+ *
+ * @param createNew If {@code true} the new cache is created through the API while node 2 is stopped,
+ * otherwise it is added to the cache configurations of the restarting node.
+ * @throws Exception If failed.
+ */
+ private void restoreAndNewCache(boolean createNew) throws Exception {
+ for (int i = 0; i < 3; i++) {
+ ccfgs = configurations1();
+
+ startGrid(i);
+ }
+
+ ignite(0).active(true);
+
+ IgniteCache<Object, Object> cache1 = ignite(2).cache("c1");
+
+ List<Integer> keys = primaryKeys(cache1, 10);
+
+ for (Integer key : keys)
+ cache1.put(key, key);
+
+ stopGrid(2);
+
+ if (createNew) {
+ // New cache is added when node is stopped.
+ ignite(0).getOrCreateCaches(Arrays.asList(configurations2()));
+ }
+ else {
+ // New cache is added on node restart.
+ ccfgs = configurations2();
+ }
+
+ startGrid(2);
+
+ cache1 = ignite(2).cache("c1");
+
+ IgniteCache<Object, Object> cache2 = ignite(2).cache("c2");
+
+ for (Integer key : keys) {
+ assertEquals(key, cache1.get(key));
+
+ assertNull(cache2.get(key));
+
+ cache2.put(key, key);
+
+ assertEquals(key, cache2.get(key));
+ }
+
+ List<Integer> nearKeys = nearKeys(cache1, 10, 0);
+
+ for (Integer key : nearKeys) {
+ assertNull(cache1.get(key));
+ assertNull(cache2.get(key));
+
+ cache2.put(key, key);
+ assertEquals(key, cache2.get(key));
+
+ cache1.put(key, key);
+ assertEquals(key, cache1.get(key));
+ }
+
+ startGrid(3);
+
+ awaitPartitionMapExchange();
+
+ for (Integer key : nearKeys) {
+ assertEquals(key, cache2.get(key));
+
+ assertEquals(key, cache1.get(key));
+ }
+ }
+
+ /**
+ * @return Configurations set 1: cache {@code c1} only.
+ */
+ private CacheConfiguration[] configurations1() {
+ CacheConfiguration[] ccfgs = new CacheConfiguration[1];
+
+ ccfgs[0] = cacheConfiguration("c1");
+
+ return ccfgs;
+ }
+
+ /**
+ * @return Configurations set 2: caches {@code c1} and {@code c2}.
+ */
+ private CacheConfiguration[] configurations2() {
+ CacheConfiguration[] ccfgs = new CacheConfiguration[2];
+
+ ccfgs[0] = cacheConfiguration("c1");
+ ccfgs[1] = cacheConfiguration("c2");
+
+ return ccfgs;
+ }
+
+ /**
+ * @param name Cache name.
+ * @return Cache configuration with the given name and {@code FULL_SYNC} write synchronization mode.
+ */
+ private CacheConfiguration cacheConfiguration(String name) {
+ CacheConfiguration ccfg = new CacheConfiguration(name);
+
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ return ccfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
new file mode 100644
index 0000000..bb32d24
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
+
+/**
+ * Cache test suite, part 6. Currently contains {@link CachePartitionStateTest} only.
+ */
+public class IgniteCacheTestSuite6 extends TestSuite {
+    /**
+     * Builds the suite.
+     *
+     * @return IgniteCache test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite res = new TestSuite("IgniteCache Test Suite part 6");
+
+        res.addTestSuite(CachePartitionStateTest.class);
+
+        return res;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 5b562c3..5762c02 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactiva
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsClientNearCachePutGetTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDynamicCacheTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSingleNodePutGetPersistenceTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheRestoreTest;
import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCheckpointSimulationWithRealCpDisabledTest;
import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsEvictionTest;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.BPlusTreePageMemoryImplTest;
@@ -74,6 +75,8 @@ public class IgnitePdsTestSuite extends TestSuite {
suite.addTestSuite(IgniteClusterActivateDeactivateTestWithPersistence.class);
+ suite.addTestSuite(IgnitePdsCacheRestoreTest.class);
+
return suite;
}
}