You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by jo...@apache.org on 2018/11/21 15:20:59 UTC
ignite git commit: IGNITE-10226 Fixed wrong partition state recovery
- Fixes #5396.
Repository: ignite
Updated Branches:
refs/heads/master a8fc7af01 -> ceba2145d
IGNITE-10226 Fixed wrong partition state recovery - Fixes #5396.
Signed-off-by: Pavel Kovalenko <jo...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ceba2145
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ceba2145
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ceba2145
Branch: refs/heads/master
Commit: ceba2145d3eb39ca5430393c84390c76864ecefc
Parents: a8fc7af
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Wed Nov 21 18:19:32 2018 +0300
Committer: Pavel Kovalenko <jo...@gmail.com>
Committed: Wed Nov 21 18:19:32 2018 +0300
----------------------------------------------------------------------
.../dht/topology/GridDhtLocalPartition.java | 31 +++-
.../topology/GridDhtPartitionTopologyImpl.java | 6 +-
.../GridCacheDatabaseSharedManager.java | 2 +-
.../IgnitePdsPartitionsStateRecoveryTest.java | 170 +++++++++++++++++++
.../ignite/testsuites/IgnitePdsTestSuite2.java | 3 +
5 files changed, 203 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceba2145/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index 833a2b1..d1cd1eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -32,6 +32,8 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
@@ -172,10 +174,14 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @param ctx Context.
* @param grp Cache group.
* @param id Partition ID.
- */
- public GridDhtLocalPartition(GridCacheSharedContext ctx,
- CacheGroupContext grp,
- int id) {
+ * @param recovery Flag indicates that partition is created during recovery phase.
+ */
+ public GridDhtLocalPartition(
+ GridCacheSharedContext ctx,
+ CacheGroupContext grp,
+ int id,
+ boolean recovery
+ ) {
super(ENTRY_FACTORY);
this.id = id;
@@ -212,7 +218,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
store = grp.offheap().createCacheDataStore(id);
// Log partition creation for further crash recovery purposes.
- if (grp.walEnabled())
+ if (grp.walEnabled() && !recovery)
ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, state(), updateCounter()));
// Inject row cache cleaner on store creation
@@ -559,6 +565,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to log partition state change to WAL.", e);
+
+ ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
}
if (log.isDebugEnabled())
@@ -973,6 +981,19 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
if (grp.sharedGroup())
grp.onPartitionCounterUpdate(cacheId, id, primaryCntr != null ? primaryCntr : nextCntr, topVer, primary);
+ // This is first update in partition, we should log partition state information for further crash recovery.
+ if (nextCntr == 1) {
+ if (grp.persistenceEnabled() && grp.walEnabled())
+ try {
+ ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, state(), 0));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to log partition state snapshot to WAL.", e);
+
+ ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+ }
+ }
+
return nextCntr;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceba2145/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index f64c1c4..fb30ecd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -864,7 +864,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (loc != null)
loc.awaitDestroy();
- locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
+ locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, false));
long updCntr = cntrMap.updateCounter(p);
@@ -895,7 +895,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (part != null && part.state() != EVICTED)
return part;
- part = new GridDhtLocalPartition(ctx, grp, p);
+ part = new GridDhtLocalPartition(ctx, grp, p, true);
locParts.set(p, part);
@@ -972,7 +972,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
"[grp=" + grp.cacheOrGroupName() + ", part=" + p + ", topVer=" + topVer +
", this.topVer=" + this.readyTopVer + ']');
- locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
+ locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, false));
this.updateSeq.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceba2145/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index c74972a..ed54f65 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2989,7 +2989,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
while (!isCancelled()) {
waitCheckpointEvent();
- if (skipCheckpointOnNodeStop && (isStopping() || shutdownNow)) {
+ if (skipCheckpointOnNodeStop && (isCancelled() || shutdownNow)) {
if (log.isInfoEnabled())
log.warning("Skipping last checkpoint because node is stopping.");
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceba2145/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionsStateRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionsStateRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionsStateRecoveryTest.java
new file mode 100644
index 0000000..112fc59
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionsStateRecoveryTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.util.Arrays;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+/**
+ * Tests that partition states are restored consistently after a node restart when the last checkpoint is skipped on stop.
+ */
+public class IgnitePdsPartitionsStateRecoveryTest extends GridCommonAbstractTest {
+ /** Cache name. */
+ private static final String CACHE = "cache";
+
+ /** Number of partitions in the test cache. */
+ private static final int PARTS_CNT = 32;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+ .setWalMode(WALMode.LOG_ONLY)
+ .setWalSegmentSize(16 * 1024 * 1024)
+ .setCheckpointFrequency(20 * 60 * 1000) // 20 minutes: no timer-triggered checkpoint is expected during the test.
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setMaxSize(512 * 1024 * 1024)
+ .setPersistenceEnabled(true)
+ );
+
+ cfg.setDataStorageConfiguration(dsCfg);
+
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE)
+ .setBackups(0)
+ .setRebalanceMode(CacheRebalanceMode.NONE) // Disable rebalance so MOVING partitions are never promoted to OWNING.
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT));
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ System.setProperty(GridCacheDatabaseSharedManager.IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, "true"); // Skip the final checkpoint on stop so restart must replay the WAL.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ System.clearProperty(GridCacheDatabaseSharedManager.IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP);
+ }
+
+ /**
+ * Checks that partition states are recovered correctly when the last checkpoint is skipped on stop and logical WAL updates have to be replayed.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPartitionsStateConsistencyAfterRecovery() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ IgniteCache<Object, Object> cache = ignite.getOrCreateCache(CACHE);
+
+ for (int key = 0; key < 4096; key++)
+ cache.put(key, key);
+
+ forceCheckpoint(); // Persist the initial values; the overwrites below are expected to exist only in the WAL.
+
+ for (int key = 0; key < 4096; key++) {
+ int[] payload = new int[4096];
+ Arrays.fill(payload, key);
+
+ cache.put(key, payload);
+ }
+
+ GridDhtPartitionTopology topology = ignite.cachex(CACHE).context().topology();
+
+ Assert.assertFalse(topology.hasMovingPartitions());
+
+ log.info("Stopping grid...");
+
+ stopGrid(0);
+
+ ignite = startGrid(0);
+
+ awaitPartitionMapExchange();
+
+ topology = ignite.cachex(CACHE).context().topology();
+
+ Assert.assertFalse("Node restored moving partitions after join to topology.", topology.hasMovingPartitions());
+ }
+
+ /**
+ * Checks that partition states are recovered correctly when no data was checkpointed and only logical WAL updates exist.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPartitionsStateConsistencyAfterRecoveryNoCheckpoints() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ IgniteCache<Object, Object> cache = ignite.getOrCreateCache(CACHE);
+
+ forceCheckpoint(); // Checkpoint before any data is written, so every update below is recoverable only from the WAL.
+
+ for (int key = 0; key < 4096; key++) {
+ int[] payload = new int[4096];
+ Arrays.fill(payload, key);
+
+ cache.put(key, payload);
+ }
+
+ GridDhtPartitionTopology topology = ignite.cachex(CACHE).context().topology();
+
+ Assert.assertFalse(topology.hasMovingPartitions());
+
+ log.info("Stopping grid...");
+
+ stopGrid(0);
+
+ ignite = startGrid(0);
+
+ awaitPartitionMapExchange();
+
+ topology = ignite.cachex(CACHE).context().topology();
+
+ Assert.assertFalse("Node restored moving partitions after join to topology.", topology.hasMovingPartitions());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceba2145/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 75a8af7..c12f515 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorrupte
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsExchangeDuringCheckpointTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPageSizesTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPartitionFilesDestroyTest;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPartitionsStateRecoveryTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreDataStructuresTest;
import org.apache.ignite.internal.processors.cache.persistence.IgniteRebalanceScheduleResendPartitionsTest;
import org.apache.ignite.internal.processors.cache.persistence.LocalWacModeNoChangeDuringRebalanceOnNonNodeAssignTest;
@@ -194,5 +195,7 @@ public class IgnitePdsTestSuite2 extends TestSuite {
suite.addTestSuite(WalRolloverTypesTest.class);
suite.addTestSuite(FsyncWalRolloverDoesNotBlockTest.class);
+
+ suite.addTestSuite(IgnitePdsPartitionsStateRecoveryTest.class);
}
}