You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/08/17 18:29:15 UTC
[ignite] branch ignite-2.7.6_12081 updated: IGNITE-12081 Page
replacement can reload invalid page during checkpoint
This is an automated email from the ASF dual-hosted git repository.
dgovorukhin pushed a commit to branch ignite-2.7.6_12081
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-2.7.6_12081 by this push:
new 710ccff IGNITE-12081 Page replacement can reload invalid page during checkpoint
710ccff is described below
commit 710ccff10dd9fed72f50d8225da61c60d3dd0a3b
Author: Dmitriy Govorukhin <dm...@gmail.com>
AuthorDate: Sat Aug 17 21:29:00 2019 +0300
IGNITE-12081 Page replacement can reload invalid page during checkpoint
---
.../internal/pagemem/wal/record/PageSnapshot.java | 6 -
.../GridCacheDatabaseSharedManager.java | 80 +++---
...eplacedPageWriter.java => PageStoreWriter.java} | 23 +-
...eWrite.java => DelayedDirtyPageStoreWrite.java} | 14 +-
.../pagemem/DelayedPageReplacementTracker.java | 19 +-
.../cache/persistence/pagemem/PageMemoryEx.java | 19 +-
.../cache/persistence/pagemem/PageMemoryImpl.java | 161 ++++++++----
.../cache/persistence/tree/io/PageIO.java | 31 ++-
.../IgnitePdsRecoveryAfterFileCorruptionTest.java | 46 ++--
...CheckpointSimulationWithRealCpDisabledTest.java | 15 +-
.../IgnitePageMemReplaceDelayedWriteUnitTest.java | 7 +-
.../persistence/pagemem/PageMemoryImplTest.java | 272 +++++++++++++++++++--
12 files changed, 522 insertions(+), 171 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
index d3a465d..7be735f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
@@ -19,8 +19,6 @@ package org.apache.ignite.internal.pagemem.wal.record;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.util.Arrays;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.util.GridUnsafe;
@@ -101,10 +99,6 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{
+ "],\nsuper = ["
+ super.toString() + "]]";
}
- catch (IgniteCheckedException ignored) {
- return "Error during call'toString' of PageSnapshot [fullPageId=" + fullPageId() +
- ", pageData = " + Arrays.toString(pageData) + ", super=" + super.toString() + "]";
- }
finally {
GridUnsafe.cleanDirectBuffer(buf);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index c4abf8f..3baef37 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -177,7 +177,10 @@ import static org.apache.ignite.failure.FailureType.SYSTEM_CRITICAL_OPERATION_TI
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
+import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType;
+import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getVersion;
import static org.apache.ignite.internal.util.IgniteUtils.checkpointBufferSize;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
/**
*
@@ -2724,6 +2727,19 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
int cpPagesCnt = 0;
+ PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> {
+ assert tag != PageMemoryImpl.TRY_AGAIN_TAG : "Lock is held by other thread for page " + fullPageId;
+
+ int groupId = fullPageId.groupId();
+ long pageId = fullPageId.pageId();
+
+ // Write buf to page store.
+ PageStore store = storeMgr.writeInternal(groupId, pageId, buf, tag, true);
+
+ // Save store for future fsync.
+ updStores.add(store);
+ };
+
for (IgniteBiTuple<PageMemory, Collection<FullPageId>> e : cpEntities) {
PageMemoryEx pageMem = (PageMemoryEx)e.get1();
@@ -2734,20 +2750,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
for (FullPageId fullId : cpPages) {
tmpWriteBuf.rewind();
- Integer tag = pageMem.getForCheckpoint(fullId, tmpWriteBuf, null);
-
- assert tag == null || tag != PageMemoryImpl.TRY_AGAIN_TAG :
- "Lock is held by other thread for page " + fullId;
-
- if (tag != null) {
- tmpWriteBuf.rewind();
-
- PageStore store = storeMgr.writeInternal(fullId.groupId(), fullId.pageId(), tmpWriteBuf, tag, true);
-
- tmpWriteBuf.rewind();
-
- updStores.add(store);
- }
+ // Write page content to page store via pageStoreWriter.
+ // Tracker is null, because no need to track checkpoint metrics on recovery.
+ pageMem.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, null);
}
}
@@ -4135,11 +4140,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @return pagesToRetry Pages which should be retried.
*/
private List<FullPageId> writePages(Collection<FullPageId> writePageIds) throws IgniteCheckedException {
- ByteBuffer tmpWriteBuf = threadBuf.get();
+ List<FullPageId> pagesToRetry = new ArrayList<>();
- long writeAddr = GridUnsafe.bufferAddress(tmpWriteBuf);
+ CheckpointMetricsTracker tracker = persStoreMetrics.metricsEnabled() ? this.tracker : null;
- List<FullPageId> pagesToRetry = new ArrayList<>();
+ PageStoreWriter pageStoreWriter = createPageStoreWriter(pagesToRetry);
+
+ ByteBuffer tmpWriteBuf = threadBuf.get();
for (FullPageId fullId : writePageIds) {
if (checkpointer.shutdownNow)
@@ -4171,18 +4178,35 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
pageMem = (PageMemoryEx)region.pageMemory();
}
- Integer tag = pageMem.getForCheckpoint(
- fullId, tmpWriteBuf, persStoreMetrics.metricsEnabled() ? tracker : null);
+ pageMem.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, tracker);
+ }
+
+ return pagesToRetry;
+ }
- if (tag != null) {
+ /**
+ * Factory method for create {@link PageStoreWriter}.
+ *
+ * @param pagesToRetry List pages for retry.
+ * @return Checkpoint page write context.
+ */
+ private PageStoreWriter createPageStoreWriter(List<FullPageId> pagesToRetry) {
+ return new PageStoreWriter() {
+ /** {@inheritDoc} */
+ @Override public void writePage(FullPageId fullPageId, ByteBuffer tmpWriteBuf, int tag) throws IgniteCheckedException {
if (tag == PageMemoryImpl.TRY_AGAIN_TAG) {
- pagesToRetry.add(fullId);
+ pagesToRetry.add(fullPageId);
- continue;
+ return;
}
- assert PageIO.getType(tmpWriteBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId());
- assert PageIO.getVersion(tmpWriteBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId());
+ long writeAddr = GridUnsafe.bufferAddress(tmpWriteBuf);
+
+ int groupId = fullPageId.groupId();
+ long pageId = fullPageId.pageId();
+
+ assert getType(tmpWriteBuf) != 0 : "Invalid state. Type is 0! pageId = " + hexLong(pageId);
+ assert getVersion(tmpWriteBuf) != 0 : "Invalid state. Version is 0! pageId = " + hexLong(pageId);
tmpWriteBuf.rewind();
@@ -4201,17 +4225,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
int curWrittenPages = writtenPagesCntr.incrementAndGet();
- snapshotMgr.onPageWrite(fullId, tmpWriteBuf, curWrittenPages, totalPagesToWrite);
+ snapshotMgr.onPageWrite(fullPageId, tmpWriteBuf, curWrittenPages, totalPagesToWrite);
tmpWriteBuf.rewind();
- PageStore store = storeMgr.writeInternal(grpId, fullId.pageId(), tmpWriteBuf, tag, false);
+ PageStore store = storeMgr.writeInternal(groupId, pageId, tmpWriteBuf, tag, false);
updStores.computeIfAbsent(store, k -> new LongAdder()).increment();
}
- }
-
- return pagesToRetry;
+ };
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java
similarity index 51%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java
rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java
index 30f9633..d9f1625 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java
@@ -14,22 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+package org.apache.ignite.internal.processors.cache.persistence;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
/**
- * Flush (write) dirty page implementation for freed page during page replacement. When possible, will be called by
- * removePageForReplacement().
+ * Interface for write page to {@link PageStore}.
*/
-public interface ReplacedPageWriter {
+public interface PageStoreWriter {
/**
- * @param fullPageId Full page ID being evicted.
- * @param byteBuf Buffer with page data.
- * @param tag partition update tag, increasing counter.
- * @throws IgniteCheckedException if page write failed.
+ * Callback for write page. {@link PageMemoryEx} will copy page content to buffer before call.
+ *
+ * @param fullPageId Page ID to get byte buffer for. The page ID must be present in the collection returned by
+ * the {@link PageMemoryEx#beginCheckpoint()} method call.
+ * @param buf Temporary buffer to write changes into.
+ * @param tag {@code Partition generation} if data was read, {@code null} otherwise (data already saved to storage).
+ * @throws IgniteCheckedException If write page failed.
*/
- void writePage(FullPageId fullPageId, ByteBuffer byteBuf, int tag) throws IgniteCheckedException;
+ void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteCheckedException;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageStoreWrite.java
similarity index 90%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java
rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageStoreWrite.java
index b08ddc2..2061b4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageStoreWrite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
import org.apache.ignite.internal.util.GridUnsafe;
import org.jetbrains.annotations.Nullable;
@@ -28,9 +29,9 @@ import org.jetbrains.annotations.Nullable;
* content without holding segment lock. Page data is copied into temp buffer during {@link #writePage(FullPageId,
* ByteBuffer, int)} and then sent to real implementation by {@link #finishReplacement()}.
*/
-public class DelayedDirtyPageWrite implements ReplacedPageWriter {
+public class DelayedDirtyPageStoreWrite implements PageStoreWriter {
/** Real flush dirty page implementation. */
- private final ReplacedPageWriter flushDirtyPage;
+ private final PageStoreWriter flushDirtyPage;
/** Page size. */
private final int pageSize;
@@ -56,9 +57,12 @@ public class DelayedDirtyPageWrite implements ReplacedPageWriter {
* @param pageSize page size.
* @param tracker tracker to lock/unlock page reads.
*/
- public DelayedDirtyPageWrite(ReplacedPageWriter flushDirtyPage,
- ThreadLocal<ByteBuffer> byteBufThreadLoc, int pageSize,
- DelayedPageReplacementTracker tracker) {
+ public DelayedDirtyPageStoreWrite(
+ PageStoreWriter flushDirtyPage,
+ ThreadLocal<ByteBuffer> byteBufThreadLoc,
+ int pageSize,
+ DelayedPageReplacementTracker tracker
+ ) {
this.flushDirtyPage = flushDirtyPage;
this.pageSize = pageSize;
this.byteBufThreadLoc = byteBufThreadLoc;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
index aa1b061..e1d9137 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
/**
* Delayed page writes tracker. Provides delayed write implementations and allows to check if page is actually being
@@ -36,7 +37,7 @@ public class DelayedPageReplacementTracker {
private final int pageSize;
/** Flush dirty page real implementation. */
- private final ReplacedPageWriter flushDirtyPage;
+ private final PageStoreWriter flushDirtyPage;
/** Logger. */
private final IgniteLogger log;
@@ -57,11 +58,11 @@ public class DelayedPageReplacementTracker {
};
/**
- * Dirty page write for replacement operations thread local. Because page write {@link DelayedDirtyPageWrite} is
+ * Dirty page write for replacement operations thread local. Because page write {@link PageStoreWriter} is
* stateful and not thread safe, this thread local protects from GC pressure on pages replacement. <br> Map is used
* instead of build-in thread local to allow GC to remove delayed writers for alive threads after node stop.
*/
- private final Map<Long, DelayedDirtyPageWrite> delayedPageWriteThreadLocMap = new ConcurrentHashMap<>();
+ private final Map<Long, DelayedDirtyPageStoreWrite> delayedPageWriteThreadLocMap = new ConcurrentHashMap<>();
/**
* @param pageSize Page size.
@@ -69,8 +70,12 @@ public class DelayedPageReplacementTracker {
* @param log Logger.
* @param segmentCnt Segments count.
*/
- public DelayedPageReplacementTracker(int pageSize, ReplacedPageWriter flushDirtyPage,
- IgniteLogger log, int segmentCnt) {
+ public DelayedPageReplacementTracker(
+ int pageSize,
+ PageStoreWriter flushDirtyPage,
+ IgniteLogger log,
+ int segmentCnt
+ ) {
this.pageSize = pageSize;
this.flushDirtyPage = flushDirtyPage;
this.log = log;
@@ -83,9 +88,9 @@ public class DelayedPageReplacementTracker {
/**
* @return delayed page write implementation, finish method to be called to actually write page.
*/
- public DelayedDirtyPageWrite delayedPageWrite() {
+ public DelayedDirtyPageStoreWrite delayedPageWrite() {
return delayedPageWriteThreadLocMap.computeIfAbsent(Thread.currentThread().getId(),
- id -> new DelayedDirtyPageWrite(flushDirtyPage, byteBufThreadLoc, pageSize, this));
+ id -> new DelayedDirtyPageStoreWrite(flushDirtyPage, byteBufThreadLoc, pageSize, this));
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
index af204dd..5cd848f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
@@ -23,9 +23,9 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
-import org.jetbrains.annotations.Nullable;
/**
*
@@ -112,17 +112,22 @@ public interface PageMemoryEx extends PageMemory {
public void finishCheckpoint();
/**
- * Gets page byte buffer for the checkpoint procedure.
+ * Prepare page for write during checkpoint.
+ *{@link PageStoreWriter} will be called when the page will be ready to write.
*
* @param pageId Page ID to get byte buffer for. The page ID must be present in the collection returned by
* the {@link #beginCheckpoint()} method call.
- * @param outBuf Temporary buffer to write changes into.
+ * @param buf Temporary buffer to write changes into.
+ * @param pageWriter Checkpoint page write context.
* @param tracker Checkpoint metrics tracker.
- * @return {@code Partition generation} if data was read, {@code null} otherwise (data already saved to storage).
- * @throws IgniteException If failed to obtain page data.
+ * @throws IgniteCheckedException If failed to obtain page data.
*/
- @Nullable public Integer getForCheckpoint(FullPageId pageId, ByteBuffer outBuf, CheckpointMetricsTracker tracker);
-
+ public void checkpointWritePage(
+ FullPageId pageId,
+ ByteBuffer buf,
+ PageStoreWriter pageWriter,
+ CheckpointMetricsTracker tracker
+ ) throws IgniteCheckedException;
/**
* Marks partition as invalid / outdated.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index 6e00a10..19b90e7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.freelist.io.PagesListMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
@@ -85,7 +86,6 @@ import org.apache.ignite.internal.util.offheap.GridOffHeapOutOfMemoryException;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.spi.encryption.EncryptionSpi;
import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -246,7 +246,7 @@ public class PageMemoryImpl implements PageMemoryEx {
private OffheapReadWriteLock rwLock;
/** Flush dirty page closure. When possible, will be called by evictPage(). */
- private final ReplacedPageWriter flushDirtyPage;
+ private final PageStoreWriter flushDirtyPage;
/**
* Delayed page replacement (rotation with disk) tracker. Because other thread may require exactly the same page to be loaded from store,
@@ -301,7 +301,7 @@ public class PageMemoryImpl implements PageMemoryEx {
long[] sizes,
GridCacheSharedContext<?, ?> ctx,
int pageSize,
- ReplacedPageWriter flushDirtyPage,
+ PageStoreWriter flushDirtyPage,
@Nullable GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker,
CheckpointLockStateChecker stateChecker,
DataRegionMetricsImpl memMetrics,
@@ -523,7 +523,7 @@ public class PageMemoryImpl implements PageMemoryEx {
// because there is no crc inside them.
Segment seg = segment(grpId, pageId);
- DelayedDirtyPageWrite delayedWriter = delayedPageReplacementTracker != null
+ DelayedDirtyPageStoreWrite delayedWriter = delayedPageReplacementTracker != null
? delayedPageReplacementTracker.delayedPageWrite() : null;
FullPageId fullId = new FullPageId(pageId, grpId);
@@ -717,7 +717,7 @@ public class PageMemoryImpl implements PageMemoryEx {
seg.readLock().unlock();
}
- DelayedDirtyPageWrite delayedWriter = delayedPageReplacementTracker != null
+ DelayedDirtyPageStoreWrite delayedWriter = delayedPageReplacementTracker != null
? delayedPageReplacementTracker.delayedPageWrite() : null;
seg.writeLock().lock();
@@ -1108,8 +1108,13 @@ public class PageMemoryImpl implements PageMemoryEx {
}
/** {@inheritDoc} */
- @Override public Integer getForCheckpoint(FullPageId fullId, ByteBuffer outBuf, CheckpointMetricsTracker tracker) {
- assert outBuf.remaining() == pageSize();
+ @Override public void checkpointWritePage(
+ FullPageId fullId,
+ ByteBuffer buf,
+ PageStoreWriter pageStoreWriter,
+ CheckpointMetricsTracker metricsTracker
+ ) throws IgniteCheckedException {
+ assert buf.remaining() == pageSize();
Segment seg = segment(fullId.groupId(), fullId.pageId());
@@ -1125,21 +1130,13 @@ public class PageMemoryImpl implements PageMemoryEx {
try {
if (!isInCheckpoint(fullId))
- return null;
+ return;
- tag = seg.partGeneration(fullId.groupId(), PageIdUtils.partId(fullId.pageId()));
-
- relPtr = seg.loadedPages.get(
- fullId.groupId(),
- PageIdUtils.effectivePageId(fullId.pageId()),
- tag,
- INVALID_REL_PTR,
- OUTDATED_REL_PTR
- );
+ relPtr = resolveRelativePointer(seg, fullId, tag = generationTag(seg, fullId));
// Page may have been cleared during eviction. We have nothing to do in this case.
if (relPtr == INVALID_REL_PTR)
- return null;
+ return;
if (relPtr != OUTDATED_REL_PTR) {
absPtr = seg.absolute(relPtr);
@@ -1160,19 +1157,10 @@ public class PageMemoryImpl implements PageMemoryEx {
try {
// Double-check.
- relPtr = seg.loadedPages.get(
- fullId.groupId(),
- PageIdUtils.effectivePageId(fullId.pageId()),
- seg.partGeneration(
- fullId.groupId(),
- PageIdUtils.partId(fullId.pageId())
- ),
- INVALID_REL_PTR,
- OUTDATED_REL_PTR
- );
+ relPtr = resolveRelativePointer(seg, fullId, generationTag(seg, fullId));
if (relPtr == INVALID_REL_PTR)
- return null;
+ return;
if (relPtr == OUTDATED_REL_PTR) {
relPtr = refreshOutdatedPage(
@@ -1185,36 +1173,40 @@ public class PageMemoryImpl implements PageMemoryEx {
seg.pool.releaseFreePage(relPtr);
}
- return null;
+ return;
}
finally {
seg.writeLock().unlock();
}
}
- else
- return copyPageForCheckpoint(absPtr, fullId, outBuf, pageSingleAcquire, tracker) ? tag : TRY_AGAIN_TAG;
+
+ copyPageForCheckpoint(absPtr, fullId, buf, tag, pageSingleAcquire, pageStoreWriter, metricsTracker);
}
/**
* @param absPtr Absolute ptr.
* @param fullId Full id.
- * @param outBuf Output buffer to write page content into.
+ * @param buf Buffer for copy page content for future write via {@link PageStoreWriter}.
* @param pageSingleAcquire Page is acquired only once. We don't pin the page second time (until page will not be
* copied) in case checkpoint temporary buffer is used.
- * @param tracker Checkpoint statistics tracker.
- *
- * @return False if someone else holds lock on page.
+ * @param pageStoreWriter Checkpoint page write context.
*/
- private boolean copyPageForCheckpoint(
+ private void copyPageForCheckpoint(
long absPtr,
FullPageId fullId,
- ByteBuffer outBuf,
+ ByteBuffer buf,
+ Integer tag,
boolean pageSingleAcquire,
+ PageStoreWriter pageStoreWriter,
CheckpointMetricsTracker tracker
- ) {
+ ) throws IgniteCheckedException {
assert absPtr != 0;
assert PageHeader.isAcquired(absPtr);
+ // Exception protection flag.
+ // No need to write if exception occurred.
+ boolean canWrite = false;
+
boolean locked = rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS);
if (!locked) {
@@ -1223,7 +1215,11 @@ public class PageMemoryImpl implements PageMemoryEx {
if (!pageSingleAcquire)
PageHeader.releasePage(absPtr);
- return false;
+ buf.clear();
+
+ pageStoreWriter.writePage(fullId, buf, TRY_AGAIN_TAG);
+
+ return;
}
try {
@@ -1238,7 +1234,7 @@ public class PageMemoryImpl implements PageMemoryEx {
long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr);
- copyInBuffer(tmpAbsPtr, outBuf);
+ copyInBuffer(tmpAbsPtr, buf);
GridUnsafe.setMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize(), (byte)0);
@@ -1251,24 +1247,31 @@ public class PageMemoryImpl implements PageMemoryEx {
// and page did not have tmp buffer page.
if (!pageSingleAcquire)
PageHeader.releasePage(absPtr);
-
}
else {
- copyInBuffer(absPtr, outBuf);
+ copyInBuffer(absPtr, buf);
PageHeader.dirty(absPtr, false);
}
- assert PageIO.getType(outBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId());
- assert PageIO.getVersion(outBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId());
-
- memMetrics.onPageWritten();
+ assert PageIO.getType(buf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId());
+ assert PageIO.getVersion(buf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId());
- return true;
+ canWrite = true;
}
finally {
rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS);
+ if (canWrite){
+ buf.rewind();
+
+ pageStoreWriter.writePage(fullId, buf, tag);
+
+ memMetrics.onPageWritten();
+
+ buf.rewind();
+ }
+
// We pinned the page either when allocated the temp buffer, or when resolved abs pointer.
// Must release the page only after write unlock.
PageHeader.releasePage(absPtr);
@@ -1298,6 +1301,38 @@ public class PageMemoryImpl implements PageMemoryEx {
}
}
+ /**
+ * Get current prartition generation tag.
+ *
+ * @param seg Segment.
+ * @param fullId Full page id.
+ * @return Current partition generation tag.
+ */
+ private int generationTag(Segment seg, FullPageId fullId) {
+ return seg.partGeneration(
+ fullId.groupId(),
+ PageIdUtils.partId(fullId.pageId())
+ );
+ }
+
+ /**
+ * Resolver relative pointer via {@link LoadedPagesMap}.
+ *
+ * @param seg Segment.
+ * @param fullId Full page id.
+ * @param reqVer Required version.
+ * @return Relative pointer.
+ */
+ private long resolveRelativePointer(Segment seg, FullPageId fullId, int reqVer) {
+ return seg.loadedPages.get(
+ fullId.groupId(),
+ PageIdUtils.effectivePageId(fullId.pageId()),
+ reqVer,
+ INVALID_REL_PTR,
+ OUTDATED_REL_PTR
+ );
+ }
+
/** {@inheritDoc} */
@Override public int invalidate(int grpId, int partId) {
int tag = 0;
@@ -1408,6 +1443,30 @@ public class PageMemoryImpl implements PageMemoryEx {
}
/**
+ * @param fullPageId Full page ID to check.
+ * @return {@code true} if the page is contained in the loaded pages table, {@code false} otherwise.
+ */
+ public boolean hasLoadedPage(FullPageId fullPageId) {
+ int grpId = fullPageId.groupId();
+ long pageId = PageIdUtils.effectivePageId(fullPageId.pageId());
+ int partId = PageIdUtils.partId(pageId);
+
+ Segment seg = segment(grpId, pageId);
+
+ seg.readLock().lock();
+
+ try {
+ long res =
+ seg.loadedPages.get(grpId, pageId, seg.partGeneration(grpId, partId), INVALID_REL_PTR, INVALID_REL_PTR);
+
+ return res != INVALID_REL_PTR;
+ }
+ finally {
+ seg.readLock().unlock();
+ }
+ }
+
+ /**
* @param absPtr Absolute pointer to read lock.
* @param fullId Full page ID.
* @param force Force flag.
@@ -2095,7 +2154,7 @@ public class PageMemoryImpl implements PageMemoryEx {
* @return {@code True} if it is ok to replace this page, {@code false} if another page should be selected.
* @throws IgniteCheckedException If failed to write page to the underlying store during eviction.
*/
- private boolean preparePageRemoval(FullPageId fullPageId, long absPtr, ReplacedPageWriter saveDirtyPage) throws IgniteCheckedException {
+ private boolean preparePageRemoval(FullPageId fullPageId, long absPtr, PageStoreWriter saveDirtyPage) throws IgniteCheckedException {
assert writeLock().isHeldByCurrentThread();
// Do not evict cache meta pages.
@@ -2188,7 +2247,7 @@ public class PageMemoryImpl implements PageMemoryEx {
* @throws IgniteCheckedException If failed to evict page.
* @param saveDirtyPage Replaced page writer, implementation to save dirty page to persistent storage.
*/
- private long removePageForReplacement(ReplacedPageWriter saveDirtyPage) throws IgniteCheckedException {
+ private long removePageForReplacement(PageStoreWriter saveDirtyPage) throws IgniteCheckedException {
assert getWriteHoldCount() > 0;
if (!pageReplacementWarned) {
@@ -2360,7 +2419,7 @@ public class PageMemoryImpl implements PageMemoryEx {
* @param cap Capacity.
* @param saveDirtyPage Evicted page writer.
*/
- private long tryToFindSequentially(int cap, ReplacedPageWriter saveDirtyPage) throws IgniteCheckedException {
+ private long tryToFindSequentially(int cap, PageStoreWriter saveDirtyPage) throws IgniteCheckedException {
assert getWriteHoldCount() > 0;
long prevAddr = INVALID_REL_PTR;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
index ee61e25..8341e1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
@@ -21,7 +21,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.spi.encryption.EncryptionSpi;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
@@ -36,17 +35,18 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHan
import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageLockListener;
import org.apache.ignite.internal.processors.cache.tree.CacheIdAwareDataInnerIO;
import org.apache.ignite.internal.processors.cache.tree.CacheIdAwareDataLeafIO;
-import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccCacheIdAwareDataInnerIO;
-import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccCacheIdAwareDataLeafIO;
import org.apache.ignite.internal.processors.cache.tree.CacheIdAwarePendingEntryInnerIO;
import org.apache.ignite.internal.processors.cache.tree.CacheIdAwarePendingEntryLeafIO;
import org.apache.ignite.internal.processors.cache.tree.DataInnerIO;
import org.apache.ignite.internal.processors.cache.tree.DataLeafIO;
-import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataInnerIO;
-import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataLeafIO;
import org.apache.ignite.internal.processors.cache.tree.PendingEntryInnerIO;
import org.apache.ignite.internal.processors.cache.tree.PendingEntryLeafIO;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccCacheIdAwareDataInnerIO;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccCacheIdAwareDataLeafIO;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataInnerIO;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataLeafIO;
import org.apache.ignite.internal.util.GridStringBuilder;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
/**
* Base format for all the page types.
@@ -717,17 +717,22 @@ public abstract class PageIO {
/**
* @param addr Address.
*/
- public static String printPage(long addr, int pageSize) throws IgniteCheckedException {
- PageIO io = getPageIO(addr);
-
+ public static String printPage(long addr, int pageSize) {
GridStringBuilder sb = new GridStringBuilder("Header [\n\ttype=");
- sb.a(getType(addr)).a(" (").a(io.getClass().getSimpleName())
- .a("),\n\tver=").a(getVersion(addr)).a(",\n\tcrc=").a(getCrc(addr))
- .a(",\n\t").a(PageIdUtils.toDetailString(getPageId(addr)))
- .a("\n],\n");
+ try {
+ PageIO io = getPageIO(addr);
- io.printPage(addr, pageSize, sb);
+ sb.a(getType(addr)).a(" (").a(io.getClass().getSimpleName())
+ .a("),\n\tver=").a(getVersion(addr)).a(",\n\tcrc=").a(getCrc(addr))
+ .a(",\n\t").a(PageIdUtils.toDetailString(getPageId(addr)))
+ .a("\n],\n");
+
+ io.printPage(addr, pageSize, sb);
+ }
+ catch (IgniteCheckedException e) {
+ sb.a("Failed to print page: ").a(e.getMessage());
+ }
return sb.toString();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
index 15205e0..4af7298 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheRebalanceMode;
@@ -322,44 +323,43 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract
info("Acquired pages for checkpoint: " + pageIds.size());
try {
- ByteBuffer tmpBuf = ByteBuffer.allocate(mem.pageSize());
-
- tmpBuf.order(ByteOrder.nativeOrder());
-
long begin = System.currentTimeMillis();
long cp = 0;
- long write = 0;
+ AtomicLong write = new AtomicLong();
- for (FullPageId fullId : pages) {
- if (pageIds.contains(fullId)) {
- long cpStart = System.nanoTime();
+ PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> {
+ int groupId = fullPageId.groupId();
+ long pageId = fullPageId.pageId();
- Integer tag = mem.getForCheckpoint(fullId, tmpBuf, null);
+ for (int j = PageIO.COMMON_HEADER_END; j < mem.realPageSize(groupId); j += 4)
+ assertEquals(j + (int)pageId, buf.getInt(j));
- if (tag == null)
- continue;
+ buf.rewind();
- long cpEnd = System.nanoTime();
+ long writeStart = System.nanoTime();
- cp += cpEnd - cpStart;
- tmpBuf.rewind();
+ storeMgr.write(cacheId, pageId, buf, tag);
- for (int j = PageIO.COMMON_HEADER_END; j < mem.realPageSize(fullId.groupId()); j += 4)
- assertEquals(j + (int)fullId.pageId(), tmpBuf.getInt(j));
+ long writeEnd = System.nanoTime();
- tmpBuf.rewind();
+ write.getAndAdd(writeEnd - writeStart);
+ };
- long writeStart = System.nanoTime();
+ ByteBuffer tmpBuf = ByteBuffer.allocate(mem.pageSize());
- storeMgr.write(cacheId, fullId.pageId(), tmpBuf, tag);
+ tmpBuf.order(ByteOrder.nativeOrder());
- long writeEnd = System.nanoTime();
+ for (FullPageId fullId : pages) {
+ if (pageIds.contains(fullId)) {
+ long cpStart = System.nanoTime();
+
+ mem.checkpointWritePage(fullId, tmpBuf, pageStoreWriter, null);
- write += writeEnd - writeStart;
+ long cpEnd = System.nanoTime();
- tmpBuf.rewind();
+ cp += cpEnd - cpStart;
}
}
@@ -370,7 +370,7 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract
long end = System.currentTimeMillis();
info("Written pages in " + (end - begin) + "ms, copy took " + (cp / 1_000_000) + "ms, " +
- "write took " + (write / 1_000_000) + "ms, sync took " + (end - syncStart) + "ms");
+ "write took " + (write.get() / 1_000_000) + "ms, sync took " + (end - syncStart) + "ms");
}
finally {
info("Finishing checkpoint...");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
index 620814f..e72c5ca 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
@@ -65,6 +66,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
@@ -595,7 +597,8 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
buf.rewind();
- mem.getForCheckpoint(fullId, buf, null);
+ mem.checkpointWritePage(fullId, buf, (fullPageId, buffer, tag) -> {
+ }, null);
buf.position(PageIO.COMMON_HEADER_END);
@@ -906,8 +909,16 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
Integer tag;
+ AtomicReference<Integer> tag0 = new AtomicReference<>();
+
+ PageStoreWriter pageStoreWriter = (fullPageId, buf, tagx) -> {
+ tag0.set(tagx);
+ };
+
while (true) {
- tag = mem.getForCheckpoint(fullId, tmpBuf, null);
+ mem.checkpointWritePage(fullId, tmpBuf, pageStoreWriter, null);
+
+ tag = tag0.get();
if (tag != null && tag == PageMemoryImpl.TRY_AGAIN_TAG)
continue;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
index 7b35499..4a8c7b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.persistence.CheckpointWritePr
import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -89,7 +90,7 @@ public class IgnitePageMemReplaceDelayedWriteUnitTest {
AtomicInteger totalEvicted = new AtomicInteger();
- ReplacedPageWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> {
+ PageStoreWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> {
log.info("Evicting " + fullPageId);
assert getLockedPages(fullPageId).contains(fullPageId);
@@ -146,7 +147,7 @@ public class IgnitePageMemReplaceDelayedWriteUnitTest {
AtomicInteger totalEvicted = new AtomicInteger();
- ReplacedPageWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> {
+ PageStoreWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> {
log.info("Evicting " + fullPageId);
assert getSegment(fullPageId).writeLock().isHeldByCurrentThread();
@@ -211,7 +212,7 @@ public class IgnitePageMemReplaceDelayedWriteUnitTest {
* @return implementation for test
*/
@NotNull
- private PageMemoryImpl createPageMemory(IgniteConfiguration cfg, ReplacedPageWriter pageWriter, int pageSize) {
+ private PageMemoryImpl createPageMemory(IgniteConfiguration cfg, PageStoreWriter pageWriter, int pageSize) {
IgniteCacheDatabaseSharedManager db = mock(GridCacheDatabaseSharedManager.class);
when(db.checkpointLockIsHeldByThread()).thenReturn(true);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index cfd9543..c0be692 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -31,30 +33,38 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.NoOpFailureHandler;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.mem.DirectMemoryProvider;
import org.apache.ignite.internal.mem.IgniteOutOfMemoryException;
import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagemem.FullPageId;
-import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
+import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
import org.apache.ignite.internal.util.lang.GridInClosure3X;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
+import org.apache.ignite.spi.eventstorage.NoopEventStorageSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
+import org.junit.Test;
import org.mockito.Mockito;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
import static org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl.CHECKPOINT_POOL_OVERFLOW_ERROR_MSG;
/**
@@ -73,12 +83,13 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
/**
* @throws Exception if failed.
*/
+ @Test
public void testThatAllocationTooMuchPagesCauseToOOMException() throws Exception {
PageMemoryImpl memory = createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED);
try {
while (!Thread.currentThread().isInterrupted())
- memory.allocatePage(1, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX);
+ memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
}
catch (IgniteOutOfMemoryException ignore) {
//Success
@@ -90,6 +101,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ @Test
public void testCheckpointBufferOverusageDontCauseWriteLockLeak() throws Exception {
PageMemoryImpl memory = createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED);
@@ -97,7 +109,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
try {
while (!Thread.currentThread().isInterrupted()) {
- long pageId = memory.allocatePage(1, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX);
+ long pageId = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
FullPageId fullPageId = new FullPageId(pageId, 1);
@@ -143,6 +155,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
* Tests that checkpoint buffer won't be overflowed with enabled CHECKPOINT_BUFFER_ONLY throttling.
* @throws Exception If failed.
*/
+ @Test
public void testCheckpointBufferCantOverflowMixedLoad() throws Exception {
testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY);
}
@@ -151,6 +164,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
* Tests that checkpoint buffer won't be overflowed with enabled SPEED_BASED throttling.
* @throws Exception If failed.
*/
+ @Test
public void testCheckpointBufferCantOverflowMixedLoadSpeedBased() throws Exception {
testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy.SPEED_BASED);
}
@@ -159,11 +173,176 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
* Tests that checkpoint buffer won't be overflowed with enabled TARGET_RATIO_BASED throttling.
* @throws Exception If failed.
*/
+ @Test
public void testCheckpointBufferCantOverflowMixedLoadRatioBased() throws Exception {
testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED);
}
/**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testCheckpointProtocolWriteDirtyPageAfterWriteUnlock() throws Exception {
+ TestPageStoreManager pageStoreMgr = new TestPageStoreManager();
+
+ // Create a 1 mb page memory.
+ PageMemoryImpl memory = createPageMemory(
+ 1,
+ PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED,
+ pageStoreMgr,
+ pageStoreMgr
+ );
+
+ int initPageCnt = 10;
+
+ List<FullPageId> allocated = new ArrayList<>(initPageCnt);
+
+ for (int i = 0; i < initPageCnt; i++) {
+ long id = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
+
+ FullPageId fullId = new FullPageId(id, 1);
+
+ allocated.add(fullId);
+
+ writePage(memory, fullId, (byte)1);
+ }
+
+ doCheckpoint(memory.beginCheckpoint(), memory, pageStoreMgr);
+
+ FullPageId cowPageId = allocated.get(0);
+
+ // Mark some pages as dirty.
+ writePage(memory, cowPageId, (byte)2);
+
+ GridMultiCollectionWrapper<FullPageId> cpPages = memory.beginCheckpoint();
+
+ assertEquals(1, cpPages.size());
+
+ // At this point COW mechanics kicks in.
+ writePage(memory, cowPageId, (byte)3);
+
+ doCheckpoint(cpPages, memory, pageStoreMgr);
+
+ byte[] data = pageStoreMgr.storedPages.get(cowPageId);
+
+ for (int i = PageIO.COMMON_HEADER_END; i < PAGE_SIZE; i++)
+ assertEquals(2, data[i]);
+ }
+
+ /**
+ * @param cpPages Checkpoint pages acuiqred by {@code beginCheckpoint()}.
+ * @param memory Page memory.
+ * @param pageStoreMgr Test page store manager.
+ * @throws Exception If failed.
+ */
+ private void doCheckpoint(
+ GridMultiCollectionWrapper<FullPageId> cpPages,
+ PageMemoryImpl memory,
+ TestPageStoreManager pageStoreMgr
+ ) throws Exception {
+ PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> {
+ assertNotNull(tag);
+
+ pageStoreMgr.write(fullPageId.groupId(), fullPageId.pageId(), buf, 1);
+ };
+
+ for (FullPageId cpPage : cpPages) {
+ byte[] data = new byte[PAGE_SIZE];
+
+ ByteBuffer buf = ByteBuffer.wrap(data);
+
+ memory.checkpointWritePage(cpPage, buf, pageStoreWriter, null);
+ }
+
+ memory.finishCheckpoint();
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testCheckpointProtocolCannotReplaceUnwrittenPage() throws Exception {
+ TestPageStoreManager pageStoreMgr = new TestPageStoreManager();
+
+ // Create a 1 mb page memory.
+ PageMemoryImpl memory = createPageMemory(
+ 1,
+ PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED,
+ pageStoreMgr,
+ pageStoreMgr);
+
+ int initPageCnt = 500;
+
+ List<FullPageId> allocated = new ArrayList<>(initPageCnt);
+
+ for (int i = 0; i < initPageCnt; i++) {
+ long id = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
+
+ FullPageId fullId = new FullPageId(id, 1);
+ allocated.add(fullId);
+
+ writePage(memory, fullId, (byte)1);
+ }
+
+ // CP Write lock.
+ memory.beginCheckpoint();
+ // CP Write unlock.
+
+ byte[] buf = new byte[PAGE_SIZE];
+
+ memory.checkpointWritePage(allocated.get(0), ByteBuffer.wrap(buf),
+ (fullPageId, buf0, tag) -> {
+ assertNotNull(tag);
+
+ boolean oom = false;
+
+ try {
+ // Try force page replacement.
+ while (true) {
+ memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
+ }
+ }
+ catch (IgniteOutOfMemoryException ex) {
+ oom = true;
+ }
+
+ assertTrue("Should oom before check replaced page.", oom);
+
+ assertTrue("Missing page: " + fullPageId, memory.hasLoadedPage(fullPageId));
+ }
+ , null);
+ }
+
+ /**
+ * @param mem Page memory.
+ * @param fullPageId Full page ID to write.
+ * @param val Value to write.
+ * @throws Exception If failed.
+ */
+ private void writePage(PageMemoryImpl mem, FullPageId fullPageId, byte val) throws Exception {
+ int grpId = fullPageId.groupId();
+ long pageId = fullPageId.pageId();
+ long page = mem.acquirePage(grpId, pageId);
+
+ try {
+ long ptr = mem.writeLock(grpId, pageId, page);
+
+ try {
+ new DummyPageIO().initNewPage(ptr, pageId, PAGE_SIZE);
+
+ for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++)
+ PageUtils.putByte(ptr, i, val);
+ }
+ finally {
+ mem.writeUnlock(grpId, pageId, page, Boolean.FALSE, true);
+ }
+ }
+ finally {
+ mem.releasePage(grpId, pageId, page);
+ }
+ }
+
+ /**
* @throws Exception If failed.
*/
private void testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy plc) throws Exception {
@@ -172,7 +351,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
List<FullPageId> pages = new ArrayList<>();
for (int i = 0; i < (MAX_SIZE - 10) * MB / PAGE_SIZE / 2; i++) {
- long pageId = memory.allocatePage(1, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX);
+ long pageId = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
FullPageId fullPageId = new FullPageId(pageId, 1);
@@ -186,12 +365,15 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
CheckpointMetricsTracker mockTracker = Mockito.mock(CheckpointMetricsTracker.class);
for (FullPageId checkpointPage : pages)
- memory.getForCheckpoint(checkpointPage, ByteBuffer.allocate(PAGE_SIZE), mockTracker);
+ memory.checkpointWritePage(checkpointPage, ByteBuffer.allocate(PAGE_SIZE),
+ (fullPageId, buffer, tag) -> {
+ // No-op.
+ }, mockTracker);
memory.finishCheckpoint();
for (int i = (int)((MAX_SIZE - 10) * MB / PAGE_SIZE / 2); i < (MAX_SIZE - 20) * MB / PAGE_SIZE; i++) {
- long pageId = memory.allocatePage(1, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX);
+ long pageId = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
FullPageId fullPageId = new FullPageId(pageId, 1);
@@ -261,11 +443,31 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
* @param throttlingPlc Throttling Policy.
* @throws Exception If creating mock failed.
*/
- private PageMemoryImpl createPageMemory(PageMemoryImpl.ThrottlingPolicy throttlingPlc) throws Exception {
+ private PageMemoryImpl createPageMemory(
+ PageMemoryImpl.ThrottlingPolicy throttlingPlc) throws Exception {
+ return createPageMemory(
+ MAX_SIZE,
+ throttlingPlc,
+ new NoOpPageStoreManager(),
+ (fullPageId, byteBuf, tag) -> {
+ assert false : "No page replacement (rotation with disk) should happen during the test";
+ });
+ }
+
+ /**
+ * @param throttlingPlc Throttling Policy.
+ * @throws Exception If creating mock failed.
+ */
+ private PageMemoryImpl createPageMemory(
+ int maxSize,
+ PageMemoryImpl.ThrottlingPolicy throttlingPlc,
+ IgnitePageStoreManager mgr,
+ PageStoreWriter replaceWriter
+ ) throws Exception {
long[] sizes = new long[5];
for (int i = 0; i < sizes.length; i++)
- sizes[i] = MAX_SIZE * MB / 4;
+ sizes[i] = maxSize * MB / 4;
sizes[4] = 5 * MB;
@@ -275,12 +477,14 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
igniteCfg.setDataStorageConfiguration(new DataStorageConfiguration());
igniteCfg.setFailureHandler(new NoOpFailureHandler());
igniteCfg.setEncryptionSpi(new NoopEncryptionSpi());
+ igniteCfg.setEventStorageSpi(new NoopEventStorageSpi());
GridTestKernalContext kernalCtx = new GridTestKernalContext(new GridTestLog4jLogger(), igniteCfg);
kernalCtx.add(new IgnitePluginProcessor(kernalCtx, igniteCfg, Collections.<PluginProvider>emptyList()));
kernalCtx.add(new GridInternalSubscriptionProcessor(kernalCtx));
kernalCtx.add(new GridEncryptionManager(kernalCtx));
+ kernalCtx.add(new GridEventStorageManager(kernalCtx));
FailureProcessor failureProc = new FailureProcessor(kernalCtx);
@@ -293,7 +497,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
null,
null,
null,
- new NoOpPageStoreManager(),
+ mgr,
new NoOpWALManager(),
null,
new IgniteCacheDatabaseSharedManager(),
@@ -321,17 +525,15 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
sizes,
sharedCtx,
PAGE_SIZE,
- (fullPageId, byteBuf, tag) -> {
- assert false : "No page replacement (rotation with disk) should happen during the test";
- },
+ replaceWriter,
new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
@Override public void applyx(Long page, FullPageId fullId, PageMemoryEx pageMem) {
}
}, new CheckpointLockStateChecker() {
- @Override public boolean checkpointLockIsHeldByThread() {
- return true;
- }
- },
+ @Override public boolean checkpointLockIsHeldByThread() {
+ return true;
+ }
+ },
new DataRegionMetricsImpl(igniteCfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration()),
throttlingPlc,
noThrottle
@@ -341,4 +543,42 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
return mem;
}
+
+ /**
+ *
+ */
+ private static class TestPageStoreManager extends NoOpPageStoreManager implements PageStoreWriter {
+ /** */
+ private Map<FullPageId, byte[]> storedPages = new HashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public void read(int grpId, long pageId, ByteBuffer pageBuf) throws IgniteCheckedException {
+ FullPageId fullPageId = new FullPageId(pageId, grpId);
+
+ byte[] bytes = storedPages.get(fullPageId);
+
+ if (bytes != null)
+ pageBuf.put(bytes);
+ else
+ pageBuf.put(new byte[PAGE_SIZE]);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(int grpId, long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException {
+ byte[] data = new byte[PAGE_SIZE];
+
+ pageBuf.get(data);
+
+ storedPages.put(new FullPageId(pageId, grpId), data);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writePage(FullPageId fullPageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException {
+ byte[] data = new byte[PAGE_SIZE];
+
+ pageBuf.get(data);
+
+ storedPages.put(fullPageId, data);
+ }
+ }
}