You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/11/11 22:16:29 UTC
[2/2] ignite git commit: IGNITE-10207 fix checking lost partition for
many cases
IGNITE-10207 fix checking lost partition for many cases
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19d8d902
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19d8d902
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19d8d902
Branch: refs/heads/ignite-10207
Commit: 19d8d902f6ee2fb77244549a1a891a901ea2386d
Parents: 162e573
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Mon Nov 12 01:16:18 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Mon Nov 12 01:16:18 2018 +0300
----------------------------------------------------------------------
.../cache/distributed/dht/GridDhtGetFuture.java | 28 ++-
.../distributed/dht/GridDhtGetSingleFuture.java | 29 ++-
.../dht/GridDhtTopologyFutureAdapter.java | 230 ++++++++++++-------
.../dht/GridPartitionedSingleGetFuture.java | 7 +
.../GridDhtPartitionsExchangeFuture.java | 4 +-
5 files changed, 205 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/19d8d902/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 0bdc6b1..20c94c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -37,8 +37,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.ReaderArguments;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.LostPolicyValidator;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -55,6 +57,9 @@ import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static java.util.Collections.singleton;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.READ;
+
/**
*
*/
@@ -314,14 +319,31 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
if (part == null)
return false;
+ if (part.state() == GridDhtPartitionState.LOST && !recovery) {
+ Throwable error = LostPolicyValidator.validate(cctx, key, READ, singleton(part.id()));
+
+ if (error != null) {
+ onDone(null, error);
+
+ return false;
+ }
+ }
+
if (parts == null || !F.contains(parts, part.id())) {
// By reserving, we make sure that partition won't be unloaded while processed.
if (part.reserve()) {
- parts = parts == null ? new int[1] : Arrays.copyOf(parts, parts.length + 1);
+ if (part.state() == GridDhtPartitionState.OWNING || part.state() == GridDhtPartitionState.LOST) {
+ parts = parts == null ? new int[1] : Arrays.copyOf(parts, parts.length + 1);
- parts[parts.length - 1] = part.id();
+ parts[parts.length - 1] = part.id();
- return true;
+ return true;
+ }
+ else {
+ part.release();
+
+ return false;
+ }
}
else
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/19d8d902/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index ee46168..94f07e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -35,8 +35,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.ReaderArguments;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.LostPolicyValidator;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -47,6 +49,9 @@ import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static java.util.Collections.singleton;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.READ;
+
/**
*
*/
@@ -255,7 +260,8 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
if (!map(key)) {
retry = cctx.affinity().partition(key);
- onDone((GridCacheEntryInfo)null);
+ if (!isDone())
+ onDone((GridCacheEntryInfo)null);
return;
}
@@ -285,11 +291,28 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
assert this.part == -1;
+ if (part.state() == GridDhtPartitionState.LOST && !recovery) {
+ Throwable error = LostPolicyValidator.validate(cctx, key, READ, singleton(part.id()));
+
+ if (error != null) {
+ onDone(null, error);
+
+ return false;
+ }
+ }
+
// By reserving, we make sure that partition won't be unloaded while processed.
if (part.reserve()) {
- this.part = part.id();
+ if (part.state() == GridDhtPartitionState.OWNING || part.state() == GridDhtPartitionState.LOST) {
+ this.part = part.id();
+
+ return true;
+ }
+ else {
+ part.release();
- return true;
+ return false;
+ }
}
else
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/19d8d902/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
index 9214308..23d0524 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
@@ -29,12 +29,13 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_ALL;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
-import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_ALL;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.WRITE;
/**
*
@@ -42,7 +43,7 @@ import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<AffinityTopologyVersion>
implements GridDhtTopologyFuture {
/** Cache groups validation results. */
- protected volatile Map<Integer, CacheValidation> grpValidRes;
+ protected volatile Map<Integer, CacheGroupValidation> grpValidRes;
/** Whether or not cluster is active. */
protected volatile boolean clusterIsActive = true;
@@ -52,7 +53,7 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
* @param topNodes Topology nodes.
* @return Validation result.
*/
- protected final CacheValidation validateCacheGroup(CacheGroupContext grp, Collection<ClusterNode> topNodes) {
+ protected final CacheGroupValidation validateCacheGroup(CacheGroupContext grp, Collection<ClusterNode> topNodes) {
Collection<Integer> lostParts = grp.isLocal() ?
Collections.<Integer>emptyList() : grp.topology().lostPartitions();
@@ -65,11 +66,11 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
valid = validator.validate(topNodes);
}
- return new CacheValidation(valid, lostParts);
+ return new CacheGroupValidation(valid, lostParts);
}
/** {@inheritDoc} */
- @Nullable @Override public final Throwable validateCache(
+ @Override public final @Nullable Throwable validateCache(
GridCacheContext cctx,
boolean recovery,
boolean read,
@@ -87,115 +88,174 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
return new CacheInvalidStateException(
"Failed to perform cache operation (cluster is not activated): " + cctx.name());
+ if (!cctx.shared().kernalContext().state().publicApiActiveState(true))
+ return new CacheInvalidStateException(
+ "Failed to perform cache operation (cluster is not activated): " + cctx.name());
+
+ OperationType opType = read ? OperationType.READ : WRITE;
+
CacheGroupContext grp = cctx.group();
- PartitionLossPolicy partLossPlc = grp.config().getPartitionLossPolicy();
+ CacheGroupValidation validation = grpValidRes.get(grp.groupId());
- if (grp.needsRecovery() && !recovery) {
- if (!read && (partLossPlc == READ_ONLY_SAFE || partLossPlc == READ_ONLY_ALL))
- return new IgniteCheckedException("Failed to write to cache (cache is moved to a read-only state): " +
- cctx.name());
- }
+ if (validation == null)
+ return null;
- if (cctx.shared().readOnlyMode() && !read)
- return new IgniteCheckedException("Failed to perform cache operation (cluster is in read only mode)" );
+ if (opType == WRITE && !validation.isValid()) {
+ return new IgniteCheckedException("Failed to perform cache operation " +
+ "(cache topology is not valid): " + cctx.name());
+ }
- if (grp.needsRecovery() || grp.topologyValidator() != null) {
- CacheValidation validation = grpValidRes.get(grp.groupId());
+ if (recovery)
+ return null;
- if (validation == null)
- return null;
+ if (validation.hasLostPartitions()) {
+ if (key != null)
+ return LostPolicyValidator.validate(cctx, key, opType, validation.lostPartitions());
- if (!validation.valid && !read)
- return new IgniteCheckedException("Failed to perform cache operation " +
- "(cache topology is not valid): " + cctx.name());
+ if (keys != null)
+ return LostPolicyValidator.validate(cctx, keys, opType, validation.lostPartitions());
+ }
- if (recovery || !grp.needsRecovery())
- return null;
+ return null;
+ }
- if (key != null) {
- int p = cctx.affinity().partition(key);
+ /**
+ * Cache group validation result.
+ */
+ protected static class CacheGroupValidation {
+ /** Topology validation result. */
+ private final boolean valid;
- CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, key, p,
- validation.lostParts, partLossPlc);
+ /** Lost partitions on this topology version. */
+ private final Collection<Integer> lostParts;
- if (ex != null)
- return ex;
- }
+ /**
+ * @param valid Valid flag.
+ * @param lostParts Lost partitions.
+ */
+ private CacheGroupValidation(boolean valid, Collection<Integer> lostParts) {
+ this.valid = valid;
+ this.lostParts = lostParts;
+ }
- if (keys != null) {
- for (Object k : keys) {
- int p = cctx.affinity().partition(k);
+ /**
+ * @return True if valid, False if invalid.
+ */
+ public boolean isValid() {
+ return valid;
+ }
- CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, k, p,
- validation.lostParts, partLossPlc);
+ /**
+ * @return True if lost partition is present, False if not.
+ */
+ public boolean hasLostPartitions() {
+ return !F.isEmpty(lostParts);
+ }
- if (ex != null)
- return ex;
- }
- }
+ /**
+ * @return Lost partition ID collection.
+ */
+ public Collection<Integer> lostPartitions() {
+ return lostParts;
}
+ }
- return null;
+ /**
+ *
+ */
+ public enum OperationType {
+ /**
+ * Read operation.
+ */
+ READ,
+ /**
+ * Write operation.
+ */
+ WRITE
}
/**
- * @param cacheName Cache name.
- * @param read Read flag.
- * @param key Key to check.
- * @param part Partition this key belongs to.
- * @param lostParts Collection of lost partitions.
- * @param plc Partition loss policy.
- * @return Invalid state exception if this operation is disallowed.
+ * Lost policy validator.
*/
- private CacheInvalidStateException validatePartitionOperation(
- String cacheName,
- boolean read,
- Object key,
- int part,
- Collection<Integer> lostParts,
- PartitionLossPolicy plc
- ) {
- if (lostParts.contains(part)) {
- if (!read) {
- assert plc == READ_WRITE_ALL || plc == READ_WRITE_SAFE;
+ public static class LostPolicyValidator {
+ /**
+ *
+ */
+ public static Throwable validate(
+ GridCacheContext cctx,
+ Object key,
+ OperationType opType,
+ Collection<Integer> lostParts
+ ) {
+ CacheGroupContext grp = cctx.group();
+
+ PartitionLossPolicy lostPlc = grp.config().getPartitionLossPolicy();
+
+ int partition = cctx.affinity().partition(key);
+
+ return validate(cctx, key, partition, opType, lostPlc, lostParts);
+ }
+
+ /**
+ *
+ */
+ public static Throwable validate(
+ GridCacheContext cctx,
+ Collection<?> keys,
+ OperationType opType,
+ Collection<Integer> lostParts
+ ) {
+ CacheGroupContext grp = cctx.group();
- if (plc == READ_WRITE_SAFE) {
+ PartitionLossPolicy lostPlc = grp.config().getPartitionLossPolicy();
+
+ for (Object key : keys) {
+ int partition = cctx.affinity().partition(key);
+
+ Throwable res = validate(cctx, key, partition, opType, lostPlc, lostParts);
+
+ if (res != null)
+ return res;
+ }
+
+ return null;
+ }
+
+ /**
+ *
+ */
+ private static Throwable validate(
+ GridCacheContext cctx,
+ Object key,
+ int partition,
+ OperationType opType,
+ PartitionLossPolicy lostPlc,
+ Collection<Integer> lostParts
+ ) {
+ if (opType == WRITE) {
+ if (lostPlc == READ_ONLY_SAFE || lostPlc == READ_ONLY_ALL) {
+ return new IgniteCheckedException(
+ "Failed to write to cache (cache is moved to a read-only state): " + cctx.name()
+ );
+ }
+
+ if (lostParts.contains(partition) && lostPlc == READ_WRITE_SAFE) {
return new CacheInvalidStateException("Failed to execute cache operation " +
"(all partition owners have left the grid, partition data has been lost) [" +
- "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']');
+ "cacheName=" + cctx.name() + ", part=" + partition + ", key=" + key + ']');
}
}
- else {
- // Read.
- if (plc == READ_ONLY_SAFE || plc == READ_WRITE_SAFE)
+
+ if (opType == OperationType.READ) {
+ if (lostParts.contains(partition) && (lostPlc == READ_ONLY_SAFE || lostPlc == READ_WRITE_SAFE))
return new CacheInvalidStateException("Failed to execute cache operation " +
"(all partition owners have left the grid, partition data has been lost) [" +
- "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']');
+ "cacheName=" + cctx.name() + ", part=" + partition + ", key=" + key + ']'
+ );
}
- }
-
- return null;
- }
-
- /**
- * Cache validation result.
- */
- protected static class CacheValidation {
- /** Topology validation result. */
- private boolean valid;
- /** Lost partitions on this topology version. */
- private Collection<Integer> lostParts;
-
- /**
- * @param valid Valid flag.
- * @param lostParts Lost partitions.
- */
- private CacheValidation(boolean valid, Collection<Integer> lostParts) {
- this.valid = valid;
- this.lostParts = lostParts;
+ return null;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/19d8d902/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 4dc72b2..93cc2ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -764,6 +764,13 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
private void remap(final AffinityTopologyVersion topVer) {
cctx.closures().runLocalSafe(new Runnable() {
@Override public void run() {
+ GridDhtTopologyFuture lastFut = cctx.shared().exchange().lastFinishedFuture();
+
+ Throwable error = lastFut.validateCache(cctx, recovery, true, key, null);
+
+ if (error != null)
+ onDone(error);
+
map(topVer);
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/19d8d902/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3702a51..c8471c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2077,10 +2077,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
- if (serverNodeDiscoveryEvent())
+ if (serverNodeDiscoveryEvent() || localJoinExchange())
detectLostPartitions(res);
- Map<Integer, CacheValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
+ Map<Integer, CacheGroupValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
for (CacheGroupContext grp : cctx.cache().cacheGroups())
m.put(grp.groupId(), validateCacheGroup(grp, events().lastEvent().topologyNodes()));