You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/11/11 22:16:29 UTC

[2/2] ignite git commit: IGNITE-10207 fix checking lost partition for many cases

IGNITE-10207 fix checking lost partition for many cases

Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19d8d902
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19d8d902
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19d8d902

Branch: refs/heads/ignite-10207
Commit: 19d8d902f6ee2fb77244549a1a891a901ea2386d
Parents: 162e573
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Mon Nov 12 01:16:18 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Mon Nov 12 01:16:18 2018 +0300

----------------------------------------------------------------------
 .../cache/distributed/dht/GridDhtGetFuture.java |  28 ++-
 .../distributed/dht/GridDhtGetSingleFuture.java |  29 ++-
 .../dht/GridDhtTopologyFutureAdapter.java       | 230 ++++++++++++-------
 .../dht/GridPartitionedSingleGetFuture.java     |   7 +
 .../GridDhtPartitionsExchangeFuture.java        |   4 +-
 5 files changed, 205 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/19d8d902/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 0bdc6b1..20c94c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -37,8 +37,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.ReaderArguments;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.LostPolicyValidator;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -55,6 +57,9 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Collections.singleton;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.READ;
+
 /**
  *
  */
@@ -314,14 +319,31 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
             if (part == null)
                 return false;
 
+            if (part.state() == GridDhtPartitionState.LOST && !recovery) {
+                Throwable error = LostPolicyValidator.validate(cctx, key, READ, singleton(part.id()));
+
+                if (error != null) {
+                    onDone(null, error);
+
+                    return false;
+                }
+            }
+
             if (parts == null || !F.contains(parts, part.id())) {
                 // By reserving, we make sure that partition won't be unloaded while processed.
                 if (part.reserve()) {
-                    parts = parts == null ? new int[1] : Arrays.copyOf(parts, parts.length + 1);
+                    if (part.state() == GridDhtPartitionState.OWNING || part.state() == GridDhtPartitionState.LOST) {
+                        parts = parts == null ? new int[1] : Arrays.copyOf(parts, parts.length + 1);
 
-                    parts[parts.length - 1] = part.id();
+                        parts[parts.length - 1] = part.id();
 
-                    return true;
+                        return true;
+                    }
+                    else {
+                        part.release();
+
+                        return false;
+                    }
                 }
                 else
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/19d8d902/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index ee46168..94f07e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -35,8 +35,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.ReaderArguments;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.LostPolicyValidator;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -47,6 +49,9 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Collections.singleton;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.READ;
+
 /**
  *
  */
@@ -255,7 +260,8 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
         if (!map(key)) {
             retry = cctx.affinity().partition(key);
 
-            onDone((GridCacheEntryInfo)null);
+            if (!isDone())
+                onDone((GridCacheEntryInfo)null);
 
             return;
         }
@@ -285,11 +291,28 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
 
             assert this.part == -1;
 
+            if (part.state() == GridDhtPartitionState.LOST && !recovery) {
+                Throwable error = LostPolicyValidator.validate(cctx, key, READ, singleton(part.id()));
+
+                if (error != null) {
+                    onDone(null, error);
+
+                    return false;
+                }
+            }
+
             // By reserving, we make sure that partition won't be unloaded while processed.
             if (part.reserve()) {
-                this.part = part.id();
+                if (part.state() == GridDhtPartitionState.OWNING || part.state() == GridDhtPartitionState.LOST) {
+                    this.part = part.id();
+
+                    return true;
+                }
+                else {
+                    part.release();
 
-                return true;
+                    return false;
+                }
             }
             else
                 return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/19d8d902/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
index 9214308..23d0524 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
@@ -29,12 +29,13 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_ALL;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
-import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_ALL;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.WRITE;
 
 /**
  *
@@ -42,7 +43,7 @@ import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
 public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<AffinityTopologyVersion>
     implements GridDhtTopologyFuture {
     /** Cache groups validation results. */
-    protected volatile Map<Integer, CacheValidation> grpValidRes;
+    protected volatile Map<Integer, CacheGroupValidation> grpValidRes;
 
     /** Whether or not cluster is active. */
     protected volatile boolean clusterIsActive = true;
@@ -52,7 +53,7 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
      * @param topNodes Topology nodes.
      * @return Validation result.
      */
-    protected final CacheValidation validateCacheGroup(CacheGroupContext grp, Collection<ClusterNode> topNodes) {
+    protected final CacheGroupValidation validateCacheGroup(CacheGroupContext grp, Collection<ClusterNode> topNodes) {
         Collection<Integer> lostParts = grp.isLocal() ?
             Collections.<Integer>emptyList() : grp.topology().lostPartitions();
 
@@ -65,11 +66,11 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
                 valid = validator.validate(topNodes);
         }
 
-        return new CacheValidation(valid, lostParts);
+        return new CacheGroupValidation(valid, lostParts);
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public final Throwable validateCache(
+    @Override public final @Nullable Throwable validateCache(
         GridCacheContext cctx,
         boolean recovery,
         boolean read,
@@ -87,115 +88,174 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
             return new CacheInvalidStateException(
                 "Failed to perform cache operation (cluster is not activated): " + cctx.name());
 
+        if (!cctx.shared().kernalContext().state().publicApiActiveState(true))
+            return new CacheInvalidStateException(
+                "Failed to perform cache operation (cluster is not activated): " + cctx.name());
+
+        OperationType opType = read ? OperationType.READ : WRITE;
+
         CacheGroupContext grp = cctx.group();
 
-        PartitionLossPolicy partLossPlc = grp.config().getPartitionLossPolicy();
+        CacheGroupValidation validation = grpValidRes.get(grp.groupId());
 
-        if (grp.needsRecovery() && !recovery) {
-            if (!read && (partLossPlc == READ_ONLY_SAFE || partLossPlc == READ_ONLY_ALL))
-                return new IgniteCheckedException("Failed to write to cache (cache is moved to a read-only state): " +
-                    cctx.name());
-        }
+        if (validation == null)
+            return null;
 
-        if (cctx.shared().readOnlyMode() && !read)
-            return new IgniteCheckedException("Failed to perform cache operation (cluster is in read only mode)" );
+        if (opType == WRITE && !validation.isValid()) {
+            return new IgniteCheckedException("Failed to perform cache operation " +
+                "(cache topology is not valid): " + cctx.name());
+        }
 
-        if (grp.needsRecovery() || grp.topologyValidator() != null) {
-            CacheValidation validation = grpValidRes.get(grp.groupId());
+        if (recovery)
+            return null;
 
-            if (validation == null)
-                return null;
+        if (validation.hasLostPartitions()) {
+            if (key != null)
+                return LostPolicyValidator.validate(cctx, key, opType, validation.lostPartitions());
 
-            if (!validation.valid && !read)
-                return new IgniteCheckedException("Failed to perform cache operation " +
-                    "(cache topology is not valid): " + cctx.name());
+            if (keys != null)
+                return LostPolicyValidator.validate(cctx, keys, opType, validation.lostPartitions());
+        }
 
-            if (recovery || !grp.needsRecovery())
-                return null;
+        return null;
+    }
 
-            if (key != null) {
-                int p = cctx.affinity().partition(key);
+    /**
+     * Cache group validation result.
+     */
+    protected static class CacheGroupValidation {
+        /** Topology validation result. */
+        private final boolean valid;
 
-                CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, key, p,
-                    validation.lostParts, partLossPlc);
+        /** Lost partitions on this topology version. */
+        private final Collection<Integer> lostParts;
 
-                if (ex != null)
-                    return ex;
-            }
+        /**
+         * @param valid Valid flag.
+         * @param lostParts Lost partitions.
+         */
+        private CacheGroupValidation(boolean valid, Collection<Integer> lostParts) {
+            this.valid = valid;
+            this.lostParts = lostParts;
+        }
 
-            if (keys != null) {
-                for (Object k : keys) {
-                    int p = cctx.affinity().partition(k);
+        /**
+         * @return True if valid, False if invalid.
+         */
+        public boolean isValid() {
+            return valid;
+        }
 
-                    CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, k, p,
-                        validation.lostParts, partLossPlc);
+        /**
+         * @return True if lost partition is present, False if not.
+         */
+        public boolean hasLostPartitions() {
+            return !F.isEmpty(lostParts);
+        }
 
-                    if (ex != null)
-                        return ex;
-                }
-            }
+        /**
+         * @return Lost partition ID collection.
+         */
+        public Collection<Integer> lostPartitions() {
+            return lostParts;
         }
+    }
 
-        return null;
+    /**
+     *
+     */
+    public enum OperationType {
+        /**
+         * Read operation.
+         */
+        READ,
+        /**
+         * Write operation.
+         */
+        WRITE
     }
 
     /**
-     * @param cacheName Cache name.
-     * @param read Read flag.
-     * @param key Key to check.
-     * @param part Partition this key belongs to.
-     * @param lostParts Collection of lost partitions.
-     * @param plc Partition loss policy.
-     * @return Invalid state exception if this operation is disallowed.
+     * Lost policy validator.
      */
-    private CacheInvalidStateException validatePartitionOperation(
-        String cacheName,
-        boolean read,
-        Object key,
-        int part,
-        Collection<Integer> lostParts,
-        PartitionLossPolicy plc
-    ) {
-        if (lostParts.contains(part)) {
-            if (!read) {
-                assert plc == READ_WRITE_ALL || plc == READ_WRITE_SAFE;
+    public static class LostPolicyValidator {
+        /**
+         *
+         */
+        public static Throwable validate(
+            GridCacheContext cctx,
+            Object key,
+            OperationType opType,
+            Collection<Integer> lostParts
+        ) {
+            CacheGroupContext grp = cctx.group();
+
+            PartitionLossPolicy lostPlc = grp.config().getPartitionLossPolicy();
+
+            int partition = cctx.affinity().partition(key);
+
+            return validate(cctx, key, partition, opType, lostPlc, lostParts);
+        }
+
+        /**
+         *
+         */
+        public static Throwable validate(
+            GridCacheContext cctx,
+            Collection<?> keys,
+            OperationType opType,
+            Collection<Integer> lostParts
+        ) {
+            CacheGroupContext grp = cctx.group();
 
-                if (plc == READ_WRITE_SAFE) {
+            PartitionLossPolicy lostPlc = grp.config().getPartitionLossPolicy();
+
+            for (Object key : keys) {
+                int partition = cctx.affinity().partition(key);
+
+                Throwable res = validate(cctx, key, partition, opType, lostPlc, lostParts);
+
+                if (res != null)
+                    return res;
+            }
+
+            return null;
+        }
+
+        /**
+         *
+         */
+        private static Throwable validate(
+            GridCacheContext cctx,
+            Object key,
+            int partition,
+            OperationType opType,
+            PartitionLossPolicy lostPlc,
+            Collection<Integer> lostParts
+        ) {
+            if (opType == WRITE) {
+                if (lostPlc == READ_ONLY_SAFE || lostPlc == READ_ONLY_ALL) {
+                    return new IgniteCheckedException(
+                        "Failed to write to cache (cache is moved to a read-only state): " + cctx.name()
+                    );
+                }
+
+                if (lostParts.contains(partition) && lostPlc == READ_WRITE_SAFE) {
                     return new CacheInvalidStateException("Failed to execute cache operation " +
                         "(all partition owners have left the grid, partition data has been lost) [" +
-                        "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']');
+                        "cacheName=" + cctx.name() + ", part=" + partition + ", key=" + key + ']');
                 }
             }
-            else {
-                // Read.
-                if (plc == READ_ONLY_SAFE || plc == READ_WRITE_SAFE)
+
+            if (opType == OperationType.READ) {
+                if (lostParts.contains(partition) && (lostPlc == READ_ONLY_SAFE || lostPlc == READ_WRITE_SAFE))
                     return new CacheInvalidStateException("Failed to execute cache operation " +
                         "(all partition owners have left the grid, partition data has been lost) [" +
-                        "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']');
+                        "cacheName=" + cctx.name() + ", part=" + partition + ", key=" + key + ']'
+                    );
             }
-        }
-
-        return null;
-    }
-
-    /**
-     * Cache validation result.
-     */
-    protected static class CacheValidation {
-        /** Topology validation result. */
-        private boolean valid;
 
-        /** Lost partitions on this topology version. */
-        private Collection<Integer> lostParts;
-
-        /**
-         * @param valid Valid flag.
-         * @param lostParts Lost partitions.
-         */
-        private CacheValidation(boolean valid, Collection<Integer> lostParts) {
-            this.valid = valid;
-            this.lostParts = lostParts;
+            return null;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/19d8d902/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 4dc72b2..93cc2ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -764,6 +764,13 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
     private void remap(final AffinityTopologyVersion topVer) {
         cctx.closures().runLocalSafe(new Runnable() {
             @Override public void run() {
+                GridDhtTopologyFuture lastFut = cctx.shared().exchange().lastFinishedFuture();
+
+                Throwable error = lastFut.validateCache(cctx, recovery, true, key, null);
+
+                if (error != null)
+                    onDone(error);
+
                 map(topVer);
             }
         });

http://git-wip-us.apache.org/repos/asf/ignite/blob/19d8d902/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3702a51..c8471c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2077,10 +2077,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
-            if (serverNodeDiscoveryEvent())
+            if (serverNodeDiscoveryEvent() || localJoinExchange())
                 detectLostPartitions(res);
 
-            Map<Integer, CacheValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
+            Map<Integer, CacheGroupValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
 
             for (CacheGroupContext grp : cctx.cache().cacheGroups())
                 m.put(grp.groupId(), validateCacheGroup(grp, events().lastEvent().topologyNodes()));