You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/10/18 13:48:12 UTC
[ignite] branch master updated: IGNITE-17911 Omit checkpoint start marker if shutdown is forced (#10320)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new bc843a5b40a IGNITE-17911 Omit checkpoint start marker if shutdown is forced (#10320)
bc843a5b40a is described below
commit bc843a5b40a6da0e2bfcb77857bea499ab9a4512
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Tue Oct 18 16:48:00 2022 +0300
IGNITE-17911 Omit checkpoint start marker if shutdown is forced (#10320)
---
.../preloader/GridDhtPartitionsExchangeFuture.java | 12 +-
.../cache/persistence/checkpoint/Checkpointer.java | 39 +-
.../persistence/IgnitePdsCorruptedStoreTest.java | 44 --
.../MaintenancePersistenceTaskTest.java | 5 +-
.../db/wal/IgniteDisableWalOnRebalanceTest.java | 130 ++++
.../db/wal/WalRecoveryTxLogicalRecordsTest.java | 794 ++++++++++-----------
...teWithoutArchiverWalIteratorInvalidCrcTest.java | 7 +-
.../apache/ignite/testframework/GridTestUtils.java | 33 +-
.../ignite/testsuites/IgnitePdsTestSuite4.java | 2 +
9 files changed, 571 insertions(+), 495 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 78df47b7d59..cd4d77c918c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -225,10 +225,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* Busy lock to prevent activities from accessing exchanger while it's stopping. Stopping uses write lock, so every
* {@link #enterBusy()} will be failed as false. But regular operation uses read lock acquired multiple times.
*/
- private ReadWriteLock busyLock;
+ private final ReadWriteLock busyLock;
/** */
- private AtomicBoolean added = new AtomicBoolean(false);
+ private final AtomicBoolean added = new AtomicBoolean(false);
/** Exchange type. */
private volatile ExchangeType exchangeType;
@@ -242,7 +242,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private final CountDownLatch evtLatch = new CountDownLatch(1);
/** Exchange future init method completes this future. */
- private GridFutureAdapter<Boolean> initFut;
+ private final GridFutureAdapter<Boolean> initFut;
/** */
@GridToStringExclude
@@ -252,7 +252,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private boolean init;
/** Last committed cache version before next topology version use. */
- private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
+ private final AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
/**
* Message received from node joining cluster (if this is 'node join' exchange),
@@ -311,7 +311,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private volatile boolean cacheChangeFailureMsgSent;
/** */
- private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap<>();
+ private final ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap<>();
/** Single messages from merged 'node join' exchanges. */
@GridToStringExclude
@@ -383,7 +383,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private final TimeBag timeBag;
/** Start time of exchange. */
- private long startTime = System.currentTimeMillis();
+ private final long startTime = System.currentTimeMillis();
/** Init time of exchange in milliseconds. */
private volatile long initTime;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
index 68c993fc5e2..a870f194810 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
@@ -261,7 +261,7 @@ public class Checkpointer extends GridWorker {
if (skipCheckpointOnNodeStop && (isCancelled() || shutdownNow)) {
if (log.isInfoEnabled())
- log.warning("Skipping last checkpoint because node is stopping.");
+ log.info("Skipping last checkpoint because node is stopping.");
return;
}
@@ -274,7 +274,7 @@ public class Checkpointer extends GridWorker {
this.enableChangeApplied = null;
}
- if (checkpointsEnabled)
+ if (checkpointsEnabled && !shutdownNow)
doCheckpoint();
else {
synchronized (this) {
@@ -437,24 +437,23 @@ public class Checkpointer extends GridWorker {
if (log.isInfoEnabled()) {
long possibleJvmPauseDur = possibleLongJvmPauseDuration(tracker);
- if (log.isInfoEnabled())
- log.info(
- String.format(
- CHECKPOINT_STARTED_LOG_FORMAT,
- chp.cpEntry == null ? "" : chp.cpEntry.checkpointId(),
- chp.cpEntry == null ? "" : chp.cpEntry.checkpointMark(),
- tracker.beforeLockDuration(),
- tracker.lockWaitDuration(),
- tracker.listenersExecuteDuration(),
- tracker.lockHoldDuration(),
- tracker.walCpRecordFsyncDuration(),
- tracker.writeCheckpointEntryDuration(),
- tracker.splitAndSortCpPagesDuration(),
- possibleJvmPauseDur > 0 ? "possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms, " : "",
- chp.pagesSize,
- chp.progress.reason()
- )
- );
+ log.info(
+ String.format(
+ CHECKPOINT_STARTED_LOG_FORMAT,
+ chp.cpEntry == null ? "" : chp.cpEntry.checkpointId(),
+ chp.cpEntry == null ? "" : chp.cpEntry.checkpointMark(),
+ tracker.beforeLockDuration(),
+ tracker.lockWaitDuration(),
+ tracker.listenersExecuteDuration(),
+ tracker.lockHoldDuration(),
+ tracker.walCpRecordFsyncDuration(),
+ tracker.writeCheckpointEntryDuration(),
+ tracker.splitAndSortCpPagesDuration(),
+ possibleJvmPauseDur > 0 ? "possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms, " : "",
+ chp.pagesSize,
+ chp.progress.reason()
+ )
+ );
}
if (!writePages(tracker, chp.cpPages, chp.progress, this, this::isShutdownNow))
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
index e675f63f825..f4fc049a36e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
@@ -27,7 +27,6 @@ import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheRebalanceMode;
@@ -154,49 +153,6 @@ public class IgnitePdsCorruptedStoreTest extends GridCommonAbstractTest {
return ccfg;
}
- /**
- * @throws Exception If test failed.
- */
- @Test
- public void testNodeInvalidatedWhenPersistenceIsCorrupted() throws Exception {
- Ignite ignite = startGrid(0);
-
- startGrid(1);
-
- ignite.cluster().active(true);
-
- awaitPartitionMapExchange();
-
- IgniteCache<Integer, String> cache1 = ignite.cache(CACHE_NAME1);
-
- for (int i = 0; i < 100; ++i)
- cache1.put(i, String.valueOf(i));
-
- forceCheckpoint();
-
- cache1.put(2, "test");
-
- String nodeName = ignite.name().replaceAll("\\.", "_");
-
- stopAllGrids();
-
- U.delete(file(String.format("db/%s/cache-%s/part-2.bin", nodeName, CACHE_NAME1)));
-
- startGrid(1);
-
- try {
- startGrid(0);
- }
- catch (IgniteCheckedException ex) {
- if (X.hasCause(ex, StorageException.class, IOException.class))
- return; // Success;
-
- throw ex;
- }
-
- waitFailure(StorageException.class);
- }
-
/**
* Test node invalidation when page CRC is wrong and page not found in wal.
*
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MaintenancePersistenceTaskTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MaintenancePersistenceTaskTest.java
index fc9f7a7de55..9079c9bd813 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MaintenancePersistenceTaskTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MaintenancePersistenceTaskTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static java.util.Collections.singletonList;
+import static org.apache.ignite.testframework.GridTestUtils.deleteLastCheckpointEndMarker;
/**
* Tests for maintenance persistence task.
@@ -105,7 +106,9 @@ public class MaintenancePersistenceTaskTest extends GridCommonAbstractTest {
fail(e.getMessage());
}
- server1.close();
+ stopGrid("test1", false);
+
+ deleteLastCheckpointEndMarker(server1);
try {
server1 = startGrid("test1");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteDisableWalOnRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteDisableWalOnRebalanceTest.java
new file mode 100644
index 00000000000..f1087d14a69
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteDisableWalOnRebalanceTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+
+/**
+ * Additional tests for the WAL disabling during rebalance optimization.
+ */
+public class IgniteDisableWalOnRebalanceTest extends GridCommonAbstractTest {
+ /** Number of partitions in the test. */
+ private static final int PARTITION_COUNT = 32;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ )
+ )
+ .setCommunicationSpi(new TestRecordingCommunicationSpi())
+ .setCacheConfiguration(
+ defaultCacheConfiguration()
+ .setBackups(1)
+ .setAffinity(new RendezvousAffinityFunction(false, PARTITION_COUNT))
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ super.beforeTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * Tests a case when WAL for a cache gets disabled during rebalance because a node has no owning partitions, and
+ * then the node is shut down. This should not prevent the node from successfully starting up.
+ */
+ @Test
+ public void testDisabledWalOnRebalance() throws Exception {
+ IgniteEx ignite = startGrids(2);
+
+ ignite.cluster().state(ACTIVE);
+
+ IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+ int entryCnt = PARTITION_COUNT * 200;
+
+ int val = 0;
+
+ for (int k = 0; k < entryCnt; k++)
+ cache.put(k, val++);
+
+ stopGrid(1, false);
+
+ // Need to update all partitions in order to enable rebalance optimization (disabling WAL while rebalancing).
+ for (int k = 0; k < entryCnt; k++)
+ cache.put(k, val++);
+
+ // Block rebalancing process for the specified cache.
+ IgniteEx restartedNode = startGrid(1, (IgniteConfiguration cfg) ->
+ ((TestRecordingCommunicationSpi)cfg.getCommunicationSpi()).blockMessages((node, msg) -> {
+ if (msg instanceof GridDhtPartitionDemandMessage) {
+ GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+ return msg0.groupId() == CU.cacheId(DEFAULT_CACHE_NAME);
+ }
+
+ return false;
+ }));
+
+ TestRecordingCommunicationSpi demanderSpi = TestRecordingCommunicationSpi.spi(restartedNode);
+
+ // Wait for the rebalance to start.
+ demanderSpi.waitForBlocked();
+
+ assertFalse(restartedNode.cachex(DEFAULT_CACHE_NAME).context().group().walEnabled());
+
+ // Stop the node and skip the checkpoint.
+ stopGrid(1, true);
+
+ restartedNode = startGrid(1);
+
+ awaitPartitionMapExchange();
+
+ assertTrue(restartedNode.cachex(DEFAULT_CACHE_NAME).context().group().walEnabled());
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
index 6b3cf37e261..eaea64bb3db 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.junit.Assert;
@@ -93,7 +94,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
public static final int PARTS = 32;
/** */
- public static final int WAL_HIST_SIZE = 30;
+ private static final int WAL_HIST_SIZE = 30;
/** */
private int pageSize = 4 * 1024;
@@ -148,11 +149,13 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
+ stopAllGrids();
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
+ stopAllGrids();
cleanPersistenceDir();
}
@@ -165,71 +168,66 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
ignite.cluster().active(true);
- try {
- GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
- .cache().context().database();
+ GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
+ .cache().context().database();
- dbMgr.enableCheckpoints(false).get();
+ dbMgr.enableCheckpoints(false).get();
- IgniteCache<Integer, IndexedValue> cache = ignite.cache(CACHE_NAME);
+ IgniteCache<Integer, IndexedValue> cache = ignite.cache(CACHE_NAME);
- int txCnt = 100;
+ int txCnt = 100;
- int keysPerTx = 10;
+ int keysPerTx = 10;
- for (int i = 0; i < txCnt; i++) {
- try (Transaction tx = ignite.transactions().txStart()) {
- for (int j = 0; j < keysPerTx; j++) {
- int k = i * keysPerTx + j;
-
- cache.put(k, new IndexedValue(k));
- }
+ for (int i = 0; i < txCnt; i++) {
+ try (Transaction tx = ignite.transactions().txStart()) {
+ for (int j = 0; j < keysPerTx; j++) {
+ int k = i * keysPerTx + j;
- tx.commit();
+ cache.put(k, new IndexedValue(k));
}
+
+ tx.commit();
}
+ }
- for (int i = 0; i < txCnt; i++) {
- for (int j = 0; j < keysPerTx; j++) {
- int k = i * keysPerTx + j;
+ for (int i = 0; i < txCnt; i++) {
+ for (int j = 0; j < keysPerTx; j++) {
+ int k = i * keysPerTx + j;
- assertEquals(k, cache.get(k).value());
- }
+ assertEquals(k, cache.get(k).value());
}
+ }
- stopGrid();
+ stopGrid();
- ignite = startGrid();
+ ignite = startGrid();
- ignite.cluster().active(true);
+ ignite.cluster().active(true);
- cache = ignite.cache(CACHE_NAME);
+ cache = ignite.cache(CACHE_NAME);
- for (int i = 0; i < txCnt; i++) {
- for (int j = 0; j < keysPerTx; j++) {
- int k = i * keysPerTx + j;
+ for (int i = 0; i < txCnt; i++) {
+ for (int j = 0; j < keysPerTx; j++) {
+ int k = i * keysPerTx + j;
- assertEquals(k, cache.get(k).value());
- }
+ assertEquals(k, cache.get(k).value());
}
+ }
- for (int i = 0; i < txCnt; i++) {
- for (int j = 0; j < keysPerTx; j++) {
- int k = i * keysPerTx + j;
+ for (int i = 0; i < txCnt; i++) {
+ for (int j = 0; j < keysPerTx; j++) {
+ int k = i * keysPerTx + j;
- QueryCursor<List<?>> cur = cache.query(
- new SqlFieldsQuery("select sVal from IndexedValue where iVal=?").setArgs(k));
+ QueryCursor<List<?>> cur = cache.query(
+ new SqlFieldsQuery("select sVal from IndexedValue where iVal=?").setArgs(k));
- List<List<?>> vals = cur.getAll();
+ List<List<?>> vals = cur.getAll();
- assertEquals(vals.size(), 1);
- assertEquals("string-" + k, vals.get(0).get(0));
- }
+ assertEquals(vals.size(), 1);
+ assertEquals("string-" + k, vals.get(0).get(0));
}
}
- finally {
- stopAllGrids();
- }
}
/**
@@ -241,268 +239,255 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
ignite.cluster().active(true);
- try {
- GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
- .cache().context().database();
-
- IgniteCache<Integer, IndexedValue> cache = ignite.cache(CACHE_NAME);
-
- int txCnt = 100;
-
- int keysPerTx = 10;
+ GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
+ .cache().context().database();
- for (int i = 0; i < txCnt; i++) {
- try (Transaction tx = ignite.transactions().txStart()) {
- for (int j = 0; j < keysPerTx; j++) {
- int k = i * keysPerTx + j;
+ IgniteCache<Integer, IndexedValue> cache = ignite.cache(CACHE_NAME);
- cache.put(k, new IndexedValue(k));
- }
+ int txCnt = 100;
- tx.commit();
- }
- }
+ int keysPerTx = 10;
- for (int i = 0; i < txCnt; i++) {
+ for (int i = 0; i < txCnt; i++) {
+ try (Transaction tx = ignite.transactions().txStart()) {
for (int j = 0; j < keysPerTx; j++) {
int k = i * keysPerTx + j;
- assertEquals(k, cache.get(k).value());
+ cache.put(k, new IndexedValue(k));
}
+
+ tx.commit();
}
+ }
- dbMgr.waitForCheckpoint("test");
- dbMgr.enableCheckpoints(false).get();
+ for (int i = 0; i < txCnt; i++) {
+ for (int j = 0; j < keysPerTx; j++) {
+ int k = i * keysPerTx + j;
- for (int i = 0; i < txCnt / 2; i++) {
- try (Transaction tx = ignite.transactions().txStart()) {
- for (int j = 0; j < keysPerTx; j++) {
- int k = i * keysPerTx + j;
+ assertEquals(k, cache.get(k).value());
+ }
+ }
- cache.remove(k);
- }
+ dbMgr.waitForCheckpoint("test");
+ dbMgr.enableCheckpoints(false).get();
- tx.commit();
+ for (int i = 0; i < txCnt / 2; i++) {
+ try (Transaction tx = ignite.transactions().txStart()) {
+ for (int j = 0; j < keysPerTx; j++) {
+ int k = i * keysPerTx + j;
+
+ cache.remove(k);
}
+
+ tx.commit();
}
+ }
- stopGrid();
+ stopGrid();
- ignite = startGrid();
+ ignite = startGrid();
- ignite.cluster().active(true);
+ ignite.cluster().active(true);
- cache = ignite.cache(CACHE_NAME);
+ cache = ignite.cache(CACHE_NAME);
- for (int i = 0; i < txCnt; i++) {
- for (int j = 0; j < keysPerTx; j++) {
- int k = i * keysPerTx + j;
+ for (int i = 0; i < txCnt; i++) {
+ for (int j = 0; j < keysPerTx; j++) {
+ int k = i * keysPerTx + j;
- QueryCursor<List<?>> cur = cache.query(
- new SqlFieldsQuery("select sVal from IndexedValue where iVal=?").setArgs(k));
+ QueryCursor<List<?>> cur = cache.query(
+ new SqlFieldsQuery("select sVal from IndexedValue where iVal=?").setArgs(k));
- List<List<?>> vals = cur.getAll();
+ List<List<?>> vals = cur.getAll();
- if (i < txCnt / 2) {
- assertNull(cache.get(k));
- assertTrue(F.isEmpty(vals));
- }
- else {
- assertEquals(k, cache.get(k).value());
+ if (i < txCnt / 2) {
+ assertNull(cache.get(k));
+ assertTrue(F.isEmpty(vals));
+ }
+ else {
+ assertEquals(k, cache.get(k).value());
- assertEquals(1, vals.size());
- assertEquals("string-" + k, vals.get(0).get(0));
- }
+ assertEquals(1, vals.size());
+ assertEquals("string-" + k, vals.get(0).get(0));
}
}
}
- finally {
- stopAllGrids();
- }
}
/**
* @throws Exception if failed.
*/
+ @WithSystemProperty(key = IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
@Test
public void testHistoricalRebalanceIterator() throws Exception {
- System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0");
-
extraCcfg = new CacheConfiguration(CACHE_NAME + "2");
extraCcfg.setAffinity(new RendezvousAffinityFunction(false, PARTS));
Ignite ignite = startGrid();
- try {
- ignite.cluster().active(true);
-
- GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
- .cache().context().database();
+ ignite.cluster().active(true);
- dbMgr.waitForCheckpoint("test");
+ GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
+ .cache().context().database();
- // This number depends on wal history size.
- int entries = 25;
+ dbMgr.waitForCheckpoint("test");
- IgniteCache<Integer, Integer> cache = ignite.cache(CACHE_NAME);
- IgniteCache<Integer, Integer> cache2 = ignite.cache(CACHE_NAME + "2");
+ // This number depends on wal history size.
+ int entries = 25;
- for (int i = 0; i < entries; i++) {
- // Put to partition 0.
- cache.put(i * PARTS, i * PARTS);
+ IgniteCache<Integer, Integer> cache = ignite.cache(CACHE_NAME);
+ IgniteCache<Integer, Integer> cache2 = ignite.cache(CACHE_NAME + "2");
- // Put to partition 1.
- cache.put(i * PARTS + 1, i * PARTS + 1);
+ for (int i = 0; i < entries; i++) {
+ // Put to partition 0.
+ cache.put(i * PARTS, i * PARTS);
- // Put to another cache.
- cache2.put(i, i);
+ // Put to partition 1.
+ cache.put(i * PARTS + 1, i * PARTS + 1);
- dbMgr.waitForCheckpoint("test");
- }
+ // Put to another cache.
+ cache2.put(i, i);
- for (int i = 0; i < entries; i++) {
- assertEquals((Integer)(i * PARTS), cache.get(i * PARTS));
- assertEquals((Integer)(i * PARTS + 1), cache.get(i * PARTS + 1));
- assertEquals((Integer)(i), cache2.get(i));
- }
+ dbMgr.waitForCheckpoint("test");
+ }
- CacheGroupContext grp = ((IgniteEx)ignite).context().cache().cacheGroup(CU.cacheId(CACHE_NAME));
- IgniteCacheOffheapManager offh = grp.offheap();
- AffinityTopologyVersion topVer = grp.affinity().lastVersion();
+ for (int i = 0; i < entries; i++) {
+ assertEquals((Integer)(i * PARTS), cache.get(i * PARTS));
+ assertEquals((Integer)(i * PARTS + 1), cache.get(i * PARTS + 1));
+ assertEquals((Integer)(i), cache2.get(i));
+ }
- IgniteDhtDemandedPartitionsMap map;
+ CacheGroupContext grp = ((IgniteEx)ignite).context().cache().cacheGroup(CU.cacheId(CACHE_NAME));
+ IgniteCacheOffheapManager offh = grp.offheap();
+ AffinityTopologyVersion topVer = grp.affinity().lastVersion();
- for (int i = 0; i < entries; i++) {
- map = new IgniteDhtDemandedPartitionsMap();
- map.addHistorical(0, i, entries, PARTS);
+ IgniteDhtDemandedPartitionsMap map;
- WALPointer ptr = reserveWalPointerForIterator(grp.shared());
+ for (int i = 0; i < entries; i++) {
+ map = new IgniteDhtDemandedPartitionsMap();
+ map.addHistorical(0, i, entries, PARTS);
- try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
- assertNotNull(it);
+ WALPointer ptr = reserveWalPointerForIterator(grp.shared());
- assertTrue("Not historical for iteration: " + i, it.historical(0));
+ try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
+ assertNotNull(it);
- for (int j = i; j < entries; j++) {
- assertTrue("i=" + i + ", j=" + j, it.hasNextX());
+ assertTrue("Not historical for iteration: " + i, it.historical(0));
- CacheDataRow row = it.next();
+ for (int j = i; j < entries; j++) {
+ assertTrue("i=" + i + ", j=" + j, it.hasNextX());
- assertEquals(j * PARTS, (int)row.key().value(grp.cacheObjectContext(), false));
- assertEquals(j * PARTS, (int)row.value().value(grp.cacheObjectContext(), false));
- }
+ CacheDataRow row = it.next();
- assertFalse(it.hasNext());
- }
- finally {
- releaseWalPointerForIterator(grp.shared(), ptr);
+ assertEquals(j * PARTS, (int)row.key().value(grp.cacheObjectContext(), false));
+ assertEquals(j * PARTS, (int)row.value().value(grp.cacheObjectContext(), false));
}
- map = new IgniteDhtDemandedPartitionsMap();
- map.addHistorical(1, i, entries, PARTS);
+ assertFalse(it.hasNext());
+ }
+ finally {
+ releaseWalPointerForIterator(grp.shared(), ptr);
+ }
- ptr = reserveWalPointerForIterator(grp.shared());
+ map = new IgniteDhtDemandedPartitionsMap();
+ map.addHistorical(1, i, entries, PARTS);
- try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
- assertNotNull(it);
+ ptr = reserveWalPointerForIterator(grp.shared());
- assertTrue("Not historical for iteration: " + i, it.historical(1));
+ try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
+ assertNotNull(it);
- for (int j = i; j < entries; j++) {
- assertTrue(it.hasNextX());
+ assertTrue("Not historical for iteration: " + i, it.historical(1));
- CacheDataRow row = it.next();
+ for (int j = i; j < entries; j++) {
+ assertTrue(it.hasNextX());
- assertEquals(j * PARTS + 1, (int)row.key().value(grp.cacheObjectContext(), false));
- assertEquals(j * PARTS + 1, (int)row.value().value(grp.cacheObjectContext(), false));
- }
+ CacheDataRow row = it.next();
- assertFalse(it.hasNext());
- }
- finally {
- releaseWalPointerForIterator(grp.shared(), ptr);
+ assertEquals(j * PARTS + 1, (int)row.key().value(grp.cacheObjectContext(), false));
+ assertEquals(j * PARTS + 1, (int)row.value().value(grp.cacheObjectContext(), false));
}
- }
- stopAllGrids();
+ assertFalse(it.hasNext());
+ }
+ finally {
+ releaseWalPointerForIterator(grp.shared(), ptr);
+ }
+ }
- // Check that iterator is valid after restart.
- ignite = startGrid();
+ stopAllGrids();
- ignite.cluster().active(true);
+ // Check that iterator is valid after restart.
+ ignite = startGrid();
- grp = ((IgniteEx)ignite).context().cache().cacheGroup(CU.cacheId(CACHE_NAME));
- offh = grp.offheap();
- topVer = grp.affinity().lastVersion();
+ ignite.cluster().active(true);
- for (int i = 0; i < entries; i++) {
- long start = System.currentTimeMillis();
+ grp = ((IgniteEx)ignite).context().cache().cacheGroup(CU.cacheId(CACHE_NAME));
+ offh = grp.offheap();
+ topVer = grp.affinity().lastVersion();
- map = new IgniteDhtDemandedPartitionsMap();
- map.addHistorical(0, i, entries, PARTS);
+ for (int i = 0; i < entries; i++) {
+ long start = System.currentTimeMillis();
- WALPointer ptr = reserveWalPointerForIterator(grp.shared());
+ map = new IgniteDhtDemandedPartitionsMap();
+ map.addHistorical(0, i, entries, PARTS);
- try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
- long end = System.currentTimeMillis();
+ WALPointer ptr = reserveWalPointerForIterator(grp.shared());
- info("Time to get iterator: " + (end - start));
+ try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
+ long end = System.currentTimeMillis();
- assertTrue("Not historical for iteration: " + i, it.historical(0));
+ info("Time to get iterator: " + (end - start));
- assertNotNull(it);
+ assertTrue("Not historical for iteration: " + i, it.historical(0));
- start = System.currentTimeMillis();
+ assertNotNull(it);
- for (int j = i; j < entries; j++) {
- assertTrue("i=" + i + ", j=" + j, it.hasNextX());
+ start = System.currentTimeMillis();
- CacheDataRow row = it.next();
+ for (int j = i; j < entries; j++) {
+ assertTrue("i=" + i + ", j=" + j, it.hasNextX());
- assertEquals(j * PARTS, (int)row.key().value(grp.cacheObjectContext(), false));
- assertEquals(j * PARTS, (int)row.value().value(grp.cacheObjectContext(), false));
- }
+ CacheDataRow row = it.next();
- end = System.currentTimeMillis();
+ assertEquals(j * PARTS, (int)row.key().value(grp.cacheObjectContext(), false));
+ assertEquals(j * PARTS, (int)row.value().value(grp.cacheObjectContext(), false));
+ }
- info("Time to iterate: " + (end - start));
+ end = System.currentTimeMillis();
- assertFalse(it.hasNext());
- }
- finally {
- releaseWalPointerForIterator(grp.shared(), ptr);
- }
+ info("Time to iterate: " + (end - start));
- map = new IgniteDhtDemandedPartitionsMap();
- map.addHistorical(1, i, entries, PARTS);
+ assertFalse(it.hasNext());
+ }
+ finally {
+ releaseWalPointerForIterator(grp.shared(), ptr);
+ }
- ptr = reserveWalPointerForIterator(grp.shared());
+ map = new IgniteDhtDemandedPartitionsMap();
+ map.addHistorical(1, i, entries, PARTS);
- try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
- assertNotNull(it);
+ ptr = reserveWalPointerForIterator(grp.shared());
- assertTrue("Not historical for iteration: " + i, it.historical(1));
+ try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
+ assertNotNull(it);
- for (int j = i; j < entries; j++) {
- assertTrue(it.hasNextX());
+ assertTrue("Not historical for iteration: " + i, it.historical(1));
- CacheDataRow row = it.next();
+ for (int j = i; j < entries; j++) {
+ assertTrue(it.hasNextX());
- assertEquals(j * PARTS + 1, (int)row.key().value(grp.cacheObjectContext(), false));
- assertEquals(j * PARTS + 1, (int)row.value().value(grp.cacheObjectContext(), false));
- }
+ CacheDataRow row = it.next();
- assertFalse(it.hasNext());
- }
- finally {
- releaseWalPointerForIterator(grp.shared(), ptr);
+ assertEquals(j * PARTS + 1, (int)row.key().value(grp.cacheObjectContext(), false));
+ assertEquals(j * PARTS + 1, (int)row.value().value(grp.cacheObjectContext(), false));
}
- }
- }
- finally {
- stopAllGrids();
- System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
+ assertFalse(it.hasNext());
+ }
+ finally {
+ releaseWalPointerForIterator(grp.shared(), ptr);
+ }
}
}
@@ -515,38 +500,33 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
ignite.cluster().active(true);
- try {
- GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
- .cache().context().database();
+ GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
+ .cache().context().database();
- dbMgr.enableCheckpoints(false).get();
+ dbMgr.enableCheckpoints(false).get();
- int entries = 100;
+ int entries = 100;
- try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(CACHE_NAME)) {
- for (int i = 0; i < entries; i++)
- streamer.addData(i, i);
- }
+ try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(CACHE_NAME)) {
+ for (int i = 0; i < entries; i++)
+ streamer.addData(i, i);
+ }
- IgniteCache<Integer, Integer> cache = ignite.cache(CACHE_NAME);
+ IgniteCache<Integer, Integer> cache = ignite.cache(CACHE_NAME);
- for (int i = 0; i < entries; i++)
- assertEquals(new Integer(i), cache.get(i));
+ for (int i = 0; i < entries; i++)
+ assertEquals(new Integer(i), cache.get(i));
- stopGrid();
+ stopGrid();
- ignite = startGrid();
+ ignite = startGrid();
- ignite.cluster().active(true);
+ ignite.cluster().active(true);
- cache = ignite.cache(CACHE_NAME);
+ cache = ignite.cache(CACHE_NAME);
- for (int i = 0; i < entries; i++)
- assertEquals(new Integer(i), cache.get(i));
- }
- finally {
- stopAllGrids();
- }
+ for (int i = 0; i < entries; i++)
+ assertEquals(new Integer(i), cache.get(i));
}
/**
@@ -588,57 +568,52 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
*/
@Test
public void testRecoveryRandomPutRemove() throws Exception {
- try {
- pageSize = 1024;
+ pageSize = 1024;
- extraCcfg = new CacheConfiguration(CACHE2_NAME);
- extraCcfg.setAffinity(new RendezvousAffinityFunction(false, PARTS));
+ extraCcfg = new CacheConfiguration(CACHE2_NAME);
+ extraCcfg.setAffinity(new RendezvousAffinityFunction(false, PARTS));
- Ignite ignite = startGrid(0);
+ Ignite ignite = startGrid(0);
- ignite.cluster().active(true);
+ ignite.cluster().active(true);
- GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
- .cache().context().database();
+ GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
+ .cache().context().database();
- dbMgr.enableCheckpoints(false).get();
+ dbMgr.enableCheckpoints(false).get();
- IgniteCache<Integer, IndexedValue> cache1 = ignite.cache(CACHE_NAME);
- IgniteCache<Object, Object> cache2 = ignite.cache(CACHE2_NAME);
+ IgniteCache<Integer, IndexedValue> cache1 = ignite.cache(CACHE_NAME);
+ IgniteCache<Object, Object> cache2 = ignite.cache(CACHE2_NAME);
- final int KEYS1 = 100;
+ final int KEYS1 = 100;
- for (int i = 0; i < KEYS1; i++)
- cache1.put(i, new IndexedValue(i));
+ for (int i = 0; i < KEYS1; i++)
+ cache1.put(i, new IndexedValue(i));
- for (int i = 0; i < KEYS1; i++) {
- if (i % 2 == 0)
- cache1.remove(i);
- }
+ for (int i = 0; i < KEYS1; i++) {
+ if (i % 2 == 0)
+ cache1.remove(i);
+ }
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
- for (int i = 0; i < KEYS1; i++) {
- cache2.put(i, new byte[rnd.nextInt(512)]);
+ for (int i = 0; i < KEYS1; i++) {
+ cache2.put(i, new byte[rnd.nextInt(512)]);
- if (rnd.nextBoolean())
- cache2.put(i, new byte[rnd.nextInt(512)]);
+ if (rnd.nextBoolean())
+ cache2.put(i, new byte[rnd.nextInt(512)]);
- if (rnd.nextBoolean())
- cache2.remove(i);
- }
+ if (rnd.nextBoolean())
+ cache2.remove(i);
+ }
- ignite.close();
+ ignite.close();
- ignite = startGrid(0);
+ ignite = startGrid(0);
- ignite.cluster().active(true);
+ ignite.cluster().active(true);
- ignite.cache(CACHE_NAME).put(1, new IndexedValue(0));
- }
- finally {
- stopAllGrids();
- }
+ ignite.cache(CACHE_NAME).put(1, new IndexedValue(0));
}
/**
@@ -664,55 +639,50 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
*/
@Test
public void testRecoveryNoPageLost3() throws Exception {
- try {
- pageSize = 1024;
- checkpointFreq = 100L;
- extraCcfg = new CacheConfiguration(CACHE2_NAME);
- extraCcfg.setAffinity(new RendezvousAffinityFunction(false, 32));
-
- List<Integer> pages = null;
+ pageSize = 1024;
+ checkpointFreq = 100L;
+ extraCcfg = new CacheConfiguration(CACHE2_NAME);
+ extraCcfg.setAffinity(new RendezvousAffinityFunction(false, 32));
- for (int iter = 0; iter < 5; iter++) {
- log.info("Start node: " + iter);
+ List<Integer> pages = null;
- Ignite ignite = startGrid(0);
+ for (int iter = 0; iter < 5; iter++) {
+ log.info("Start node: " + iter);
- ignite.cluster().active(true);
+ Ignite ignite = startGrid(0);
- if (pages != null) {
- List<Integer> curPags = allocatedPages(ignite, CACHE2_NAME);
+ ignite.cluster().active(true);
- assertEquals("Iter = " + iter, pages, curPags);
- }
+ if (pages != null) {
+ List<Integer> curPags = allocatedPages(ignite, CACHE2_NAME);
- final IgniteCache<Integer, Object> cache = ignite.cache(CACHE2_NAME);
+ assertEquals("Iter = " + iter, pages, curPags);
+ }
- final int ops = ThreadLocalRandom.current().nextInt(10) + 10;
+ final IgniteCache<Integer, Object> cache = ignite.cache(CACHE2_NAME);
- GridTestUtils.runMultiThreaded(new Callable<Void>() {
- @Override public Void call() throws Exception {
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ final int ops = ThreadLocalRandom.current().nextInt(10) + 10;
- for (int i = 0; i < ops; i++) {
- Integer key = rnd.nextInt(1000);
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
- cache.put(key, new byte[rnd.nextInt(512)]);
+ for (int i = 0; i < ops; i++) {
+ Integer key = rnd.nextInt(1000);
- if (rnd.nextBoolean())
- cache.remove(key);
- }
+ cache.put(key, new byte[rnd.nextInt(512)]);
- return null;
+ if (rnd.nextBoolean())
+ cache.remove(key);
}
- }, 10, "update");
- pages = allocatedPages(ignite, CACHE2_NAME);
+ return null;
+ }
+ }, 10, "update");
- Ignition.stop(ignite.name(), false); //will make checkpoint
- }
- }
- finally {
- stopAllGrids();
+ pages = allocatedPages(ignite, CACHE2_NAME);
+
+ Ignition.stop(ignite.name(), false); //will make checkpoint
}
}
@@ -721,46 +691,41 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
private void recoveryNoPageLost(boolean checkpoint) throws Exception {
- try {
- pageSize = 1024;
- extraCcfg = new CacheConfiguration(CACHE2_NAME);
- extraCcfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+ pageSize = 1024;
+ extraCcfg = new CacheConfiguration(CACHE2_NAME);
+ extraCcfg.setAffinity(new RendezvousAffinityFunction(false, 32));
- List<Integer> pages = null;
+ List<Integer> pages = null;
- AtomicInteger cnt = new AtomicInteger();
+ AtomicInteger cnt = new AtomicInteger();
- for (int iter = 0; iter < 5; iter++) {
- log.info("Start node: " + iter);
+ for (int iter = 0; iter < 5; iter++) {
+ log.info("Start node: " + iter);
- Ignite ignite = startGrid(0);
+ Ignite ignite = startGrid(0);
- ignite.cluster().active(true);
+ ignite.cluster().active(true);
- GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
- .cache().context().database();
+ GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
+ .cache().context().database();
- if (!checkpoint)
- dbMgr.enableCheckpoints(false).get();
+ if (!checkpoint)
+ dbMgr.enableCheckpoints(false).get();
- if (pages != null) {
- List<Integer> curPags = allocatedPages(ignite, CACHE2_NAME);
+ if (pages != null) {
+ List<Integer> curPags = allocatedPages(ignite, CACHE2_NAME);
- assertEquals(pages, curPags);
- }
+ assertEquals(pages, curPags);
+ }
- IgniteCache<Integer, Object> cache = ignite.cache(CACHE2_NAME);
+ IgniteCache<Integer, Object> cache = ignite.cache(CACHE2_NAME);
- for (int i = 0; i < 128; i++)
- cache.put(cnt.incrementAndGet(), new byte[256 + iter * 100]);
+ for (int i = 0; i < 128; i++)
+ cache.put(cnt.incrementAndGet(), new byte[256 + iter * 100]);
- pages = allocatedPages(ignite, CACHE2_NAME);
+ pages = allocatedPages(ignite, CACHE2_NAME);
- stopGrid(0, true);
- }
- }
- finally {
- stopAllGrids();
+ stopGrid(0, true);
}
}
@@ -817,71 +782,66 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
*/
@Test
public void testFreeListRecovery() throws Exception {
- try {
- pageSize = 1024;
- extraCcfg = new CacheConfiguration(CACHE2_NAME);
+ pageSize = 1024;
+ extraCcfg = new CacheConfiguration(CACHE2_NAME);
- Ignite ignite = startGrid(0);
+ Ignite ignite = startGrid(0);
- ignite.cluster().active(true);
+ ignite.cluster().active(true);
- IgniteCache<Integer, IndexedValue> cache1 = ignite.cache(CACHE_NAME);
- IgniteCache<Object, Object> cache2 = ignite.cache(CACHE2_NAME);
+ IgniteCache<Integer, IndexedValue> cache1 = ignite.cache(CACHE_NAME);
+ IgniteCache<Object, Object> cache2 = ignite.cache(CACHE2_NAME);
- final int KEYS1 = 2048;
+ final int KEYS1 = 2048;
- for (int i = 0; i < KEYS1; i++)
- cache1.put(i, new IndexedValue(i));
+ for (int i = 0; i < KEYS1; i++)
+ cache1.put(i, new IndexedValue(i));
- for (int i = 0; i < KEYS1; i++) {
- if (i % 2 == 0)
- cache1.remove(i);
- }
+ for (int i = 0; i < KEYS1; i++) {
+ if (i % 2 == 0)
+ cache1.remove(i);
+ }
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
- for (int i = 0; i < KEYS1; i++) {
- cache2.put(i, new byte[rnd.nextInt(512)]);
+ for (int i = 0; i < KEYS1; i++) {
+ cache2.put(i, new byte[rnd.nextInt(512)]);
- if (rnd.nextBoolean())
- cache2.put(i, new byte[rnd.nextInt(512)]);
+ if (rnd.nextBoolean())
+ cache2.put(i, new byte[rnd.nextInt(512)]);
- if (rnd.nextBoolean())
- cache2.remove(i);
- }
+ if (rnd.nextBoolean())
+ cache2.remove(i);
+ }
- Map<Integer, T2<Map<Integer, long[]>, int[]>> cache1_1 = getFreeListData(ignite, CACHE_NAME);
- Map<Integer, T2<Map<Integer, long[]>, int[]>> cache2_1 = getFreeListData(ignite, CACHE2_NAME);
- T2<long[], Integer> rl1_1 = getReuseListData(ignite, CACHE_NAME);
- T2<long[], Integer> rl2_1 = getReuseListData(ignite, CACHE2_NAME);
+ Map<Integer, T2<Map<Integer, long[]>, int[]>> cache1_1 = getFreeListData(ignite, CACHE_NAME);
+ Map<Integer, T2<Map<Integer, long[]>, int[]>> cache2_1 = getFreeListData(ignite, CACHE2_NAME);
+ T2<long[], Integer> rl1_1 = getReuseListData(ignite, CACHE_NAME);
+ T2<long[], Integer> rl2_1 = getReuseListData(ignite, CACHE2_NAME);
- ignite.close();
+ stopGrid(0, false);
- ignite = startGrid(0);
+ ignite = startGrid(0);
- ignite.cluster().active(true);
+ ignite.cluster().active(true);
- cache1 = ignite.cache(CACHE_NAME);
- cache2 = ignite.cache(CACHE2_NAME);
+ cache1 = ignite.cache(CACHE_NAME);
+ cache2 = ignite.cache(CACHE2_NAME);
- for (int i = 0; i < KEYS1; i++) {
- cache1.get(i);
- cache2.get(i);
- }
+ for (int i = 0; i < KEYS1; i++) {
+ cache1.get(i);
+ cache2.get(i);
+ }
- Map<Integer, T2<Map<Integer, long[]>, int[]>> cache1_2 = getFreeListData(ignite, CACHE_NAME);
- Map<Integer, T2<Map<Integer, long[]>, int[]>> cache2_2 = getFreeListData(ignite, CACHE2_NAME);
- T2<long[], Integer> rl1_2 = getReuseListData(ignite, CACHE_NAME);
- T2<long[], Integer> rl2_2 = getReuseListData(ignite, CACHE2_NAME);
+ Map<Integer, T2<Map<Integer, long[]>, int[]>> cache1_2 = getFreeListData(ignite, CACHE_NAME);
+ Map<Integer, T2<Map<Integer, long[]>, int[]>> cache2_2 = getFreeListData(ignite, CACHE2_NAME);
+ T2<long[], Integer> rl1_2 = getReuseListData(ignite, CACHE_NAME);
+ T2<long[], Integer> rl2_2 = getReuseListData(ignite, CACHE2_NAME);
- checkEquals(cache1_1, cache1_2);
- checkEquals(cache2_1, cache2_2);
- checkEquals(rl1_1, rl1_2);
- checkEquals(rl2_1, rl2_2);
- }
- finally {
- stopAllGrids();
- }
+ checkEquals(cache1_1, cache1_2);
+ checkEquals(cache2_1, cache2_2);
+ checkEquals(rl1_1, rl1_2);
+ checkEquals(rl2_1, rl2_2);
}
/**
@@ -914,76 +874,68 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
/**
* Tests if history iterator work correctly if partition contains missed due to rollback updates.
*/
+ @WithSystemProperty(key = IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
@Test
public void testWalIteratorOverPartitionWithMissingEntries() throws Exception {
- System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0");
-
- try {
- Ignite ignite = startGrid();
+ Ignite ignite = startGrid();
- ignite.cluster().active(true);
+ ignite.cluster().active(true);
- awaitPartitionMapExchange();
+ awaitPartitionMapExchange();
- int totalKeys = 30;
+ int totalKeys = 30;
- final int part = 1;
+ final int part = 1;
- List<Integer> keys = partitionKeys(ignite.cache(CACHE_NAME), part, totalKeys, 0);
+ List<Integer> keys = partitionKeys(ignite.cache(CACHE_NAME), part, totalKeys, 0);
- ignite.cache(CACHE_NAME).put(keys.get(0), keys.get(0));
- ignite.cache(CACHE_NAME).put(keys.get(1), keys.get(1));
+ ignite.cache(CACHE_NAME).put(keys.get(0), keys.get(0));
+ ignite.cache(CACHE_NAME).put(keys.get(1), keys.get(1));
- int rolledBack = 0;
+ int rolledBack = 0;
- rolledBack += prepareTx(ignite, keys.subList(2, 6));
+ rolledBack += prepareTx(ignite, keys.subList(2, 6));
- for (Integer key : keys.subList(6, 10))
- ignite.cache(CACHE_NAME).put(key, key);
+ for (Integer key : keys.subList(6, 10))
+ ignite.cache(CACHE_NAME).put(key, key);
- rolledBack += prepareTx(ignite, keys.subList(10, 14));
+ rolledBack += prepareTx(ignite, keys.subList(10, 14));
- for (Integer key : keys.subList(14, 20))
- ignite.cache(CACHE_NAME).put(key, key);
+ for (Integer key : keys.subList(14, 20))
+ ignite.cache(CACHE_NAME).put(key, key);
- rolledBack += prepareTx(ignite, keys.subList(20, 25));
+ rolledBack += prepareTx(ignite, keys.subList(20, 25));
- for (Integer key : keys.subList(25, 30))
- ignite.cache(CACHE_NAME).put(key, key);
+ for (Integer key : keys.subList(25, 30))
+ ignite.cache(CACHE_NAME).put(key, key);
- assertEquals(totalKeys - rolledBack, ignite.cache(CACHE_NAME).size());
+ assertEquals(totalKeys - rolledBack, ignite.cache(CACHE_NAME).size());
- // Expecting counters: 1-2, missed 3-6, 7-10, missed 11-14, 15-20, missed 21-25, 26-30
- List<CacheDataRow> rows = rows(ignite, part, 0, 4);
+ // Expecting counters: 1-2, missed 3-6, 7-10, missed 11-14, 15-20, missed 21-25, 26-30
+ List<CacheDataRow> rows = rows(ignite, part, 0, 4);
- assertEquals(2, rows.size());
- assertEquals(keys.get(0), rows.get(0).key().value(null, false));
- assertEquals(keys.get(1), rows.get(1).key().value(null, false));
+ assertEquals(2, rows.size());
+ assertEquals(keys.get(0), rows.get(0).key().value(null, false));
+ assertEquals(keys.get(1), rows.get(1).key().value(null, false));
- rows = rows(ignite, part, 3, 4);
- assertEquals(0, rows.size());
+ rows = rows(ignite, part, 3, 4);
+ assertEquals(0, rows.size());
- rows = rows(ignite, part, 4, 23);
- assertEquals(10, rows.size());
+ rows = rows(ignite, part, 4, 23);
+ assertEquals(10, rows.size());
- int i = 0;
- for (Integer key : keys.subList(6, 10))
- assertEquals(key, rows.get(i++).key().value(null, false));
- for (Integer key : keys.subList(14, 20))
- assertEquals(key, rows.get(i++).key().value(null, false));
+ int i = 0;
+ for (Integer key : keys.subList(6, 10))
+ assertEquals(key, rows.get(i++).key().value(null, false));
+ for (Integer key : keys.subList(14, 20))
+ assertEquals(key, rows.get(i++).key().value(null, false));
- i = 0;
- rows = rows(ignite, part, 16, 26);
- assertEquals(5, rows.size());
- for (Integer key : keys.subList(16, 20))
- assertEquals(key, rows.get(i++).key().value(null, false));
- assertEquals(keys.get(25), rows.get(i).key().value(null, false));
- }
- finally {
- stopAllGrids();
-
- System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
- }
+ i = 0;
+ rows = rows(ignite, part, 16, 26);
+ assertEquals(5, rows.size());
+ for (Integer key : keys.subList(16, 20))
+ assertEquals(key, rows.get(i++).key().value(null, false));
+ assertEquals(keys.get(25), rows.get(i).key().value(null, false));
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteWithoutArchiverWalIteratorInvalidCrcTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteWithoutArchiverWalIteratorInvalidCrcTest.java
index 72a703264d1..d1ea7085ed5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteWithoutArchiverWalIteratorInvalidCrcTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteWithoutArchiverWalIteratorInvalidCrcTest.java
@@ -43,6 +43,7 @@ import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordPurpose.LOGICAL;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordPurpose.PHYSICAL;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
+import static org.apache.ignite.testframework.GridTestUtils.deleteLastCheckpointEndMarker;
/** */
public class IgniteWithoutArchiverWalIteratorInvalidCrcTest extends GridCommonAbstractTest {
@@ -133,7 +134,9 @@ public class IgniteWithoutArchiverWalIteratorInvalidCrcTest extends GridCommonAb
public void nodeShouldStartIfBinaryRecordCorruptedBeforeEndCheckpoint() throws Exception {
startNodeAndPopulate();
- stopGrid(0, true);
+ stopGrid(0, false);
+
+ deleteLastCheckpointEndMarker(ignite);
IgniteWriteAheadLogManager walMgr = ignite.context().cache().context().wal();
@@ -167,7 +170,7 @@ public class IgniteWithoutArchiverWalIteratorInvalidCrcTest extends GridCommonAb
public void nodeShouldNotStartIfLastCheckpointRecordCorrupted() throws Exception {
startNodeAndPopulate();
- stopGrid(0, true);
+ stopGrid(0, false);
IgniteWriteAheadLogManager walMgr = ignite.context().cache().context().wal();
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index d6585b7224d..a609ddea11d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -33,6 +33,8 @@ import java.lang.reflect.Modifier;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
@@ -48,6 +50,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
@@ -66,6 +69,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.cache.CacheException;
import javax.cache.configuration.Factory;
import javax.management.Attribute;
@@ -97,6 +101,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.util.GridBusyLock;
@@ -125,6 +131,8 @@ import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static java.lang.Long.parseLong;
+import static java.util.Comparator.comparingLong;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_HOME;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
import static org.apache.ignite.ssl.SslContextFactory.DFLT_KEY_ALGORITHM;
@@ -145,7 +153,7 @@ public final class GridTestUtils {
public static final long DFLT_TEST_TIMEOUT = 5 * 60 * 1000;
/** */
- static final String ALPHABETH = "qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM1234567890_";
+ private static final String ALPHABETH = "qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM1234567890_";
/**
* Hook object intervenes to discovery message handling
@@ -2324,6 +2332,29 @@ public final class GridTestUtils {
U.delete(cacheGrpDir);
}
+ /**
+ * Removes the last checkpoint end marker for the given node.
+ *
+ * @param ignite Ignite node.
+ */
+ public static void deleteLastCheckpointEndMarker(IgniteEx ignite) throws IOException {
+ IgniteCacheDatabaseSharedManager dbSharedMgr = ignite.context().cache().context().database();
+
+ Path cpDir = ((GridCacheDatabaseSharedManager)dbSharedMgr).checkpointDirectory().toPath();
+
+ try (Stream<Path> files = Files.list(cpDir)) {
+ Optional<Path> endMarker = files
+ .map(path -> path.getFileName().toString())
+ .filter(fileName -> fileName.endsWith("START.bin"))
+ .max(comparingLong(fileName -> parseLong(fileName.split("-")[0])))
+ .map(fileName -> fileName.replace("START.bin", "END.bin"))
+ .map(cpDir::resolve);
+
+ if (endMarker.isPresent())
+ Files.delete(endMarker.get());
+ }
+ }
+
/**
* {@link Class#getSimpleName()} does not return outer class name prefix for inner classes, for example,
* getSimpleName() returns "RegularDiscovery" instead of "GridDiscoveryManagerSelfTest$RegularDiscovery"
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 5df625c03b5..351b99f1826 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageR
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPartitionPreloadTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsStartWIthEmptyArchive;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsTransactionsHangTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteDisableWalOnRebalanceTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRebalanceRestartTest;
import org.apache.ignite.internal.processors.cache.persistence.file.FileDownloaderTest;
import org.apache.ignite.testframework.GridTestUtils;
@@ -84,6 +85,7 @@ public class IgnitePdsTestSuite4 {
GridTestUtils.addTestIfNeeded(suite, NotOptimizedRebalanceTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, BreakRebalanceChainTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, WalRebalanceRestartTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, IgniteDisableWalOnRebalanceTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsRestartAfterFailedToWriteMetaPageTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsRemoveDuringRebalancingTest.class, ignoredTests);