You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by jo...@apache.org on 2018/11/21 15:20:59 UTC

ignite git commit: IGNITE-10226 Fixed wrong partition state recovery - Fixes #5396.

Repository: ignite
Updated Branches:
  refs/heads/master a8fc7af01 -> ceba2145d


IGNITE-10226 Fixed wrong partition state recovery - Fixes #5396.

Signed-off-by: Pavel Kovalenko <jo...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ceba2145
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ceba2145
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ceba2145

Branch: refs/heads/master
Commit: ceba2145d3eb39ca5430393c84390c76864ecefc
Parents: a8fc7af
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Wed Nov 21 18:19:32 2018 +0300
Committer: Pavel Kovalenko <jo...@gmail.com>
Committed: Wed Nov 21 18:19:32 2018 +0300

----------------------------------------------------------------------
 .../dht/topology/GridDhtLocalPartition.java     |  31 +++-
 .../topology/GridDhtPartitionTopologyImpl.java  |   6 +-
 .../GridCacheDatabaseSharedManager.java         |   2 +-
 .../IgnitePdsPartitionsStateRecoveryTest.java   | 170 +++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   3 +
 5 files changed, 203 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ceba2145/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index 833a2b1..d1cd1eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -32,6 +32,8 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
@@ -172,10 +174,14 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @param ctx Context.
      * @param grp Cache group.
      * @param id Partition ID.
-     */
-    public GridDhtLocalPartition(GridCacheSharedContext ctx,
-        CacheGroupContext grp,
-        int id) {
+     * @param recovery Flag indicating that the partition is created during the recovery phase.
+     */
+    public GridDhtLocalPartition(
+            GridCacheSharedContext ctx,
+            CacheGroupContext grp,
+            int id,
+            boolean recovery
+    ) {
         super(ENTRY_FACTORY);
 
         this.id = id;
@@ -212,7 +218,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
             store = grp.offheap().createCacheDataStore(id);
 
             // Log partition creation for further crash recovery purposes.
-            if (grp.walEnabled())
+            if (grp.walEnabled() && !recovery)
                 ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, state(), updateCounter()));
 
             // Inject row cache cleaner on store creation
@@ -559,6 +565,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to log partition state change to WAL.", e);
+
+                        ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
                     }
 
                     if (log.isDebugEnabled())
@@ -973,6 +981,19 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
         if (grp.sharedGroup())
             grp.onPartitionCounterUpdate(cacheId, id, primaryCntr != null ? primaryCntr : nextCntr, topVer, primary);
 
+        // This is the first update in the partition; log partition state information for further crash recovery.
+        if (nextCntr == 1) {
+            if (grp.persistenceEnabled() && grp.walEnabled())
+                try {
+                    ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, state(), 0));
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to log partition state snapshot to WAL.", e);
+
+                    ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+                }
+        }
+
         return nextCntr;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ceba2145/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index f64c1c4..fb30ecd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -864,7 +864,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (loc != null)
                 loc.awaitDestroy();
 
-            locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
+            locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, false));
 
             long updCntr = cntrMap.updateCounter(p);
 
@@ -895,7 +895,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (part != null && part.state() != EVICTED)
                 return part;
 
-            part = new GridDhtLocalPartition(ctx, grp, p);
+            part = new GridDhtLocalPartition(ctx, grp, p, true);
 
             locParts.set(p, part);
 
@@ -972,7 +972,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                             "[grp=" + grp.cacheOrGroupName() + ", part=" + p + ", topVer=" + topVer +
                             ", this.topVer=" + this.readyTopVer + ']');
 
-                    locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
+                    locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, false));
 
                     this.updateSeq.incrementAndGet();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ceba2145/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index c74972a..ed54f65 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2989,7 +2989,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 while (!isCancelled()) {
                     waitCheckpointEvent();
 
-                    if (skipCheckpointOnNodeStop && (isStopping() || shutdownNow)) {
+                    if (skipCheckpointOnNodeStop && (isCancelled() || shutdownNow)) {
                         if (log.isInfoEnabled())
                             log.warning("Skipping last checkpoint because node is stopping.");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ceba2145/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionsStateRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionsStateRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionsStateRecoveryTest.java
new file mode 100644
index 0000000..112fc59
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionsStateRecoveryTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.util.Arrays;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+/**
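+ * Checks that a node restarted with the last checkpoint skipped does not restore partitions in MOVING state.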
+ */
+public class IgnitePdsPartitionsStateRecoveryTest extends GridCommonAbstractTest {
+    /** Cache name. */
+    private static final String CACHE = "cache";
+
+    /** Partitions count. */
+    private static final int PARTS_CNT = 32;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+            .setWalMode(WALMode.LOG_ONLY)
+            .setWalSegmentSize(16 * 1024 * 1024)
+            .setCheckpointFrequency(20 * 60 * 1000)
+            .setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration()
+                    .setMaxSize(512 * 1024 * 1024)
+                    .setPersistenceEnabled(true)
+            );
+
+        cfg.setDataStorageConfiguration(dsCfg);
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE)
+            .setBackups(0)
+            .setRebalanceMode(CacheRebalanceMode.NONE) // Disable rebalance to prevent owning MOVING partitions.
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT));
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        System.setProperty(GridCacheDatabaseSharedManager.IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, "true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        System.clearProperty(GridCacheDatabaseSharedManager.IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP);
+    }
+
+    /**
+     * Checks that partition state is recovered properly if the last checkpoint was skipped and there are logical updates to apply.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitionsStateConsistencyAfterRecovery() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ignite.getOrCreateCache(CACHE);
+
+        for (int key = 0; key < 4096; key++)
+            cache.put(key, key);
+
+        forceCheckpoint();
+
+        for (int key = 0; key < 4096; key++) {
+            int[] payload = new int[4096];
+            Arrays.fill(payload, key);
+
+            cache.put(key, payload);
+        }
+
+        GridDhtPartitionTopology topology = ignite.cachex(CACHE).context().topology();
+
+        Assert.assertFalse(topology.hasMovingPartitions());
+
+        log.info("Stopping grid...");
+
+        stopGrid(0);
+
+        ignite = startGrid(0);
+
+        awaitPartitionMapExchange();
+
+        topology = ignite.cachex(CACHE).context().topology();
+
+        Assert.assertFalse("Node restored moving partitions after join to topology.", topology.hasMovingPartitions());
+    }
+
+    /**
+     * Test checks that partition state is recovered properly if only logical updates exist.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitionsStateConsistencyAfterRecoveryNoCheckpoints() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ignite.getOrCreateCache(CACHE);
+
+        forceCheckpoint();
+
+        for (int key = 0; key < 4096; key++) {
+            int[] payload = new int[4096];
+            Arrays.fill(payload, key);
+
+            cache.put(key, payload);
+        }
+
+        GridDhtPartitionTopology topology = ignite.cachex(CACHE).context().topology();
+
+        Assert.assertFalse(topology.hasMovingPartitions());
+
+        log.info("Stopping grid...");
+
+        stopGrid(0);
+
+        ignite = startGrid(0);
+
+        awaitPartitionMapExchange();
+
+        topology = ignite.cachex(CACHE).context().topology();
+
+        Assert.assertFalse("Node restored moving partitions after join to topology.", topology.hasMovingPartitions());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ceba2145/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 75a8af7..c12f515 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorrupte
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsExchangeDuringCheckpointTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPageSizesTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPartitionFilesDestroyTest;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPartitionsStateRecoveryTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreDataStructuresTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteRebalanceScheduleResendPartitionsTest;
 import org.apache.ignite.internal.processors.cache.persistence.LocalWacModeNoChangeDuringRebalanceOnNonNodeAssignTest;
@@ -194,5 +195,7 @@ public class IgnitePdsTestSuite2 extends TestSuite {
         suite.addTestSuite(WalRolloverTypesTest.class);
 
         suite.addTestSuite(FsyncWalRolloverDoesNotBlockTest.class);
+
+        suite.addTestSuite(IgnitePdsPartitionsStateRecoveryTest.class);
     }
 }