You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/11/16 13:39:03 UTC
[05/13] ignite git commit: IGNITE-10207 Fixed missed loss policy
checks - Fixes #5360.
IGNITE-10207 Fixed missed loss policy checks - Fixes #5360.
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e4760980
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e4760980
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e4760980
Branch: refs/heads/ignite-10043
Commit: e4760980ab6077ee398965f551fdf8302820ae0e
Parents: 665aa95
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Thu Nov 15 19:15:13 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Thu Nov 15 19:15:13 2018 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheContext.java | 19 +-
.../cache/distributed/dht/GridDhtGetFuture.java | 40 +-
.../distributed/dht/GridDhtGetSingleFuture.java | 41 +-
.../dht/GridDhtTopologyFutureAdapter.java | 233 ++++---
.../dht/GridPartitionedGetFuture.java | 2 +-
.../dht/GridPartitionedSingleGetFuture.java | 9 +-
.../GridDhtPartitionsExchangeFuture.java | 4 +-
.../distributed/near/GridNearGetFuture.java | 2 +-
...CacheResultIsNotNullOnPartitionLossTest.java | 23 +-
.../IgniteCachePartitionLossPolicySelfTest.java | 652 +++++++------------
...ndexingCachePartitionLossPolicySelfTest.java | 2 +-
11 files changed, 483 insertions(+), 544 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 44d067c..53c0bf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2241,9 +2241,14 @@ public class GridCacheContext<K, V> implements Externalizable {
*
* @param affNodes All affinity nodes.
* @param canRemap Flag indicating that 'get' should be done on a locked topology version.
+ * @param partitionId Partition ID.
* @return Affinity node to get key from or {@code null} if there is no suitable alive node.
*/
- @Nullable public ClusterNode selectAffinityNodeBalanced(List<ClusterNode> affNodes, boolean canRemap) {
+ @Nullable public ClusterNode selectAffinityNodeBalanced(
+ List<ClusterNode> affNodes,
+ int partitionId,
+ boolean canRemap
+ ) {
if (!readLoadBalancingEnabled) {
if (!canRemap) {
for (ClusterNode node : affNodes) {
@@ -2267,7 +2272,7 @@ public class GridCacheContext<K, V> implements Externalizable {
ClusterNode n0 = null;
for (ClusterNode node : affNodes) {
- if (canRemap || discovery().alive(node)) {
+ if ((canRemap || discovery().alive(node) && isOwner(node, partitionId))) {
if (locMacs.equals(node.attribute(ATTR_MACS)))
return node;
@@ -2282,6 +2287,16 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * Check that node is owner for partition.
+ * @param node Cluster node.
+ * @param partitionId Partition ID.
+ * @return {@code}
+ */
+ private boolean isOwner(ClusterNode node, int partitionId) {
+ return topology().partitionState(node.id(), partitionId) == OWNING;
+ }
+
+ /**
* Prepare affinity field for builder (if possible).
*
* @param buider Builder.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 024e262..96d1769 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.ReaderArguments;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.LostPolicyValidator;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
@@ -55,6 +56,11 @@ import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static java.util.Collections.singleton;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.READ;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+
/**
*
*/
@@ -185,6 +191,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
* Initializes future.
*/
void init() {
+ // TODO get rid of force keys request https://issues.apache.org/jira/browse/IGNITE-10251
GridDhtFuture<Object> fut = cctx.group().preloader().request(cctx, keys.keySet(), topVer);
if (fut != null) {
@@ -209,14 +216,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
return;
}
- map0(keys);
+ map0(keys, true);
markInitialized();
}
});
}
else {
- map0(keys);
+ map0(keys, false);
markInitialized();
}
@@ -257,7 +264,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
/**
* @param keys Keys to map.
*/
- private void map0(Map<KeyCacheObject, Boolean> keys) {
+ private void map0(Map<KeyCacheObject, Boolean> keys, boolean forceKeys) {
Map<KeyCacheObject, Boolean> mappedKeys = null;
// Assign keys to primary nodes.
@@ -265,7 +272,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
int part = cctx.affinity().partition(key.getKey());
if (retries == null || !retries.contains(part)) {
- if (!map(key.getKey())) {
+ if (!map(key.getKey(), forceKeys)) {
if (retries == null)
retries = new HashSet<>();
@@ -309,7 +316,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
* @param key Key.
* @return {@code True} if mapped.
*/
- private boolean map(KeyCacheObject key) {
+ private boolean map(KeyCacheObject key, boolean forceKeys) {
try {
int keyPart = cctx.affinity().partition(key);
@@ -320,14 +327,31 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
if (part == null)
return false;
+ if (!forceKeys && part.state() == LOST && !recovery) {
+ Throwable error = LostPolicyValidator.validate(cctx, key, READ, singleton(part.id()));
+
+ if (error != null) {
+ onDone(null, error);
+
+ return false;
+ }
+ }
+
if (parts == null || !F.contains(parts, part.id())) {
// By reserving, we make sure that partition won't be unloaded while processed.
if (part.reserve()) {
- parts = parts == null ? new int[1] : Arrays.copyOf(parts, parts.length + 1);
+ if (forceKeys || (part.state() == OWNING || part.state() == LOST)) {
+ parts = parts == null ? new int[1] : Arrays.copyOf(parts, parts.length + 1);
- parts[parts.length - 1] = part.id();
+ parts[parts.length - 1] = part.id();
- return true;
+ return true;
+ }
+ else {
+ part.release();
+
+ return false;
+ }
}
else
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index 88f6848..e0fe8be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.ReaderArguments;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.LostPolicyValidator;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
@@ -47,6 +48,11 @@ import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static java.util.Collections.singleton;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.READ;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+
/**
*
*/
@@ -207,6 +213,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
*
*/
private void map() {
+ // TODO get rid of force keys request https://issues.apache.org/jira/browse/IGNITE-10251
if (cctx.group().preloader().needForceKeys()) {
GridDhtFuture<Object> fut = cctx.group().preloader().request(
cctx,
@@ -240,7 +247,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
onDone(e);
}
else
- map0();
+ map0(true);
}
}
);
@@ -249,19 +256,20 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
}
}
- map0();
+ map0(false);
}
/**
*
*/
- private void map0() {
+ private void map0(boolean forceKeys) {
assert retry == null : retry;
- if (!map(key)) {
+ if (!map(key, forceKeys)) {
retry = cctx.affinity().partition(key);
- onDone((GridCacheEntryInfo)null);
+ if (!isDone())
+ onDone((GridCacheEntryInfo)null);
return;
}
@@ -278,7 +286,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
* @param key Key.
* @return {@code True} if mapped.
*/
- private boolean map(KeyCacheObject key) {
+ private boolean map(KeyCacheObject key, boolean forceKeys) {
try {
int keyPart = cctx.affinity().partition(key);
@@ -291,11 +299,28 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
assert this.part == -1;
+ if (!forceKeys && part.state() == LOST && !recovery) {
+ Throwable error = LostPolicyValidator.validate(cctx, key, READ, singleton(part.id()));
+
+ if (error != null) {
+ onDone(null, error);
+
+ return false;
+ }
+ }
+
// By reserving, we make sure that partition won't be unloaded while processed.
if (part.reserve()) {
- this.part = part.id();
+ if (forceKeys || (part.state() == OWNING || part.state() == LOST)) {
+ this.part = part.id();
+
+ return true;
+ }
+ else {
+ part.release();
- return true;
+ return false;
+ }
}
else
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
index 9214308..8a6a5ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
@@ -29,12 +29,13 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_ALL;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
-import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_ALL;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.WRITE;
/**
*
@@ -42,7 +43,7 @@ import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<AffinityTopologyVersion>
implements GridDhtTopologyFuture {
/** Cache groups validation results. */
- protected volatile Map<Integer, CacheValidation> grpValidRes;
+ protected volatile Map<Integer, CacheGroupValidation> grpValidRes = Collections.emptyMap();
/** Whether or not cluster is active. */
protected volatile boolean clusterIsActive = true;
@@ -52,7 +53,7 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
* @param topNodes Topology nodes.
* @return Validation result.
*/
- protected final CacheValidation validateCacheGroup(CacheGroupContext grp, Collection<ClusterNode> topNodes) {
+ protected final CacheGroupValidation validateCacheGroup(CacheGroupContext grp, Collection<ClusterNode> topNodes) {
Collection<Integer> lostParts = grp.isLocal() ?
Collections.<Integer>emptyList() : grp.topology().lostPartitions();
@@ -65,11 +66,11 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
valid = validator.validate(topNodes);
}
- return new CacheValidation(valid, lostParts);
+ return new CacheGroupValidation(valid, lostParts);
}
/** {@inheritDoc} */
- @Nullable @Override public final Throwable validateCache(
+ @Override public final @Nullable Throwable validateCache(
GridCacheContext cctx,
boolean recovery,
boolean read,
@@ -87,115 +88,181 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
return new CacheInvalidStateException(
"Failed to perform cache operation (cluster is not activated): " + cctx.name());
+ OperationType opType = read ? OperationType.READ : WRITE;
+
CacheGroupContext grp = cctx.group();
- PartitionLossPolicy partLossPlc = grp.config().getPartitionLossPolicy();
+ PartitionLossPolicy lossPlc = grp.config().getPartitionLossPolicy();
+
+ if (cctx.shared().readOnlyMode() && opType == WRITE)
+ return new IgniteCheckedException("Failed to perform cache operation (cluster is in read only mode)");
if (grp.needsRecovery() && !recovery) {
- if (!read && (partLossPlc == READ_ONLY_SAFE || partLossPlc == READ_ONLY_ALL))
- return new IgniteCheckedException("Failed to write to cache (cache is moved to a read-only state): " +
- cctx.name());
+ if (opType == WRITE && (lossPlc == READ_ONLY_SAFE || lossPlc == READ_ONLY_ALL))
+ return new IgniteCheckedException(
+ "Failed to write to cache (cache is moved to a read-only state): " + cctx.name());
}
- if (cctx.shared().readOnlyMode() && !read)
- return new IgniteCheckedException("Failed to perform cache operation (cluster is in read only mode)" );
+ CacheGroupValidation validation = grpValidRes.get(grp.groupId());
- if (grp.needsRecovery() || grp.topologyValidator() != null) {
- CacheValidation validation = grpValidRes.get(grp.groupId());
+ if (validation == null)
+ return null;
- if (validation == null)
- return null;
+ if (opType == WRITE && !validation.isValid()) {
+ return new IgniteCheckedException("Failed to perform cache operation " +
+ "(cache topology is not valid): " + cctx.name());
+ }
- if (!validation.valid && !read)
- return new IgniteCheckedException("Failed to perform cache operation " +
- "(cache topology is not valid): " + cctx.name());
+ if (recovery)
+ return null;
- if (recovery || !grp.needsRecovery())
- return null;
+ if (validation.hasLostPartitions()) {
+ if (key != null)
+ return LostPolicyValidator.validate(cctx, key, opType, validation.lostPartitions());
- if (key != null) {
- int p = cctx.affinity().partition(key);
+ if (keys != null)
+ return LostPolicyValidator.validate(cctx, keys, opType, validation.lostPartitions());
+ }
- CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, key, p,
- validation.lostParts, partLossPlc);
+ return null;
+ }
- if (ex != null)
- return ex;
- }
+ /**
+ * Cache group validation result.
+ */
+ protected static class CacheGroupValidation {
+ /** Topology validation result. */
+ private final boolean valid;
- if (keys != null) {
- for (Object k : keys) {
- int p = cctx.affinity().partition(k);
+ /** Lost partitions on this topology version. */
+ private final Collection<Integer> lostParts;
- CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, k, p,
- validation.lostParts, partLossPlc);
+ /**
+ * @param valid Valid flag.
+ * @param lostParts Lost partitions.
+ */
+ private CacheGroupValidation(boolean valid, Collection<Integer> lostParts) {
+ this.valid = valid;
+ this.lostParts = lostParts;
+ }
- if (ex != null)
- return ex;
- }
- }
+ /**
+ * @return True if valid, False if invalide.
+ */
+ public boolean isValid() {
+ return valid;
}
- return null;
+ /**
+ * @return True if lost partition is present, False if not.
+ */
+ public boolean hasLostPartitions() {
+ return !F.isEmpty(lostParts);
+ }
+
+ /**
+ * @return Lost patition ID collection.
+ */
+ public Collection<Integer> lostPartitions() {
+ return lostParts;
+ }
}
/**
- * @param cacheName Cache name.
- * @param read Read flag.
- * @param key Key to check.
- * @param part Partition this key belongs to.
- * @param lostParts Collection of lost partitions.
- * @param plc Partition loss policy.
- * @return Invalid state exception if this operation is disallowed.
+ *
*/
- private CacheInvalidStateException validatePartitionOperation(
- String cacheName,
- boolean read,
- Object key,
- int part,
- Collection<Integer> lostParts,
- PartitionLossPolicy plc
- ) {
- if (lostParts.contains(part)) {
- if (!read) {
- assert plc == READ_WRITE_ALL || plc == READ_WRITE_SAFE;
+ public enum OperationType {
+ /**
+ * Read operation.
+ */
+ READ,
+ /**
+ * Write operation.
+ */
+ WRITE
+ }
+
+ /**
+ * Lost policy validator.
+ */
+ public static class LostPolicyValidator {
+ /**
+ *
+ */
+ public static Throwable validate(
+ GridCacheContext cctx,
+ Object key,
+ OperationType opType,
+ Collection<Integer> lostParts
+ ) {
+ CacheGroupContext grp = cctx.group();
+
+ PartitionLossPolicy lostPlc = grp.config().getPartitionLossPolicy();
+
+ int partition = cctx.affinity().partition(key);
+
+ return validate(cctx, key, partition, opType, lostPlc, lostParts);
+ }
+
+ /**
+ *
+ */
+ public static Throwable validate(
+ GridCacheContext cctx,
+ Collection<?> keys,
+ OperationType opType,
+ Collection<Integer> lostParts
+ ) {
+ CacheGroupContext grp = cctx.group();
- if (plc == READ_WRITE_SAFE) {
+ PartitionLossPolicy lostPlc = grp.config().getPartitionLossPolicy();
+
+ for (Object key : keys) {
+ int partition = cctx.affinity().partition(key);
+
+ Throwable res = validate(cctx, key, partition, opType, lostPlc, lostParts);
+
+ if (res != null)
+ return res;
+ }
+
+ return null;
+ }
+
+ /**
+ *
+ */
+ private static Throwable validate(
+ GridCacheContext cctx,
+ Object key,
+ int partition,
+ OperationType opType,
+ PartitionLossPolicy lostPlc,
+ Collection<Integer> lostParts
+ ) {
+ if (opType == WRITE) {
+ if (lostPlc == READ_ONLY_SAFE || lostPlc == READ_ONLY_ALL) {
+ return new IgniteCheckedException(
+ "Failed to write to cache (cache is moved to a read-only state): " + cctx.name()
+ );
+ }
+
+ if (lostParts.contains(partition) && lostPlc == READ_WRITE_SAFE) {
return new CacheInvalidStateException("Failed to execute cache operation " +
"(all partition owners have left the grid, partition data has been lost) [" +
- "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']');
+ "cacheName=" + cctx.name() + ", part=" + partition + ", key=" + key + ']');
}
}
- else {
- // Read.
- if (plc == READ_ONLY_SAFE || plc == READ_WRITE_SAFE)
+
+ if (opType == OperationType.READ) {
+ if (lostParts.contains(partition) && (lostPlc == READ_ONLY_SAFE || lostPlc == READ_WRITE_SAFE))
return new CacheInvalidStateException("Failed to execute cache operation " +
"(all partition owners have left the grid, partition data has been lost) [" +
- "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']');
+ "cacheName=" + cctx.name() + ", part=" + partition + ", key=" + key + ']'
+ );
}
- }
-
- return null;
- }
-
- /**
- * Cache validation result.
- */
- protected static class CacheValidation {
- /** Topology validation result. */
- private boolean valid;
- /** Lost partitions on this topology version. */
- private Collection<Integer> lostParts;
-
- /**
- * @param valid Valid flag.
- * @param lostParts Lost partitions.
- */
- private CacheValidation(boolean valid, Collection<Integer> lostParts) {
- this.valid = valid;
- this.lostParts = lostParts;
+ return null;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 8725e05..2fcd677 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -483,7 +483,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
}
}
- ClusterNode node = cctx.selectAffinityNodeBalanced(affNodes, canRemap);
+ ClusterNode node = cctx.selectAffinityNodeBalanced(affNodes, part, canRemap);
if (node == null) {
onDone(serverNotFoundError(part, topVer));
http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index ad716e6..4d0e129 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -367,7 +367,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
}
}
- ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, canRemap);
+ ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, part, canRemap);
if (affNode == null) {
onDone(serverNotFoundError(part, topVer));
@@ -775,6 +775,13 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
private void remap(final AffinityTopologyVersion topVer) {
cctx.closures().runLocalSafe(new Runnable() {
@Override public void run() {
+ GridDhtTopologyFuture lastFut = cctx.shared().exchange().lastFinishedFuture();
+
+ Throwable error = lastFut.validateCache(cctx, recovery, true, key, null);
+
+ if (error != null)
+ onDone(error);
+
map(topVer);
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3702a51..c8471c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2077,10 +2077,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
- if (serverNodeDiscoveryEvent())
+ if (serverNodeDiscoveryEvent() || localJoinExchange())
detectLostPartitions(res);
- Map<Integer, CacheValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
+ Map<Integer, CacheGroupValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
for (CacheGroupContext grp : cctx.cache().cacheGroups())
m.put(grp.groupId(), validateCacheGroup(grp, events().lastEvent().topologyNodes()));
http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 0350e1a..54c3cae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -494,7 +494,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
}
}
- ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, canRemap);
+ ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, part, canRemap);
if (affNode == null) {
onDone(serverNotFoundError(part, topVer));
http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java
index ceafc9e..0958f83 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java
@@ -17,15 +17,21 @@
package org.apache.ignite.internal.processors.cache.distributed;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.PartitionLossPolicy;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.EventType;
@@ -48,13 +54,13 @@ public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractT
private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** Number of servers to be started. */
- private static final int SERVERS = 10;
+ private static final int SERVERS = 5;
/** Index of node that is goning to be the only client node. */
private static final int CLIENT_IDX = SERVERS;
/** Number of cache entries to insert into the test cache. */
- private static final int CACHE_ENTRIES_CNT = 10_000;
+ private static final int CACHE_ENTRIES_CNT = 60;
/** True if {@link #getConfiguration(String)} is expected to configure client node on next invocations. */
private boolean isClient;
@@ -75,6 +81,7 @@ public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractT
.setCacheMode(CacheMode.PARTITIONED)
.setBackups(0)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setAffinity(new RendezvousAffinityFunction(false, 50))
.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE)
);
@@ -90,7 +97,12 @@ public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractT
cleanPersistenceDir();
- startGrids(SERVERS);
+ List<Integer> list = IntStream.range(0, SERVERS).boxed().collect(Collectors.toList());
+
+ Collections.shuffle(list);
+
+ for (Integer i : list)
+ startGrid(i);
isClient = true;
@@ -178,9 +190,9 @@ public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractT
readerThreadStarted.await(1, TimeUnit.SECONDS);
for (int i = 0; i < SERVERS - 1; i++) {
- Thread.sleep(50L);
-
grid(i).close();
+
+ Thread.sleep(400L);
}
}
finally {
@@ -204,6 +216,7 @@ public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractT
private boolean expectedThrowableClass(Throwable throwable) {
return X.hasCause(
throwable,
+ IgniteClientDisconnectedException.class,
CacheInvalidStateException.class,
ClusterTopologyCheckedException.class,
IllegalStateException.class,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
index caf0829..f02563d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
@@ -18,14 +18,11 @@
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@@ -39,8 +36,6 @@ import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.events.Event;
@@ -48,7 +43,7 @@ import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestDelayingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.util.typedef.F;
@@ -58,18 +53,19 @@ import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import static java.util.Arrays.asList;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
/**
*
*/
public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTest {
/** */
- private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
private boolean client;
@@ -78,43 +74,36 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
private PartitionLossPolicy partLossPlc;
/** */
- protected static final String CACHE_NAME = "partitioned";
+ private int backups;
/** */
- private int backups = 0;
+ private final AtomicBoolean delayPartExchange = new AtomicBoolean();
/** */
- private final AtomicBoolean delayPartExchange = new AtomicBoolean(false);
-
- /** */
- private final TopologyChanger killSingleNode = new TopologyChanger(false, Collections.singletonList(3), Arrays.asList(0, 1, 2, 4), 0);
-
- /** */
- private boolean isPersistenceEnabled;
+ private final TopologyChanger killSingleNode = new TopologyChanger(
+ false, asList(3), asList(0, 1, 2, 4), 0
+ );
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
cfg.setCommunicationSpi(new TestDelayingCommunicationSpi() {
+ /** {@inheritDoc} */
@Override protected boolean delayMessage(Message msg, GridIoMessage ioMsg) {
- return delayPartExchange.get() && (msg instanceof GridDhtPartitionsFullMessage || msg instanceof GridDhtPartitionsAbstractMessage);
+ return delayPartExchange.get() &&
+ (msg instanceof GridDhtPartitionsFullMessage || msg instanceof GridDhtPartitionsAbstractMessage);
}
- @Override protected int delayMillis() {
- return 250;
- }
});
cfg.setClientMode(client);
- cfg.setCacheConfiguration(cacheConfiguration());
+ cfg.setConsistentId(gridName);
- cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(
- new DataRegionConfiguration().setPersistenceEnabled(isPersistenceEnabled)
- ));
+ cfg.setCacheConfiguration(cacheConfiguration());
return cfg;
}
@@ -123,7 +112,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
* @return Cache configuration.
*/
protected CacheConfiguration<Integer, Integer> cacheConfiguration() {
- CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(CACHE_NAME);
+ CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
cacheCfg.setCacheMode(PARTITIONED);
cacheCfg.setBackups(backups);
@@ -135,44 +124,27 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
}
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- delayPartExchange.set(false);
-
- partLossPlc = PartitionLossPolicy.IGNORE;
-
- backups = 0;
-
- isPersistenceEnabled = false;
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
}
/** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
cleanPersistenceDir();
- super.afterTest();
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testReadOnlySafe() throws Exception {
- partLossPlc = PartitionLossPolicy.READ_ONLY_SAFE;
+ delayPartExchange.set(false);
- checkLostPartition(false, true, killSingleNode);
+ backups = 0;
}
/**
* @throws Exception if failed.
*/
- public void testReadOnlySafeWithPersistence() throws Exception {
+ public void testReadOnlySafe() throws Exception {
partLossPlc = PartitionLossPolicy.READ_ONLY_SAFE;
- isPersistenceEnabled = true;
-
checkLostPartition(false, true, killSingleNode);
}
@@ -188,19 +160,6 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
/**
* @throws Exception if failed.
*/
- public void testReadOnlyAllWithPersistence() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-10041");
-
- partLossPlc = PartitionLossPolicy.READ_ONLY_ALL;
-
- isPersistenceEnabled = true;
-
- checkLostPartition(false, false, killSingleNode);
- }
-
- /**
- * @throws Exception if failed.
- */
public void testReadWriteSafe() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
@@ -210,17 +169,6 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
/**
* @throws Exception if failed.
*/
- public void testReadWriteSafeWithPersistence() throws Exception {
- partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
- isPersistenceEnabled = true;
-
- checkLostPartition(true, true, killSingleNode);
- }
-
- /**
- * @throws Exception if failed.
- */
public void testReadWriteAll() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_ALL;
@@ -230,34 +178,10 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
/**
* @throws Exception if failed.
*/
- public void testReadWriteAllWithPersistence() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-10041");
-
- partLossPlc = PartitionLossPolicy.READ_WRITE_ALL;
-
- isPersistenceEnabled = true;
-
- checkLostPartition(true, false, killSingleNode);
- }
-
- /**
- * @throws Exception if failed.
- */
public void testReadWriteSafeAfterKillTwoNodes() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
- checkLostPartition(true, true, new TopologyChanger(false, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0));
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testReadWriteSafeAfterKillTwoNodesWithPersistence() throws Exception {
- partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
- isPersistenceEnabled = true;
-
- checkLostPartition(true, true, new TopologyChanger(false, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0));
+ checkLostPartition(true, true, new TopologyChanger(false, asList(3, 2), asList(0, 1, 4), 0));
}
/**
@@ -266,18 +190,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
public void testReadWriteSafeAfterKillTwoNodesWithDelay() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
- checkLostPartition(true, true, new TopologyChanger(false, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 20));
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testReadWriteSafeAfterKillTwoNodesWithDelayWithPersistence() throws Exception {
- partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
- isPersistenceEnabled = true;
-
- checkLostPartition(true, true, new TopologyChanger(false, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 20));
+ checkLostPartition(true, true, new TopologyChanger(false, asList(3, 2), asList(0, 1, 4), 20));
}
/**
@@ -288,22 +201,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
backups = 1;
- checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 2, 1), Arrays.asList(0, 4), 0));
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testReadWriteSafeWithBackupsAfterKillThreeNodesWithPersistence() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-10043");
-
- partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
- backups = 1;
-
- isPersistenceEnabled = true;
-
- checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 2, 1), Arrays.asList(0, 4), 0));
+ checkLostPartition(true, true, new TopologyChanger(true, asList(3, 2, 1), asList(0, 4), 0));
}
/**
@@ -312,18 +210,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
public void testReadWriteSafeAfterKillCrd() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
- checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0));
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testReadWriteSafeAfterKillCrdWithPersistence() throws Exception {
- partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
- isPersistenceEnabled = true;
-
- checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0));
+ checkLostPartition(true, true, new TopologyChanger(true, asList(3, 0), asList(1, 2, 4), 0));
}
/**
@@ -334,20 +221,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
backups = 1;
- checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0));
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testReadWriteSafeWithBackupsWithPersistence() throws Exception {
- partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
- backups = 1;
-
- isPersistenceEnabled = true;
-
- checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0));
+ checkLostPartition(true, true, new TopologyChanger(true, asList(3, 2), asList(0, 1, 4), 0));
}
/**
@@ -358,95 +232,26 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
backups = 1;
- checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0));
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testReadWriteSafeWithBackupsAfterKillCrdWithPersistence() throws Exception {
- partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
- backups = 1;
-
- isPersistenceEnabled = true;
-
- checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0));
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testIgnore() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-5078");
-
- partLossPlc = PartitionLossPolicy.IGNORE;
-
- checkIgnore(killSingleNode);
+ checkLostPartition(true, true, new TopologyChanger(true, asList(3, 0), asList(1, 2, 4), 0));
}
/**
+ * @param topChanger topology changer.
* @throws Exception if failed.
*/
- public void testIgnoreWithPersistence() throws Exception {
+ public void testIgnore(TopologyChanger topChanger) throws Exception {
fail("https://issues.apache.org/jira/browse/IGNITE-5078");
- fail("https://issues.apache.org/jira/browse/IGNITE-10041");
-
- partLossPlc = PartitionLossPolicy.IGNORE;
-
- isPersistenceEnabled = true;
-
- checkIgnore(killSingleNode);
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testIgnoreKillThreeNodes() throws Exception {
- partLossPlc = PartitionLossPolicy.IGNORE;
-
- // TODO aliveNodes should include node 4, but it fails due to https://issues.apache.org/jira/browse/IGNITE-5078.
- // TODO need to add 4 to the aliveNodes after IGNITE-5078 is fixed.
- // TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Arrays.asList(0, 4), 0);
- TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Collections.singletonList(0), 0);
-
- checkIgnore(onlyCrdIsAlive);
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testIgnoreKillThreeNodesWithPersistence() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-10041");
-
- partLossPlc = PartitionLossPolicy.IGNORE;
-
- isPersistenceEnabled = true;
-
- // TODO aliveNodes should include node 4, but it fails due to https://issues.apache.org/jira/browse/IGNITE-5078.
- // TODO need to add 4 to the aliveNodes after IGNITE-5078 is fixed.
- // TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Arrays.asList(0, 4), 0);
- TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Collections.singletonList(0), 0);
-
- checkIgnore(onlyCrdIsAlive);
- }
-
- /**
- * @param topChanger topology changer.
- * @throws Exception if failed.
- */
- private void checkIgnore(TopologyChanger topChanger) throws Exception {
topChanger.changeTopology();
for (Ignite ig : G.allGrids()) {
- IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME);
+ IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
Collection<Integer> lost = cache.lostPartitions();
assertTrue("[grid=" + ig.name() + ", lost=" + lost.toString() + ']', lost.isEmpty());
- int parts = ig.affinity(CACHE_NAME).partitions();
+ int parts = ig.affinity(DEFAULT_CACHE_NAME).partitions();
for (int i = 0; i < parts; i++) {
cache.get(i);
@@ -465,127 +270,109 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
private void checkLostPartition(boolean canWrite, boolean safe, TopologyChanger topChanger) throws Exception {
assert partLossPlc != null;
- List<Integer> lostParts = topChanger.changeTopology();
-
- // Wait for all grids (servers and client) have same topology version
- // to make sure that all nodes received map with lost partition.
- boolean success = GridTestUtils.waitForCondition(() -> {
- AffinityTopologyVersion last = null;
- for (Ignite ig : G.allGrids()) {
- AffinityTopologyVersion ver = ((IgniteEx)ig).context().cache().context().exchange().readyAffinityVersion();
-
- if (last != null && !last.equals(ver))
- return false;
-
- last = ver;
- }
-
- return true;
- }, 10000);
-
- assertTrue("Failed to wait for new topology", success);
+ int part = topChanger.changeTopology().get(0);
for (Ignite ig : G.allGrids()) {
info("Checking node: " + ig.cluster().localNode().id());
- IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME);
+ IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
- verifyLostPartitions(ig, lostParts);
-
- verifyCacheOps(canWrite, safe, ig);
-
- validateQuery(safe, ig);
+ verifyCacheOps(canWrite, safe, part, ig);
- // TODO withPartitionRecover doesn't work with BLT - https://issues.apache.org/jira/browse/IGNITE-10041.
- if (!isPersistenceEnabled) {
- // Check we can read and write to lost partition in recovery mode.
- IgniteCache<Integer, Integer> recoverCache = cache.withPartitionRecover();
+ // Check we can read and write to lost partition in recovery mode.
+ IgniteCache<Integer, Integer> recoverCache = cache.withPartitionRecover();
- for (int lostPart : recoverCache.lostPartitions()) {
- recoverCache.get(lostPart);
- recoverCache.put(lostPart, lostPart);
- }
-
- // Check that writing in recover mode does not clear partition state.
- verifyLostPartitions(ig, lostParts);
+ for (int lostPart : recoverCache.lostPartitions()) {
+ recoverCache.get(lostPart);
+ recoverCache.put(lostPart, lostPart);
+ }
- verifyCacheOps(canWrite, safe, ig);
+ // Check that writing in recover mode does not clear partition state.
+ verifyCacheOps(canWrite, safe, part, ig);
- validateQuery(safe, ig);
- }
+ // Validate queries.
+ validateQuery(safe, ig);
}
- // Bring all nodes back.
- for (int i : topChanger.killNodes) {
- IgniteEx grd = startGrid(i);
+ checkNewNode(true, canWrite, safe, part);
+ checkNewNode(false, canWrite, safe, part);
- info("Newly started node: " + grd.cluster().localNode().id());
+ // Check that partition state does not change after we start a new node.
+ IgniteEx grd = startGrid(3);
- // Check that partition state does not change after we start each node.
- // TODO With persistence enabled LOST partitions become OWNING after a node joins back - https://issues.apache.org/jira/browse/IGNITE-10044.
- if (!isPersistenceEnabled) {
- for (Ignite ig : G.allGrids()) {
- verifyCacheOps(canWrite, safe, ig);
+ info("Newly started node: " + grd.cluster().localNode().id());
- // TODO Query effectively waits for rebalance due to https://issues.apache.org/jira/browse/IGNITE-10057
- // TODO and after resetLostPartition there is another OWNING copy in the cluster due to https://issues.apache.org/jira/browse/IGNITE-10058.
- // TODO Uncomment after https://issues.apache.org/jira/browse/IGNITE-10058 is fixed.
-// validateQuery(safe, ig);
- }
- }
- }
+ for (Ignite ig : G.allGrids())
+ verifyCacheOps(canWrite, safe, part, ig);
- ignite(4).resetLostPartitions(Collections.singletonList(CACHE_NAME));
+ ignite(4).resetLostPartitions(Collections.singletonList(DEFAULT_CACHE_NAME));
awaitPartitionMapExchange(true, true, null);
for (Ignite ig : G.allGrids()) {
- IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME);
+ IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
assertTrue(cache.lostPartitions().isEmpty());
- int parts = ig.affinity(CACHE_NAME).partitions();
+ int parts = ig.affinity(DEFAULT_CACHE_NAME).partitions();
for (int i = 0; i < parts; i++) {
cache.get(i);
cache.put(i, i);
}
-
- for (int i = 0; i < parts; i++) {
- checkQueryPasses(ig, false, i);
-
- if (shouldExecuteLocalQuery(ig, i))
- checkQueryPasses(ig, true, i);
-
- }
-
- checkQueryPasses(ig, false);
}
}
/**
- * @param node Node.
- * @param lostParts Lost partition IDs.
+ * @param client Client flag.
+ * @param canWrite Can write flag.
+ * @param safe Safe flag.
+ * @param part List of lost partitions.
+ * @throws Exception If failed to start a new node.
*/
- private void verifyLostPartitions(Ignite node, List<Integer> lostParts) {
- IgniteCache<Integer, Integer> cache = node.cache(CACHE_NAME);
+ private void checkNewNode(
+ boolean client,
+ boolean canWrite,
+ boolean safe,
+ int part
+ ) throws Exception {
+ this.client = client;
+
+ try {
+ IgniteEx cl = startGrid("newNode");
+
+ CacheGroupContext grpCtx = cl.context().cache().cacheGroup(cacheId(DEFAULT_CACHE_NAME));
+
+ assertTrue(grpCtx.needsRecovery());
- Set<Integer> actualSortedLostParts = new TreeSet<>(cache.lostPartitions());
- Set<Integer> expSortedLostParts = new TreeSet<>(lostParts);
+ verifyCacheOps(canWrite, safe, part, cl);
- assertEqualsCollections(expSortedLostParts, actualSortedLostParts);
+ validateQuery(safe, cl);
+ }
+ finally {
+ stopGrid("newNode", false);
+
+ this.client = false;
+ }
}
/**
+ *
* @param canWrite {@code True} if writes are allowed.
* @param safe {@code True} if lost partition should trigger exception.
+ * @param part Lost partition ID.
* @param ig Ignite instance.
*/
- private void verifyCacheOps(boolean canWrite, boolean safe, Ignite ig) {
- IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME);
+ private void verifyCacheOps(boolean canWrite, boolean safe, int part, Ignite ig) {
+ IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
- int parts = ig.affinity(CACHE_NAME).partitions();
+ Collection<Integer> lost = cache.lostPartitions();
+
+ assertTrue("Failed to find expected lost partition [exp=" + part + ", lost=" + lost + ']',
+ lost.contains(part));
+
+ int parts = ig.affinity(DEFAULT_CACHE_NAME).partitions();
// Check read.
for (int i = 0; i < parts; i++) {
@@ -632,8 +419,8 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
* @param nodes List of nodes to find partition.
* @return List of partitions that aren't primary or backup for specified nodes.
*/
- private List<Integer> noPrimaryOrBackupPartition(List<Integer> nodes) {
- Affinity<Object> aff = ignite(4).affinity(CACHE_NAME);
+ protected List<Integer> noPrimaryOrBackupPartition(List<Integer> nodes) {
+ Affinity<Object> aff = ignite(4).affinity(DEFAULT_CACHE_NAME);
List<Integer> parts = new ArrayList<>();
@@ -657,6 +444,127 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
return parts;
}
+ /** */
+ private class TopologyChanger {
+ /** Flag to delay partition exchange */
+ private boolean delayExchange;
+
+ /** List of nodes to kill */
+ private List<Integer> killNodes;
+
+ /** List of nodes to be alive */
+ private List<Integer> aliveNodes;
+
+ /** Delay between node stops */
+ private long stopDelay;
+
+ /**
+ * @param delayExchange Flag for delay partition exchange.
+ * @param killNodes List of nodes to kill.
+ * @param aliveNodes List of nodes to be alive.
+ * @param stopDelay Delay between stopping nodes.
+ */
+ public TopologyChanger(
+ boolean delayExchange,
+ List<Integer> killNodes,
+ List<Integer> aliveNodes,
+ long stopDelay
+ ) {
+ this.delayExchange = delayExchange;
+ this.killNodes = killNodes;
+ this.aliveNodes = aliveNodes;
+ this.stopDelay = stopDelay;
+ }
+
+ /**
+ * @return Lost partition ID.
+ * @throws Exception If failed.
+ */
+ protected List<Integer> changeTopology() throws Exception {
+ startGrids(4);
+
+ Affinity<Object> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < aff.partitions(); i++)
+ ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i);
+
+ client = true;
+
+ startGrid(4);
+
+ client = false;
+
+ for (int i = 0; i < 5; i++)
+ info(">>> Node [idx=" + i + ", nodeId=" + ignite(i).cluster().localNode().id() + ']');
+
+ awaitPartitionMapExchange();
+
+ final List<Integer> parts = noPrimaryOrBackupPartition(aliveNodes);
+
+ if (parts.isEmpty())
+ throw new IllegalStateException("No partition on nodes: " + killNodes);
+
+ final List<Map<Integer, Semaphore>> lostMap = new ArrayList<>();
+
+ for (int i : aliveNodes) {
+ HashMap<Integer, Semaphore> semaphoreMap = new HashMap<>();
+
+ for (Integer part : parts)
+ semaphoreMap.put(part, new Semaphore(0));
+
+ lostMap.add(semaphoreMap);
+
+ grid(i).events().localListen(new P1<Event>() {
+ @Override public boolean apply(Event evt) {
+ assert evt.type() == EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
+
+ CacheRebalancingEvent cacheEvt = (CacheRebalancingEvent)evt;
+
+ if (F.eq(DEFAULT_CACHE_NAME, cacheEvt.cacheName())) {
+ if (semaphoreMap.containsKey(cacheEvt.partition()))
+ semaphoreMap.get(cacheEvt.partition()).release();
+ }
+
+ return true;
+ }
+ }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
+ }
+
+ if (delayExchange)
+ delayPartExchange.set(true);
+
+ ExecutorService executor = Executors.newFixedThreadPool(killNodes.size());
+
+ for (Integer node : killNodes) {
+ executor.submit(new Runnable() {
+ @Override public void run() {
+ grid(node).close();
+ }
+ });
+
+ Thread.sleep(stopDelay);
+ }
+
+ executor.shutdown();
+
+ delayPartExchange.set(false);
+
+ Thread.sleep(5_000L);
+
+ for (Map<Integer, Semaphore> map : lostMap) {
+ for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
+ assertTrue("Failed to wait for partition LOST event for partition:" + entry.getKey(), entry.getValue().tryAcquire(1));
+ }
+
+ for (Map<Integer, Semaphore> map : lostMap) {
+ for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
+ assertFalse("Partition LOST event raised twice for partition:" + entry.getKey(), entry.getValue().tryAcquire(1));
+ }
+
+ return parts;
+ }
+ }
+
/**
* Validate query execution on a node.
*
@@ -665,7 +573,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
*/
private void validateQuery(boolean safe, Ignite node) {
// Get node lost and remaining partitions.
- IgniteCache<?, ?> cache = node.cache(CACHE_NAME);
+ IgniteCache<?, ?> cache = node.cache(DEFAULT_CACHE_NAME);
Collection<Integer> lostParts = cache.lostPartitions();
@@ -673,7 +581,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
Integer remainingPart = null;
- for (int i = 0; i < node.affinity(CACHE_NAME).partitions(); i++) {
+ for (int i = 0; i < node.affinity(DEFAULT_CACHE_NAME).partitions(); i++) {
if (lostParts.contains(i))
continue;
@@ -730,7 +638,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
int numOfPrimaryParts = 0;
- for (int nodePrimaryPart : node.affinity(CACHE_NAME).primaryPartitions(node.cluster().localNode())) {
+ for (int nodePrimaryPart : node.affinity(DEFAULT_CACHE_NAME).primaryPartitions(node.cluster().localNode())) {
for (int part : parts) {
if (part == nodePrimaryPart)
numOfPrimaryParts++;
@@ -754,7 +662,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
if (loc)
return;
- IgniteCache cache = node.cache(CACHE_NAME);
+ IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
ScanQuery qry = new ScanQuery();
@@ -777,124 +685,4 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
// TODO Need to add an actual check after https://issues.apache.org/jira/browse/IGNITE-9902 is fixed.
// No-op.
}
-
- /** */
- private class TopologyChanger {
- /** Flag to delay partition exchange */
- private boolean delayExchange;
-
- /** List of nodes to kill */
- private List<Integer> killNodes;
-
- /** List of nodes to be alive */
- private List<Integer> aliveNodes;
-
- /** Delay between node stops */
- private long stopDelay;
-
- /**
- * @param delayExchange Flag for delay partition exchange.
- * @param killNodes List of nodes to kill.
- * @param aliveNodes List of nodes to be alive.
- * @param stopDelay Delay between stopping nodes.
- */
- private TopologyChanger(boolean delayExchange, List<Integer> killNodes, List<Integer> aliveNodes,
- long stopDelay) {
- this.delayExchange = delayExchange;
- this.killNodes = killNodes;
- this.aliveNodes = aliveNodes;
- this.stopDelay = stopDelay;
- }
-
- /**
- * @return Lost partition ID.
- * @throws Exception If failed.
- */
- private List<Integer> changeTopology() throws Exception {
- startGrids(4);
-
- if (isPersistenceEnabled)
- grid(0).cluster().active(true);
-
- Affinity<Object> aff = ignite(0).affinity(CACHE_NAME);
-
- for (int i = 0; i < aff.partitions(); i++)
- ignite(0).cache(CACHE_NAME).put(i, i);
-
- client = true;
-
- startGrid(4);
-
- client = false;
-
- for (int i = 0; i < 5; i++)
- info(">>> Node [idx=" + i + ", nodeId=" + ignite(i).cluster().localNode().id() + ']');
-
- awaitPartitionMapExchange();
-
- final List<Integer> parts = noPrimaryOrBackupPartition(aliveNodes);
-
- if (parts.isEmpty())
- throw new IllegalStateException("No partition on nodes: " + killNodes);
-
- final List<Map<Integer, Semaphore>> lostMap = new ArrayList<>();
-
- for (int i : aliveNodes) {
- HashMap<Integer, Semaphore> semaphoreMap = new HashMap<>();
-
- for (Integer part : parts)
- semaphoreMap.put(part, new Semaphore(0));
-
- lostMap.add(semaphoreMap);
-
- grid(i).events().localListen(new P1<Event>() {
- @Override public boolean apply(Event evt) {
- assert evt.type() == EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
-
- CacheRebalancingEvent cacheEvt = (CacheRebalancingEvent)evt;
-
- if (F.eq(CACHE_NAME, cacheEvt.cacheName())) {
- if (semaphoreMap.containsKey(cacheEvt.partition()))
- semaphoreMap.get(cacheEvt.partition()).release();
- }
-
- return true;
- }
- }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
- }
-
- if (delayExchange)
- delayPartExchange.set(true);
-
- ExecutorService executor = Executors.newFixedThreadPool(killNodes.size());
-
- for (Integer node : killNodes) {
- executor.submit(new Runnable() {
- @Override public void run() {
- grid(node).close();
- }
- });
-
- Thread.sleep(stopDelay);
- }
-
- executor.shutdown();
-
- delayPartExchange.set(false);
-
- Thread.sleep(5_000L);
-
- for (Map<Integer, Semaphore> map : lostMap) {
- for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
- assertTrue("Failed to wait for partition LOST event for partition:" + entry.getKey(), entry.getValue().tryAcquire(1));
- }
-
- for (Map<Integer, Semaphore> map : lostMap) {
- for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
- assertFalse("Partition LOST event raised twice for partition:" + entry.getKey(), entry.getValue().tryAcquire(1));
- }
-
- return parts;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java
index a31a1c6..7007499 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java
@@ -69,7 +69,7 @@ public class IndexingCachePartitionLossPolicySelfTest extends IgniteCachePartiti
* @param loc Local flag.
*/
private static void executeQuery(Ignite node, boolean loc, int... parts) {
- IgniteCache cache = node.cache(CACHE_NAME);
+ IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
SqlFieldsQuery qry = new SqlFieldsQuery("SELECT * FROM Integer");