You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/05/21 09:01:07 UTC
[ignite] branch master updated: IGNITE-13036 Add event for page
replacement start - Fixes #7815.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3298657 IGNITE-13036 Add event for page replacement start - Fixes #7815.
3298657 is described below
commit 32986571b98ba2ae2661ff68dd42ee3cc27d0ad4
Author: zstan <st...@gmail.com>
AuthorDate: Thu May 21 11:58:01 2020 +0300
IGNITE-13036 Add event for page replacement start - Fixes #7815.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../java/org/apache/ignite/events/EventType.java | 140 +++++----
.../ignite/events/PageReplacementStartedEvent.java | 53 ++++
.../cache/persistence/pagemem/PageMemoryImpl.java | 34 +-
...itePdsPageEvictionDuringPartitionClearTest.java | 163 ----------
...PdsPageReplacementDuringPartitionClearTest.java | 343 +++++++++++++++++++++
.../CheckpointFailBeforeWriteMarkTest.java | 2 +-
.../IgnitePageMemReplaceDelayedWriteUnitTest.java | 11 +
.../persistence/pagemem/PageMemoryImplTest.java | 4 +
.../ignite/testsuites/IgnitePdsTestSuite4.java | 4 +-
9 files changed, 521 insertions(+), 233 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index 90f738d..25b1a5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -434,17 +434,17 @@ public interface EventType {
*
* @see CacheEvent
*/
- public static final int EVT_CACHE_ENTRY_CREATED = 60;
+ public static final int EVT_CACHE_ENTRY_CREATED = 60;
- /**
- * Built-in event type: entry destroyed.
- * <p>
- * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
- * internal Ignite events and should not be used by user-defined events.
- *
- * @see CacheEvent
- */
- public static final int EVT_CACHE_ENTRY_DESTROYED = 61;
+ /**
+ * Built-in event type: entry destroyed.
+ * <p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see CacheEvent
+ */
+ public static final int EVT_CACHE_ENTRY_DESTROYED = 61;
/**
* Built-in event type: entry evicted.
@@ -454,57 +454,57 @@ public interface EventType {
*
* @see CacheEvent
*/
- public static final int EVT_CACHE_ENTRY_EVICTED = 62;
-
- /**
- * Built-in event type: object put.
- * <p>
- * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
- * internal Ignite events and should not be used by user-defined events.
- *
- * @see CacheEvent
- */
- public static final int EVT_CACHE_OBJECT_PUT = 63;
-
- /**
- * Built-in event type: object read.
- * <p>
- * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
- * internal Ignite events and should not be used by user-defined events.
- *
- * @see CacheEvent
- */
- public static final int EVT_CACHE_OBJECT_READ = 64;
-
- /**
- * Built-in event type: object removed.
- * <p>
- * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
- * internal Ignite events and should not be used by user-defined events.
- *
- * @see CacheEvent
- */
- public static final int EVT_CACHE_OBJECT_REMOVED = 65;
-
- /**
- * Built-in event type: object locked.
- * <p>
- * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
- * internal Ignite events and should not be used by user-defined events.
- *
- * @see CacheEvent
- */
- public static final int EVT_CACHE_OBJECT_LOCKED = 66;
-
- /**
- * Built-in event type: object unlocked.
- * <p>
- * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
- * internal Ignite events and should not be used by user-defined events.
- *
- * @see CacheEvent
- */
- public static final int EVT_CACHE_OBJECT_UNLOCKED = 67;
+ public static final int EVT_CACHE_ENTRY_EVICTED = 62;
+
+ /**
+ * Built-in event type: object put.
+ * <p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see CacheEvent
+ */
+ public static final int EVT_CACHE_OBJECT_PUT = 63;
+
+ /**
+ * Built-in event type: object read.
+ * <p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see CacheEvent
+ */
+ public static final int EVT_CACHE_OBJECT_READ = 64;
+
+ /**
+ * Built-in event type: object removed.
+ * <p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see CacheEvent
+ */
+ public static final int EVT_CACHE_OBJECT_REMOVED = 65;
+
+ /**
+ * Built-in event type: object locked.
+ * <p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see CacheEvent
+ */
+ public static final int EVT_CACHE_OBJECT_LOCKED = 66;
+
+ /**
+ * Built-in event type: object unlocked.
+ * <p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see CacheEvent
+ */
+ public static final int EVT_CACHE_OBJECT_UNLOCKED = 67;
/**
* Built-in event type: cache object was expired when reading it.
@@ -923,6 +923,24 @@ public interface EventType {
public static final int EVT_CLUSTER_DEACTIVATED = 141;
/**
+ * Built-in event type: page replacement started in one of the data regions. The name of the data region will
+ * be indicated in the event.
+ * <p>
+ * Fired when all existing free pages are exhausted and Ignite replaces one of the loaded pages with a
+ * cold page from disk.
+ * <p>
+ * When started, page replacement negatively affects performance; it is recommended to monitor page replacement
+ * metrics and set data region size accordingly.
+ * <p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see org.apache.ignite.configuration.DataRegionConfiguration#setMaxSize(long)
+ * @see PageReplacementStartedEvent
+ */
+ public static final int EVT_PAGE_REPLACEMENT_STARTED = 142;
+
+ /**
* Built-in event type: Cluster state changed.
* <p>
* Fired when cluster state changed.
diff --git a/modules/core/src/main/java/org/apache/ignite/events/PageReplacementStartedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/PageReplacementStartedEvent.java
new file mode 100644
index 0000000..a447a06
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/events/PageReplacementStartedEvent.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.events;
+
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ * Event type indicating that page replacement started in one of the configured data regions.
+ *
+ * @see EventType#EVT_PAGE_REPLACEMENT_STARTED
+ */
+public class PageReplacementStartedEvent extends EventAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final String dataRegionName;
+
+ /**
+ * @param node Node on which the event was fired.
+ * @param msg Optional event message.
+ * @param dataRegionName Data region name.
+ */
+ public PageReplacementStartedEvent(ClusterNode node, String msg, String dataRegionName) {
+ super(node, msg, EventType.EVT_PAGE_REPLACEMENT_STARTED);
+
+ this.dataRegionName = dataRegionName;
+ }
+
+ /**
+ * Gets data region name for which page replacement started.
+ *
+ * @return Data region name.
+ */
+ public String dataRegionName() {
+ return dataRegionName;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index b17989cd..a4e3830 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
@@ -41,6 +42,8 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.events.PageReplacementStartedEvent;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -86,6 +89,7 @@ import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.OffheapReadWriteLock;
import org.apache.ignite.internal.util.future.CountDownFuture;
import org.apache.ignite.internal.util.lang.GridInClosure3X;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -247,8 +251,12 @@ public class PageMemoryImpl implements PageMemoryEx {
/** Checkpoint progress provider. Null disables throttling. */
@Nullable private final IgniteOutClosure<CheckpointProgress> cpProgressProvider;
+ /** Field updater. */
+ private static final AtomicIntegerFieldUpdater<PageMemoryImpl> pageReplacementWarnedFieldUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(PageMemoryImpl.class, "pageReplacementWarned");
+
/** Flag indicating page replacement started (rotation with disk), allocating new page requires freeing old one. */
- private volatile boolean pageReplacementWarned;
+ private volatile int pageReplacementWarned;
/** */
private long[] sizes;
@@ -2218,11 +2226,25 @@ public class PageMemoryImpl implements PageMemoryEx {
private long removePageForReplacement(PageStoreWriter saveDirtyPage) throws IgniteCheckedException {
assert getWriteHoldCount() > 0;
- if (!pageReplacementWarned) {
- pageReplacementWarned = true;
-
- U.warn(log, "Page replacements started, pages will be rotated with disk, " +
- "this will affect storage performance (consider increasing DataRegionConfiguration#setMaxSize).");
+ if (pageReplacementWarned == 0) {
+ if (pageReplacementWarnedFieldUpdater.compareAndSet(PageMemoryImpl.this, 0, 1)) {
+ String msg = "Page replacements started, pages will be rotated with disk, this will affect " +
+ "storage performance (consider increasing DataRegionConfiguration#setMaxSize for " +
+ "data region): " + memMetrics.getName();
+
+ U.warn(log, msg);
+
+ if (ctx.gridEvents().isRecordable(EventType.EVT_PAGE_REPLACEMENT_STARTED)) {
+ ctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
+ @Override public void run() {
+ ctx.gridEvents().record(new PageReplacementStartedEvent(
+ ctx.localNode(),
+ msg,
+ memMetrics.getName()));
+ }
+ });
+ }
+ }
}
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsPageEvictionDuringPartitionClearTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsPageEvictionDuringPartitionClearTest.java
deleted file mode 100644
index ad3bf51..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsPageEvictionDuringPartitionClearTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.db;
-
-import java.util.concurrent.Callable;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheRebalanceMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Test;
-
-/**
- *
- */
-public class IgnitePdsPageEvictionDuringPartitionClearTest extends GridCommonAbstractTest {
- /** */
- public static final String CACHE_NAME = "cache";
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME)
- .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
- .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
- .setAffinity(new RendezvousAffinityFunction(false, 128))
- .setRebalanceMode(CacheRebalanceMode.SYNC)
- .setBackups(1);
-
- cfg.setCacheConfiguration(ccfg);
-
- // Intentionally set small page cache size.
- DataStorageConfiguration memCfg = new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(
- new DataRegionConfiguration().setMaxSize(70L * 1024 * 1024).setPersistenceEnabled(true))
- .setWalMode(WALMode.LOG_ONLY);
-
- cfg.setDataStorageConfiguration(memCfg);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return 20 * 60 * 1000;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- System.setProperty(GridCacheDatabaseSharedManager.IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC, "true");
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- System.clearProperty(GridCacheDatabaseSharedManager.IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC);
- }
-
- /**
- * @throws Exception if failed.
- */
- @Test
- public void testPageEvictionOnNodeStart() throws Exception {
- for (int r = 0; r < 3; r++) {
- cleanPersistenceDir();
-
- startGrids(2);
-
- try {
- Ignite ig = ignite(0);
-
- ig.active(true);
-
- IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(CACHE_NAME);
-
- for (int i = 0; i < 300_000; i++) {
- streamer.addData(i, new TestValue(i));
-
- if (i > 0 && i % 10_000 == 0)
- info("Done: " + i);
- }
-
- streamer.flush();
-
- IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- IgniteEx ig = startGrid(2);
-
- info(">>>>>>>>>>>");
- info(">>>>>>>>>>>");
- info(">>>>>>>>>>>");
-
- return ig;
- }
- });
-
- for (int i = 500_000; i < 1_000_000; i++) {
- streamer.addData(i, new TestValue(i));
-
- if (i > 0 && i % 10_000 == 0) {
- info("Done: " + i);
-
- U.sleep(1000);
- }
- }
-
- streamer.close();
-
- fut.get();
- }
- finally {
- stopAllGrids();
-
- cleanPersistenceDir();
- }
- }
- }
-
- /**
- *
- */
- private static class TestValue {
- /** */
- private int id;
-
- /** */
- private byte[] payload = new byte[512];
-
- /**
- * @param id ID.
- */
- private TestValue(int id) {
- this.id = id;
- }
- }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsPageReplacementDuringPartitionClearTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsPageReplacementDuringPartitionClearTest.java
new file mode 100644
index 0000000..e28e5a1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsPageReplacementDuringPartitionClearTest.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.events.CacheRebalancingEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
+import static org.apache.ignite.events.EventType.EVT_PAGE_REPLACEMENT_STARTED;
+
+/**
+ *
+ */
+public class IgnitePdsPageReplacementDuringPartitionClearTest extends GridCommonAbstractTest {
+ /** */
+ private static final String CACHE_NAME = "cache";
+
+ /** Number of partitions in the test. */
+ private static final int PARTS = 128;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setAffinity(new RendezvousAffinityFunction(false, PARTS))
+ .setRebalanceMode(CacheRebalanceMode.SYNC)
+ .setBackups(1);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ // Intentionally set small page cache size.
+ DataStorageConfiguration memCfg = new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setMaxSize(50L * 1024 * 1024).setPersistenceEnabled(true))
+ .setWalMode(WALMode.LOG_ONLY);
+
+ cfg.setDataStorageConfiguration(memCfg);
+
+ cfg.setIncludeEventTypes(EVT_CACHE_REBALANCE_PART_UNLOADED, EVT_PAGE_REPLACEMENT_STARTED);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 20 * 60 * 1000;
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ @WithSystemProperty(key = GridCacheDatabaseSharedManager.IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC, value = "true")
+ public void testPageEvictionOnNodeStart() throws Exception {
+ cleanPersistenceDir();
+
+ startGrids(2);
+
+ AtomicBoolean stop = new AtomicBoolean(false);
+
+ try {
+ Ignite ig = ignite(0);
+
+ ig.cluster().active(true);
+
+ ig.cluster().baselineAutoAdjustEnabled(false);
+
+ int last = loadDataUntilPageReplacement(ignite(0), ignite(1));
+
+ IgniteInternalFuture<?> fut = loadAsync(ig, stop, last);
+
+ EvictionListener evictLsnr = new EvictionListener();
+
+ ignite(0).events().localListen(evictLsnr, EVT_CACHE_REBALANCE_PART_UNLOADED);
+ ignite(1).events().localListen(evictLsnr, EVT_CACHE_REBALANCE_PART_UNLOADED);
+
+ IgniteEx igNew = startGrid(2);
+
+ info(">>>>>>>>>>>");
+ info(">>>>>>>>>>>");
+ info(">>>>>>>>>>>");
+
+ igNew.cluster().setBaselineTopology(3);
+
+ awaitPartitionMapExchange();
+
+ Map<ClusterNode, GridLongList> affinityAfter = allPartitions(igNew);
+
+ evictLsnr.waitPartitionsEvicted(igNew.cluster().localNode(), affinityAfter);
+
+ stop.set(true);
+
+ fut.get();
+ }
+ finally {
+ stop.set(true);
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+ }
+
+ /**
+ * @param ig1 Ignite to load.
+ * @param ig2 Ignite to load.
+ * @return Start index for next updates.
+ * @throws IgniteCheckedException If failed.
+ */
+ private int loadDataUntilPageReplacement(Ignite ig1, Ignite ig2) throws IgniteCheckedException {
+ AtomicInteger idx = new AtomicInteger();
+
+ CoundDownFilter lsnr = new CoundDownFilter(EVT_PAGE_REPLACEMENT_STARTED, 2);
+
+ ig1.events().localListen(lsnr, EVT_PAGE_REPLACEMENT_STARTED);
+ ig2.events().localListen(lsnr, EVT_PAGE_REPLACEMENT_STARTED);
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new GridPlainRunnable() {
+ @Override public void run() {
+ IgniteCache<Object, Object> cache = ig1.cache(CACHE_NAME);
+
+ while (!lsnr.isReady()) {
+ int start = idx.getAndAdd(100);
+
+ Map<Integer, TestValue> putMap = new HashMap<>(100, 1.f);
+
+ for (int i = 0; i < 100; i++)
+ putMap.put(start + i, new TestValue(start + i));
+
+ cache.putAll(putMap);
+ }
+ }
+ }, Runtime.getRuntime().availableProcessors(), "initial-load-runner");
+
+ fut.get();
+
+ return idx.get();
+ }
+
+ /**
+ * Calculates mapping from nodes to partitions.
+ *
+ * @param ig Ignite instance.
+ * @return Map from each node to the list of partitions it holds as primary or backup.
+ */
+ private Map<ClusterNode, GridLongList> allPartitions(Ignite ig) {
+ Map<ClusterNode, GridLongList> res = new HashMap<>(ig.cluster().nodes().size(), 1.f);
+
+ Affinity<Object> aff = ig.affinity(CACHE_NAME);
+
+ for (int i = 0; i < PARTS; i++) {
+ Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(i);
+
+ for (ClusterNode node : nodes)
+ res.computeIfAbsent(node, k -> new GridLongList(2)).add(i);
+ }
+
+ return res;
+ }
+
+ /**
+ * @param ig Ignite instance.
+ * @param stopFlag Stop flag.
+ * @param start Load key start index.
+ * @return Completion future.
+ */
+ private IgniteInternalFuture<?> loadAsync(Ignite ig, AtomicBoolean stopFlag, int start) {
+ AtomicInteger generator = new AtomicInteger(start);
+
+ return GridTestUtils.runMultiThreadedAsync(new GridPlainRunnable() {
+ @Override public void run() {
+ IgniteCache<Integer, TestValue> cache = ig.cache(CACHE_NAME);
+
+ while (!stopFlag.get()) {
+ int idx = generator.getAndAdd(100);
+
+ Map<Integer, TestValue> putMap = new HashMap<>(100, 1.f);
+
+ for (int i = 0; i < 100; i++)
+ putMap.put(idx + i, new TestValue(idx + i));
+
+ cache.putAll(putMap);
+ }
+ }
+ }, Runtime.getRuntime().availableProcessors(), "load-runner");
+ }
+
+ /**
+ *
+ */
+ private static class CoundDownFilter implements IgnitePredicate<Event> {
+ /** */
+ private final int evtType;
+
+ /** */
+ private final AtomicInteger cnt;
+
+ /**
+ * @param evtType Event type.
+ */
+ private CoundDownFilter(int evtType, int cnt) {
+ this.evtType = evtType;
+
+ this.cnt = new AtomicInteger(cnt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == evtType)
+ cnt.decrementAndGet();
+
+ return cnt.get() > 0;
+ }
+
+ /**
+ * @return {@code true} once the countdown of expected events has reached zero.
+ */
+ public boolean isReady() {
+ return cnt.get() <= 0;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class EvictionListener implements IgnitePredicate<Event> {
+ /** */
+ private final GridLongList unloadedParts = new GridLongList();
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CACHE_REBALANCE_PART_UNLOADED) {
+ CacheRebalancingEvent rebEvt = (CacheRebalancingEvent)evt;
+
+ synchronized (this) {
+ unloadedParts.add(rebEvt.partition());
+
+ unloadedParts.sort();
+
+ notifyAll();
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * @param node Node to wait.
+ * @param affinityAfter Affinity after rebalancing.
+ * @throws InterruptedException If calling thread is interrupted.
+ */
+ public void waitPartitionsEvicted(
+ ClusterNode node,
+ Map<ClusterNode, GridLongList> affinityAfter
+ ) throws InterruptedException {
+ GridLongList movedParts = affinityAfter.get(node);
+
+ synchronized (this) {
+ while (!unloadedParts.equals(movedParts))
+ wait();
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestValue {
+ /** */
+ private int id;
+
+ /** */
+ private final byte[] payload = new byte[512];
+
+ /**
+ * @param id ID.
+ */
+ private TestValue(int id) {
+ this.id = id;
+ }
+
+ /**
+ * @return ID.
+ */
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * @return Payload.
+ */
+ public boolean hasPayload() {
+ return payload != null;
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFailBeforeWriteMarkTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFailBeforeWriteMarkTest.java
index bddfaa7..0178b3d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFailBeforeWriteMarkTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFailBeforeWriteMarkTest.java
@@ -171,7 +171,7 @@ public class CheckpointFailBeforeWriteMarkTest extends GridCommonAbstractTest {
}, 3, "LOAD-DATA");
//and: Page replacement was started.
- assertTrue(waitForCondition(() -> U.field(pageMemory, "pageReplacementWarned"), 60_000));
+ assertTrue(waitForCondition(() -> (int)U.field(pageMemory, "pageReplacementWarned") > 0, 60_000));
//and: Node was failed during checkpoint after write lock was released and before checkpoint marker was stored to disk.
interceptorIOFactory.triggerIOException((file) -> file.getName().contains("START.bin"));
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
index a075c51..886f5a3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.mem.DirectMemoryProvider;
import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagemem.FullPageId;
@@ -51,6 +52,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
+import org.apache.ignite.spi.eventstorage.NoopEventStorageSpi;
import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.NotNull;
@@ -253,6 +255,12 @@ public class IgnitePageMemReplaceDelayedWriteUnitTest {
when(sctx.kernalContext()).thenReturn(kernalCtx);
+ when(sctx.gridEvents()).thenAnswer(new Answer<Object>() {
+ @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return new GridEventStorageManager(kernalCtx);
+ }
+ });
+
DataRegionConfiguration regCfg = cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration();
DataRegionMetricsImpl memMetrics = new DataRegionMetricsImpl(regCfg, kernalCtx.metric(), NO_OP_METRICS);
@@ -283,8 +291,11 @@ public class IgnitePageMemReplaceDelayedWriteUnitTest {
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setEncryptionSpi(new NoopEncryptionSpi());
+
cfg.setMetricExporterSpi(new NoopMetricExporterSpi());
+ cfg.setEventStorageSpi(new NoopEventStorageSpi());
+
cfg.setDataStorageConfiguration(
new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index 4be75b7..28d2859 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.failure.NoOpFailureHandler;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.mem.DirectMemoryProvider;
import org.apache.ignite.internal.mem.IgniteOutOfMemoryException;
import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
@@ -65,6 +66,7 @@ import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi;
import org.apache.ignite.spi.systemview.jmx.JmxSystemViewExporterSpi;
+import org.apache.ignite.spi.eventstorage.NoopEventStorageSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -585,6 +587,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
igniteCfg.setEncryptionSpi(new NoopEncryptionSpi());
igniteCfg.setMetricExporterSpi(new NoopMetricExporterSpi());
igniteCfg.setSystemViewExporterSpi(new JmxSystemViewExporterSpi());
+ igniteCfg.setEventStorageSpi(new NoopEventStorageSpi());
GridTestKernalContext kernalCtx = new GridTestKernalContext(new GridTestLog4jLogger(), igniteCfg);
@@ -593,6 +596,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
kernalCtx.add(new GridEncryptionManager(kernalCtx));
kernalCtx.add(new GridMetricManager(kernalCtx));
kernalCtx.add(new GridSystemViewManager(kernalCtx));
+ kernalCtx.add(new GridEventStorageManager(kernalCtx));
FailureProcessor failureProc = new FailureProcessor(kernalCtx);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index d824cd0..f47f910 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -36,7 +36,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRestartA
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSpuriousRebalancingOnNodeJoinTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCancelingTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheWalDisabledOnRebalancingTest;
-import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionDuringPartitionClearTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageReplacementDuringPartitionClearTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPartitionPreloadTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsStartWIthEmptyArchive;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsTransactionsHangTest;
@@ -110,7 +110,7 @@ public class IgnitePdsTestSuite4 {
*/
private static void addRealPageStoreTestsNotForDirectIo(List<Class<?>> suite, Collection<Class> ignoredTests) {
GridTestUtils.addTestIfNeeded(suite, IgnitePdsTransactionsHangTest.class, ignoredTests);
- GridTestUtils.addTestIfNeeded(suite, IgnitePdsPageEvictionDuringPartitionClearTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, IgnitePdsPageReplacementDuringPartitionClearTest.class, ignoredTests);
// Rebalancing test
GridTestUtils.addTestIfNeeded(suite, IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes.class, ignoredTests);