You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/12/16 16:34:18 UTC
[ignite] 01/02: IGNITE-12430 Move PagePool to a separate class
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 4ede2ee2f7f65dba33f263d0fee7cb26709858d7
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Mon Dec 2 19:51:31 2019 +0300
IGNITE-12430 Move PagePool to a separate class
---
.../cache/persistence/pagemem/PageHeader.java | 274 ++++++++++++
.../cache/persistence/pagemem/PageMemoryImpl.java | 487 +--------------------
.../cache/persistence/pagemem/PagePool.java | 243 ++++++++++
.../apache/ignite/internal/util/GridUnsafe.java | 20 +
.../apache/ignite/internal/util/IgniteUtils.java | 15 +
.../cache/persistence/pagemem/PagePoolTest.java | 337 ++++++++++++++
.../ignite/internal/util/IgniteUtilsSelfTest.java | 19 +
.../ignite/testsuites/IgniteBasicTestSuite.java | 2 +
8 files changed, 930 insertions(+), 467 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageHeader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageHeader.java
new file mode 100644
index 0000000..20930ce
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageHeader.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/**
+ * Static accessors for page header fields located at fixed offsets from an absolute page pointer.
+ */
+class PageHeader {
+ /** */
+ public static final long PAGE_MARKER = 0x0000000000000001L;
+
+ /** Dirty flag. */
+ private static final long DIRTY_FLAG = 0x0100000000000000L;
+
+ /** Page relative pointer. Does not change once a page is allocated. */
+ private static final int RELATIVE_PTR_OFFSET = 8;
+
+ /** Page ID offset. */
+ private static final int PAGE_ID_OFFSET = 16;
+
+ /** Page cache group ID offset. */
+ private static final int PAGE_CACHE_ID_OFFSET = 24;
+
+ /** Page pin counter offset. */
+ private static final int PAGE_PIN_CNT_OFFSET = 28;
+
+ /** Page temp copy buffer relative pointer offset. */
+ private static final int PAGE_TMP_BUF_OFFSET = 40;
+
+ /**
+ * @param absPtr Absolute pointer to initialize.
+ * @param relative Relative pointer to write.
+ */
+ public static void initNew(long absPtr, long relative) {
+ relative(absPtr, relative);
+
+ tempBufferPointer(absPtr, PageMemoryImpl.INVALID_REL_PTR);
+
+ GridUnsafe.putLong(absPtr, PAGE_MARKER);
+ GridUnsafe.putInt(absPtr + PAGE_PIN_CNT_OFFSET, 0);
+ }
+
+ /**
+ * @param absPtr Absolute pointer.
+ * @return Dirty flag.
+ */
+ public static boolean dirty(long absPtr) {
+ return flag(absPtr, DIRTY_FLAG);
+ }
+
+ /**
+ * @param absPtr Page absolute pointer.
+ * @param dirty Dirty flag.
+ * @return Previous value of dirty flag.
+ */
+ public static boolean dirty(long absPtr, boolean dirty) {
+ return flag(absPtr, DIRTY_FLAG, dirty);
+ }
+
+ /**
+ * @param absPtr Absolute pointer.
+ * @param flag Flag mask.
+ * @return Flag value.
+ */
+ private static boolean flag(long absPtr, long flag) {
+ assert (flag & 0xFFFFFFFFFFFFFFL) == 0;
+ assert Long.bitCount(flag) == 1;
+
+ long relPtrWithFlags = GridUnsafe.getLong(absPtr + RELATIVE_PTR_OFFSET);
+
+ return (relPtrWithFlags & flag) != 0;
+ }
+
+ /**
+ * Sets flag.
+ *
+ * @param absPtr Absolute pointer.
+ * @param flag Flag mask.
+ * @param set New flag value.
+ * @return Previous flag value.
+ */
+ private static boolean flag(long absPtr, long flag, boolean set) {
+ assert (flag & 0xFFFFFFFFFFFFFFL) == 0;
+ assert Long.bitCount(flag) == 1;
+
+ long relPtrWithFlags = GridUnsafe.getLong(absPtr + RELATIVE_PTR_OFFSET);
+
+ boolean was = (relPtrWithFlags & flag) != 0;
+
+ if (set)
+ relPtrWithFlags |= flag;
+ else
+ relPtrWithFlags &= ~flag;
+
+ GridUnsafe.putLong(absPtr + RELATIVE_PTR_OFFSET, relPtrWithFlags);
+
+ return was;
+ }
+
+ /**
+ * @param absPtr Page pointer.
+ * @return If page is pinned.
+ */
+ public static boolean isAcquired(long absPtr) {
+ return GridUnsafe.getInt(absPtr + PAGE_PIN_CNT_OFFSET) > 0;
+ }
+
+ /**
+ * @param absPtr Absolute pointer.
+ */
+ public static void acquirePage(long absPtr) {
+ GridUnsafe.incrementAndGetInt(absPtr + PAGE_PIN_CNT_OFFSET);
+ }
+
+ /**
+ * @param absPtr Absolute pointer.
+ */
+ public static int releasePage(long absPtr) {
+ return GridUnsafe.decrementAndGetInt(absPtr + PAGE_PIN_CNT_OFFSET);
+ }
+
+ /**
+ * @param absPtr Absolute pointer.
+ * @return Number of acquires for the page.
+ */
+ public static int pinCount(long absPtr) {
+ return GridUnsafe.getIntVolatile(null, absPtr + PAGE_PIN_CNT_OFFSET);
+ }
+
+ /**
+ * Reads relative pointer from the page at the given absolute position.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @return Relative pointer written to the page.
+ */
+ public static long readRelative(long absPtr) {
+ return GridUnsafe.getLong(absPtr + RELATIVE_PTR_OFFSET) & PageMemoryImpl.RELATIVE_PTR_MASK;
+ }
+
+ /**
+ * Writes relative pointer to the page at the given absolute position.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @param relPtr Relative pointer to write.
+ */
+ public static void relative(long absPtr, long relPtr) {
+ GridUnsafe.putLong(absPtr + RELATIVE_PTR_OFFSET, relPtr & PageMemoryImpl.RELATIVE_PTR_MASK);
+ }
+
+ /**
+ * Volatile write of the given timestamp to the page at the {@code absPtr} address.
+ *
+ * @param absPtr Absolute page address.
+ */
+ public static void writeTimestamp(final long absPtr, long tstamp) {
+ tstamp >>= 8;
+
+ GridUnsafe.putLongVolatile(null, absPtr, (tstamp << 8) | 0x01);
+ }
+
+ /**
+ * Reads the timestamp from the page at the {@code absPtr} address.
+ *
+ * @param absPtr Absolute page address.
+ * @return Timestamp.
+ */
+ public static long readTimestamp(final long absPtr) {
+ long markerAndTs = GridUnsafe.getLong(absPtr);
+
+ // Clear last byte as it is occupied by page marker.
+ return markerAndTs & ~0xFF;
+ }
+
+ /**
+ * Sets pointer to checkpoint buffer.
+ *
+ * @param absPtr Page absolute pointer.
+ * @param tmpRelPtr Temp buffer relative pointer or {@link PageMemoryImpl#INVALID_REL_PTR} if page is not copied to checkpoint
+ * buffer.
+ */
+ public static void tempBufferPointer(long absPtr, long tmpRelPtr) {
+ GridUnsafe.putLong(absPtr + PAGE_TMP_BUF_OFFSET, tmpRelPtr);
+ }
+
+ /**
+ * Gets pointer to checkpoint buffer or {@link PageMemoryImpl#INVALID_REL_PTR} if page is not copied to checkpoint buffer.
+ *
+ * @param absPtr Page absolute pointer.
+ * @return Temp buffer relative pointer.
+ */
+ public static long tempBufferPointer(long absPtr) {
+ return GridUnsafe.getLong(absPtr + PAGE_TMP_BUF_OFFSET);
+ }
+
+ /**
+ * Reads page ID from the page at the given absolute position.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @return Page ID written to the page.
+ */
+ public static long readPageId(long absPtr) {
+ return GridUnsafe.getLong(absPtr + PAGE_ID_OFFSET);
+ }
+
+ /**
+ * Writes page ID to the page at the given absolute position.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @param pageId Page ID to write.
+ */
+ private static void pageId(long absPtr, long pageId) {
+ GridUnsafe.putLong(absPtr + PAGE_ID_OFFSET, pageId);
+ }
+
+ /**
+ * Reads cache group ID from the page at the given absolute pointer.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @return Cache group ID written to the page.
+ */
+ private static int readPageGroupId(final long absPtr) {
+ return GridUnsafe.getInt(absPtr + PAGE_CACHE_ID_OFFSET);
+ }
+
+ /**
+ * Writes cache group ID to the page at the given absolute pointer.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @param grpId Cache group ID to write.
+ */
+ private static void pageGroupId(final long absPtr, final int grpId) {
+ GridUnsafe.putInt(absPtr + PAGE_CACHE_ID_OFFSET, grpId);
+ }
+
+ /**
+ * Reads page ID and cache group ID from the page at the given absolute pointer.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @return Full page ID written to the page.
+ */
+ public static FullPageId fullPageId(final long absPtr) {
+ return new FullPageId(readPageId(absPtr), readPageGroupId(absPtr));
+ }
+
+ /**
+ * Writes page ID and cache group ID to the page at the given absolute pointer.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @param fullPageId Full page ID to write.
+ */
+ public static void fullPageId(final long absPtr, final FullPageId fullPageId) {
+ pageId(absPtr, fullPageId.pageId());
+
+ pageGroupId(absPtr, fullPageId.groupId());
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index 7adf1c5..b0cc7bd 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -32,7 +32,6 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -85,7 +84,6 @@ import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.OffheapReadWriteLock;
import org.apache.ignite.internal.util.future.CountDownFuture;
import org.apache.ignite.internal.util.lang.GridInClosure3X;
-import org.apache.ignite.internal.util.offheap.GridOffHeapOutOfMemoryException;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -125,51 +123,18 @@ import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
*/
@SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
public class PageMemoryImpl implements PageMemoryEx {
- /** */
- public static final long PAGE_MARKER = 0x0000000000000001L;
-
- /** Relative pointer chunk index mask. */
- private static final long SEGMENT_INDEX_MASK = 0xFFFFFF0000000000L;
-
/** Full relative pointer mask. */
- private static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL;
-
- /** Dirty flag. */
- private static final long DIRTY_FLAG = 0x0100000000000000L;
+ public static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL;
/** Invalid relative pointer value. */
- private static final long INVALID_REL_PTR = RELATIVE_PTR_MASK;
+ public static final long INVALID_REL_PTR = RELATIVE_PTR_MASK;
/** Pointer which means that this page is outdated (for example, cache was destroyed, partition eviction'd happened */
private static final long OUTDATED_REL_PTR = INVALID_REL_PTR + 1;
- /** Address mask to avoid ABA problem. */
- private static final long ADDRESS_MASK = 0xFFFFFFFFFFFFFFL;
-
- /** Counter mask to avoid ABA problem. */
- private static final long COUNTER_MASK = ~ADDRESS_MASK;
-
- /** Counter increment to avoid ABA problem. */
- private static final long COUNTER_INC = ADDRESS_MASK + 1;
-
- /** Page relative pointer. Does not change once a page is allocated. */
- public static final int RELATIVE_PTR_OFFSET = 8;
-
- /** Page ID offset */
- public static final int PAGE_ID_OFFSET = 16;
-
- /** Page cache group ID offset. */
- public static final int PAGE_CACHE_ID_OFFSET = 24;
-
- /** Page pin counter offset. */
- public static final int PAGE_PIN_CNT_OFFSET = 28;
-
/** Page lock offset. */
public static final int PAGE_LOCK_OFFSET = 32;
- /** Page temp copy buffer relative pointer offset. */
- public static final int PAGE_TMP_BUF_OFFSET = 40;
-
/**
* 8b Marker/timestamp
* 8b Relative pointer
@@ -206,20 +171,17 @@ public class PageMemoryImpl implements PageMemoryEx {
/** Checkpoint lock state provider. */
private final CheckpointLockStateChecker stateChecker;
- /** Number of used pages in checkpoint buffer. */
- private final AtomicInteger cpBufPagesCntr = new AtomicInteger(0);
-
/** Use new implementation of loaded pages table: 'Robin Hood hashing: backward shift deletion'. */
private final boolean useBackwardShiftMap
= IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_LOADED_PAGES_BACKWARD_SHIFT_MAP, true);
/** */
- private ExecutorService asyncRunner = new ThreadPoolExecutor(
+ private final ExecutorService asyncRunner = new ThreadPoolExecutor(
0,
Runtime.getRuntime().availableProcessors(),
30L,
TimeUnit.SECONDS,
- new ArrayBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors()));
+ new ArrayBlockingQueue<>(Runtime.getRuntime().availableProcessors()));
/** Page store manager. */
private IgnitePageStoreManager storeMgr;
@@ -243,7 +205,7 @@ public class PageMemoryImpl implements PageMemoryEx {
private volatile Segment[] segments;
/** Lock for segments changes. */
- private Object segmentsLock = new Object();
+ private final Object segmentsLock = new Object();
/** */
private PagePool checkpointPool;
@@ -378,7 +340,7 @@ public class PageMemoryImpl implements PageMemoryEx {
DirectMemoryRegion cpReg = regions.get(regs - 1);
- checkpointPool = new PagePool(regs - 1, cpReg, cpBufPagesCntr);
+ checkpointPool = new PagePool(regs - 1, cpReg, sysPageSize, rwLock);
long checkpointBuf = cpReg.size();
@@ -582,7 +544,7 @@ public class PageMemoryImpl implements PageMemoryEx {
assert !PageHeader.isAcquired(absPtr) :
"Pin counter must be 0 for a new page [relPtr=" + U.hexLong(relPtr) +
- ", absPtr=" + U.hexLong(absPtr) + ", pinCntr=" + GridUnsafe.getInt(absPtr + PAGE_PIN_CNT_OFFSET) + ']';
+ ", absPtr=" + U.hexLong(absPtr) + ", pinCntr=" + PageHeader.pinCount(absPtr) + ']';
setDirty(fullId, absPtr, true, true);
@@ -676,21 +638,21 @@ public class PageMemoryImpl implements PageMemoryEx {
}
/** {@inheritDoc} */
- @Override public boolean freePage(int grpId, long pageId) throws IgniteCheckedException {
+ @Override public boolean freePage(int grpId, long pageId) {
assert false : "Free page should be never called directly when persistence is enabled.";
return false;
}
/** {@inheritDoc} */
- @Override public long metaPageId(int grpId) throws IgniteCheckedException {
+ @Override public long metaPageId(int grpId) {
assert started;
return storeMgr.metaPageId(grpId);
}
/** {@inheritDoc} */
- @Override public long partitionMetaPageId(int grpId, int partId) throws IgniteCheckedException {
+ @Override public long partitionMetaPageId(int grpId, int partId) {
assert started;
return PageIdUtils.pageId(partId, PageIdAllocator.FLAG_DATA, 0);
@@ -1103,9 +1065,8 @@ public class PageMemoryImpl implements PageMemoryEx {
double res = 0;
- for (Segment segment : segments) {
+ for (Segment segment : segments)
res = Math.max(res, segment.getDirtyPagesRatio());
- }
return res;
}
@@ -1119,9 +1080,8 @@ public class PageMemoryImpl implements PageMemoryEx {
long res = 0;
- for (Segment segment : segments) {
+ for (Segment segment : segments)
res += segment.pages();
- }
return res;
}
@@ -1630,7 +1590,7 @@ public class PageMemoryImpl implements PageMemoryEx {
// Create a buffer copy if the page is scheduled for a checkpoint.
if (isInCheckpoint(fullId) && PageHeader.tempBufferPointer(absPtr) == INVALID_REL_PTR) {
- long tmpRelPtr = checkpointPool.borrowOrAllocateFreePage(fullId.pageId());
+ long tmpRelPtr = checkpointPool.borrowOrAllocateFreePage(PageIdUtils.tag(fullId.pageId()));
if (tmpRelPtr == INVALID_REL_PTR) {
rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS);
@@ -1669,10 +1629,10 @@ public class PageMemoryImpl implements PageMemoryEx {
/**
* @param page Page pointer.
* @param fullId full page ID.
- * @param walPlc
- * @param walPlc Full page WAL record policy.
- * @param markDirty set dirty flag to page.
- * @param restore
+ * @param walPlc WAL policy
+ * @param walPlc Full page WAL record policy
+ * @param markDirty set dirty flag to page
+ * @param restore restore flag
*/
private void writeUnlockPage(
long page,
@@ -1807,7 +1767,7 @@ public class PageMemoryImpl implements PageMemoryEx {
/** {@inheritDoc} */
@Override public int checkpointBufferPagesCount() {
- return cpBufPagesCntr.get();
+ return checkpointPool.size();
}
/**
@@ -1888,189 +1848,6 @@ public class PageMemoryImpl implements PageMemoryEx {
}
/**
- *
- */
- private class PagePool {
- /** Segment index. */
- protected final int idx;
-
- /** Direct memory region. */
- protected final DirectMemoryRegion region;
-
- /** Pool pages counter. */
- protected final AtomicInteger pagesCntr;
-
- /** */
- protected long lastAllocatedIdxPtr;
-
- /** Pointer to the address of the free page list. */
- protected long freePageListPtr;
-
- /** Pages base. */
- protected long pagesBase;
-
- /**
- * @param idx Index.
- * @param region Region
- * @param pagesCntr Pages counter.
- */
- protected PagePool(int idx, DirectMemoryRegion region, AtomicInteger pagesCntr) {
- this.idx = idx;
- this.region = region;
- this.pagesCntr = pagesCntr;
-
- long base = (region.address() + 7) & ~0x7;
-
- freePageListPtr = base;
-
- base += 8;
-
- lastAllocatedIdxPtr = base;
-
- base += 8;
-
- // Align page start by
- pagesBase = base;
-
- GridUnsafe.putLong(freePageListPtr, INVALID_REL_PTR);
- GridUnsafe.putLong(lastAllocatedIdxPtr, 1L);
- }
-
- /**
- * Allocates a new free page.
- *
- * @param pageId Page ID to to initialize.
- * @return Relative pointer to the allocated page.
- * @throws GridOffHeapOutOfMemoryException If failed to allocate new free page.
- */
- private long borrowOrAllocateFreePage(long pageId) throws GridOffHeapOutOfMemoryException {
- if (pagesCntr != null)
- pagesCntr.getAndIncrement();
-
- long relPtr = borrowFreePage();
-
- return relPtr != INVALID_REL_PTR ? relPtr : allocateFreePage(pageId);
- }
-
- /**
- * @return Relative pointer to a free page that was borrowed from the allocated pool.
- */
- private long borrowFreePage() {
- while (true) {
- long freePageRelPtrMasked = GridUnsafe.getLong(freePageListPtr);
-
- long freePageRelPtr = freePageRelPtrMasked & ADDRESS_MASK;
-
- if (freePageRelPtr != INVALID_REL_PTR) {
- long freePageAbsPtr = absolute(freePageRelPtr);
-
- long nextFreePageRelPtr = GridUnsafe.getLong(freePageAbsPtr) & ADDRESS_MASK;
-
- long cnt = ((freePageRelPtrMasked & COUNTER_MASK) + COUNTER_INC) & COUNTER_MASK;
-
- if (GridUnsafe.compareAndSwapLong(null, freePageListPtr, freePageRelPtrMasked, nextFreePageRelPtr | cnt)) {
- GridUnsafe.putLong(freePageAbsPtr, PAGE_MARKER);
-
- return freePageRelPtr;
- }
- }
- else
- return INVALID_REL_PTR;
- }
- }
-
- /**
- * @param pageId Page ID.
- * @return Relative pointer of the allocated page.
- * @throws GridOffHeapOutOfMemoryException If failed to allocate new free page.
- */
- private long allocateFreePage(long pageId) throws GridOffHeapOutOfMemoryException {
- long limit = region.address() + region.size();
-
- while (true) {
- long lastIdx = GridUnsafe.getLong(lastAllocatedIdxPtr);
-
- // Check if we have enough space to allocate a page.
- if (pagesBase + (lastIdx + 1) * sysPageSize > limit)
- return INVALID_REL_PTR;
-
- if (GridUnsafe.compareAndSwapLong(null, lastAllocatedIdxPtr, lastIdx, lastIdx + 1)) {
- long absPtr = pagesBase + lastIdx * sysPageSize;
-
- assert (lastIdx & SEGMENT_INDEX_MASK) == 0L;
-
- long relative = relative(lastIdx);
-
- assert relative != INVALID_REL_PTR;
-
- PageHeader.initNew(absPtr, relative);
-
- rwLock.init(absPtr + PAGE_LOCK_OFFSET, PageIdUtils.tag(pageId));
-
- return relative;
- }
- }
- }
-
- /**
- * @param relPtr Relative pointer to free.
- * @return Resulting number of pages in pool if pages counter is enabled, 0 otherwise.
- */
- private int releaseFreePage(long relPtr) {
- long absPtr = absolute(relPtr);
-
- assert !PageHeader.isAcquired(absPtr) : "Release pinned page: " + PageHeader.fullPageId(absPtr);
-
- int resCntr = 0;
-
- if (pagesCntr != null)
- resCntr = pagesCntr.decrementAndGet();
-
- while (true) {
- long freePageRelPtrMasked = GridUnsafe.getLong(freePageListPtr);
-
- long freePageRelPtr = freePageRelPtrMasked & RELATIVE_PTR_MASK;
-
- GridUnsafe.putLong(absPtr, freePageRelPtr);
-
- if (GridUnsafe.compareAndSwapLong(null, freePageListPtr, freePageRelPtrMasked, relPtr))
- return resCntr;
- }
- }
-
- /**
- * @param relativePtr Relative pointer.
- * @return Absolute pointer.
- */
- long absolute(long relativePtr) {
- int segIdx = (int)((relativePtr >> 40) & 0xFFFF);
-
- assert segIdx == idx : "expected=" + idx + ", actual=" + segIdx;
-
- long pageIdx = relativePtr & ~SEGMENT_INDEX_MASK;
-
- long off = pageIdx * sysPageSize;
-
- return pagesBase + off;
- }
-
- /**
- * @param pageIdx Page index in the pool.
- * @return Relative pointer.
- */
- private long relative(long pageIdx) {
- return pageIdx | ((long)idx) << 40;
- }
-
- /**
- * @return Max number of pages in the pool.
- */
- private int pages() {
- return (int)((region.size() - (pagesBase - region.address())) / sysPageSize);
- }
- }
-
- /**
* Gets a collection of all pages currently marked as dirty. Will create a collection copy.
*
* @return Collection of all page IDs marked as dirty.
@@ -2160,7 +1937,7 @@ public class PageMemoryImpl implements PageMemoryEx {
DirectMemoryRegion poolRegion = region.slice(memPerTbl + ldPagesMapOffInRegion);
- pool = new PagePool(idx, poolRegion, null);
+ pool = new PagePool(idx, poolRegion, sysPageSize, rwLock);
maxDirtyPages = throttlingPlc != ThrottlingPolicy.DISABLED
? pool.pages() * 3 / 4
@@ -2246,7 +2023,7 @@ public class PageMemoryImpl implements PageMemoryEx {
* @return Page relative pointer.
*/
private long borrowOrAllocateFreePage(long pageId) {
- return pool.borrowOrAllocateFreePage(pageId);
+ return pool.borrowOrAllocateFreePage(PageIdUtils.tag(pageId));
}
/**
@@ -2702,230 +2479,6 @@ public class PageMemoryImpl implements PageMemoryEx {
/**
*
*/
- private static class PageHeader {
- /**
- * @param absPtr Absolute pointer to initialize.
- * @param relative Relative pointer to write.
- */
- private static void initNew(long absPtr, long relative) {
- relative(absPtr, relative);
-
- tempBufferPointer(absPtr, INVALID_REL_PTR);
-
- GridUnsafe.putLong(absPtr, PAGE_MARKER);
- GridUnsafe.putInt(absPtr + PAGE_PIN_CNT_OFFSET, 0);
- }
-
- /**
- * @param absPtr Absolute pointer.
- * @return Dirty flag.
- */
- private static boolean dirty(long absPtr) {
- return flag(absPtr, DIRTY_FLAG);
- }
-
- /**
- * @param absPtr Page absolute pointer.
- * @param dirty Dirty flag.
- * @return Previous value of dirty flag.
- */
- private static boolean dirty(long absPtr, boolean dirty) {
- return flag(absPtr, DIRTY_FLAG, dirty);
- }
-
- /**
- * @param absPtr Absolute pointer.
- * @param flag Flag mask.
- * @return Flag value.
- */
- private static boolean flag(long absPtr, long flag) {
- assert (flag & 0xFFFFFFFFFFFFFFL) == 0;
- assert Long.bitCount(flag) == 1;
-
- long relPtrWithFlags = GridUnsafe.getLong(absPtr + RELATIVE_PTR_OFFSET);
-
- return (relPtrWithFlags & flag) != 0;
- }
-
- /**
- * Sets flag.
- *
- * @param absPtr Absolute pointer.
- * @param flag Flag mask.
- * @param set New flag value.
- * @return Previous flag value.
- */
- private static boolean flag(long absPtr, long flag, boolean set) {
- assert (flag & 0xFFFFFFFFFFFFFFL) == 0;
- assert Long.bitCount(flag) == 1;
-
- long relPtrWithFlags = GridUnsafe.getLong(absPtr + RELATIVE_PTR_OFFSET);
-
- boolean was = (relPtrWithFlags & flag) != 0;
-
- if (set)
- relPtrWithFlags |= flag;
- else
- relPtrWithFlags &= ~flag;
-
- GridUnsafe.putLong(absPtr + RELATIVE_PTR_OFFSET, relPtrWithFlags);
-
- return was;
- }
-
- /**
- * @param absPtr Page pointer.
- * @return If page is pinned.
- */
- private static boolean isAcquired(long absPtr) {
- return GridUnsafe.getInt(absPtr + PAGE_PIN_CNT_OFFSET) > 0;
- }
-
- /**
- * @param absPtr Absolute pointer.
- */
- private static void acquirePage(long absPtr) {
- updateAtomicInt(absPtr + PAGE_PIN_CNT_OFFSET, 1);
- }
-
- /**
- * @param absPtr Absolute pointer.
- */
- private static int releasePage(long absPtr) {
- return updateAtomicInt(absPtr + PAGE_PIN_CNT_OFFSET, -1);
- }
-
- /**
- * Reads relative pointer from the page at the given absolute position.
- *
- * @param absPtr Absolute memory pointer to the page header.
- * @return Relative pointer written to the page.
- */
- private static long readRelative(long absPtr) {
- return GridUnsafe.getLong(absPtr + RELATIVE_PTR_OFFSET) & RELATIVE_PTR_MASK;
- }
-
- /**
- * Writes relative pointer to the page at the given absolute position.
- *
- * @param absPtr Absolute memory pointer to the page header.
- * @param relPtr Relative pointer to write.
- */
- private static void relative(long absPtr, long relPtr) {
- GridUnsafe.putLong(absPtr + RELATIVE_PTR_OFFSET, relPtr & RELATIVE_PTR_MASK);
- }
-
- /**
- * Volatile write for current timestamp to page in {@code absAddr} address.
- *
- * @param absPtr Absolute page address.
- */
- private static void writeTimestamp(final long absPtr, long tstamp) {
- tstamp >>= 8;
-
- GridUnsafe.putLongVolatile(null, absPtr, (tstamp << 8) | 0x01);
- }
-
- /**
- * Read for timestamp from page in {@code absAddr} address.
- *
- * @param absPtr Absolute page address.
- * @return Timestamp.
- */
- private static long readTimestamp(final long absPtr) {
- long markerAndTs = GridUnsafe.getLong(absPtr);
-
- // Clear last byte as it is occupied by page marker.
- return markerAndTs & ~0xFF;
- }
-
- /**
- * Sets pointer to checkpoint buffer.
- *
- * @param absPtr Page absolute pointer.
- * @param tmpRelPtr Temp buffer relative pointer or {@link #INVALID_REL_PTR} if page is not copied to checkpoint
- * buffer.
- */
- private static void tempBufferPointer(long absPtr, long tmpRelPtr) {
- GridUnsafe.putLong(absPtr + PAGE_TMP_BUF_OFFSET, tmpRelPtr);
- }
-
- /**
- * Gets pointer to checkpoint buffer or {@link #INVALID_REL_PTR} if page is not copied to checkpoint buffer.
- *
- * @param absPtr Page absolute pointer.
- * @return Temp buffer relative pointer.
- */
- private static long tempBufferPointer(long absPtr) {
- return GridUnsafe.getLong(absPtr + PAGE_TMP_BUF_OFFSET);
- }
-
- /**
- * Reads page ID from the page at the given absolute position.
- *
- * @param absPtr Absolute memory pointer to the page header.
- * @return Page ID written to the page.
- */
- private static long readPageId(long absPtr) {
- return GridUnsafe.getLong(absPtr + PAGE_ID_OFFSET);
- }
-
- /**
- * Writes page ID to the page at the given absolute position.
- *
- * @param absPtr Absolute memory pointer to the page header.
- * @param pageId Page ID to write.
- */
- private static void pageId(long absPtr, long pageId) {
- GridUnsafe.putLong(absPtr + PAGE_ID_OFFSET, pageId);
- }
-
- /**
- * Reads cache group ID from the page at the given absolute pointer.
- *
- * @param absPtr Absolute memory pointer to the page header.
- * @return Cache group ID written to the page.
- */
- private static int readPageGroupId(final long absPtr) {
- return GridUnsafe.getInt(absPtr + PAGE_CACHE_ID_OFFSET);
- }
-
- /**
- * Writes cache group ID from the page at the given absolute pointer.
- *
- * @param absPtr Absolute memory pointer to the page header.
- * @param grpId Cache group ID to write.
- */
- private static void pageGroupId(final long absPtr, final int grpId) {
- GridUnsafe.putInt(absPtr + PAGE_CACHE_ID_OFFSET, grpId);
- }
-
- /**
- * Reads page ID and cache group ID from the page at the given absolute pointer.
- *
- * @param absPtr Absolute memory pointer to the page header.
- * @return Full page ID written to the page.
- */
- private static FullPageId fullPageId(final long absPtr) {
- return new FullPageId(readPageId(absPtr), readPageGroupId(absPtr));
- }
-
- /**
- * Writes page ID and cache group ID from the page at the given absolute pointer.
- *
- * @param absPtr Absolute memory pointer to the page header.
- * @param fullPageId Full page ID to write.
- */
- private static void fullPageId(final long absPtr, final FullPageId fullPageId) {
- pageId(absPtr, fullPageId.pageId());
-
- pageGroupId(absPtr, fullPageId.groupId());
- }
- }
-
- /**
- *
- */
private static class ClearSegmentRunnable implements Runnable {
/** */
private Segment seg;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePool.java
new file mode 100644
index 0000000..d267b36
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePool.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.OffheapReadWriteLock;
+import org.apache.ignite.internal.util.offheap.GridOffHeapOutOfMemoryException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
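+ * Pool of fixed-size pages laid out in a direct memory region; released pages are kept in a CAS-updated free list.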
+ */
+public class PagePool {
+ /** Relative pointer chunk index mask. */
+ private static final long SEGMENT_INDEX_MASK = 0xFFFFFF0000000000L;
+
+ /** Address mask to avoid ABA problem. */
+ private static final long ADDRESS_MASK = 0xFFFFFFFFFFFFFFL;
+
+ /** Counter increment to avoid ABA problem. */
+ private static final long COUNTER_INC = ADDRESS_MASK + 1;
+
+ /** Counter mask to avoid ABA problem. */
+ private static final long COUNTER_MASK = ~ADDRESS_MASK;
+
+ /** Segment index. */
+ protected final int idx;
+
+ /** Direct memory region. */
+ protected final DirectMemoryRegion region;
+
+ /** Pool pages counter. */
+ protected final AtomicInteger pagesCntr = new AtomicInteger();
+
+ /** Pointer to the counter holding the index of the next page to be allocated from the region. */
+ protected long lastAllocatedIdxPtr;
+
+ /** Pointer to the address of the free page list. */
+ protected long freePageListPtr;
+
+ /** Pages base. */
+ protected long pagesBase;
+
+ /** System page size. */
+ private final int sysPageSize;
+
+ /** Off-heap read-write lock used to initialize the lock word of newly allocated pages. */
+ private OffheapReadWriteLock rwLock;
+
+ /**
+ * @param idx Segment index, encoded into every relative pointer produced by this pool.
+ * @param region Direct memory region the pool header and the pages are laid out in.
+ */
+ protected PagePool(
+ int idx,
+ DirectMemoryRegion region,
+ int sysPageSize,
+ OffheapReadWriteLock rwLock
+ ) {
+ this.idx = idx;
+ this.region = region;
+ this.sysPageSize = sysPageSize;
+ this.rwLock = rwLock;
+
+ long base = (region.address() + 7) & ~0x7;
+
+ freePageListPtr = base;
+
+ base += 8;
+
+ lastAllocatedIdxPtr = base;
+
+ base += 8;
+
+ // Pages start right after the two 8-byte header slots (free list head and last allocated index).
+ pagesBase = base;
+
+ GridUnsafe.putLong(freePageListPtr, PageMemoryImpl.INVALID_REL_PTR);
+ GridUnsafe.putLong(lastAllocatedIdxPtr, 0L);
+ }
+
+ /**
+ * Allocates a new free page.
+ *
+ * @param tag Tag to initialize page RW lock.
+ * @return Relative pointer to the allocated page.
+ * @throws GridOffHeapOutOfMemoryException If failed to allocate new free page.
+ */
+ public long borrowOrAllocateFreePage(int tag) throws GridOffHeapOutOfMemoryException {
+ long relPtr = borrowFreePage();
+
+ if (relPtr == PageMemoryImpl.INVALID_REL_PTR)
+ relPtr = allocateFreePage(tag);
+
+ if (relPtr != PageMemoryImpl.INVALID_REL_PTR && pagesCntr != null)
+ pagesCntr.incrementAndGet();
+
+ return relPtr;
+ }
+
+ /**
+ * @return Relative pointer to a free page that was borrowed from the allocated pool.
+ */
+ private long borrowFreePage() {
+ while (true) {
+ long freePageRelPtrMasked = GridUnsafe.getLong(null, freePageListPtr);
+
+ long freePageRelPtr = freePageRelPtrMasked & ADDRESS_MASK;
+
+ if (freePageRelPtr != PageMemoryImpl.INVALID_REL_PTR) {
+ long freePageAbsPtr = absolute(freePageRelPtr);
+
+ long nextFreePageRelPtr = GridUnsafe.getLong(null, freePageAbsPtr) & ADDRESS_MASK;
+
+ // nextFreePageRelPtr may be invalid because a concurrent thread may have already polled this value
+ // and used it.
+ long cnt = ((freePageRelPtrMasked & COUNTER_MASK) + COUNTER_INC) & COUNTER_MASK;
+
+ if (GridUnsafe.compareAndSwapLong(null, freePageListPtr, freePageRelPtrMasked, nextFreePageRelPtr | cnt)) {
+ GridUnsafe.putLongVolatile(null, freePageAbsPtr, PageHeader.PAGE_MARKER);
+
+ return freePageRelPtr;
+ }
+ }
+ else
+ return PageMemoryImpl.INVALID_REL_PTR;
+ }
+ }
+
+ /**
+ * @param tag Tag to initialize page RW lock.
+ * @return Relative pointer of the allocated page.
+ * @throws GridOffHeapOutOfMemoryException If failed to allocate new free page.
+ */
+ private long allocateFreePage(int tag) throws GridOffHeapOutOfMemoryException {
+ long limit = region.address() + region.size();
+
+ while (true) {
+ long lastIdx = GridUnsafe.getLong(null, lastAllocatedIdxPtr);
+
+ // Check if we have enough space to allocate a page.
+ if (pagesBase + (lastIdx + 1) * sysPageSize > limit)
+ return PageMemoryImpl.INVALID_REL_PTR;
+
+ if (GridUnsafe.compareAndSwapLong(null, lastAllocatedIdxPtr, lastIdx, lastIdx + 1)) {
+ long absPtr = pagesBase + lastIdx * sysPageSize;
+
+ assert (lastIdx & SEGMENT_INDEX_MASK) == 0L;
+
+ long relative = relative(lastIdx);
+
+ assert relative != PageMemoryImpl.INVALID_REL_PTR;
+
+ PageHeader.initNew(absPtr, relative);
+
+ rwLock.init(absPtr + PageMemoryImpl.PAGE_LOCK_OFFSET, tag);
+
+ return relative;
+ }
+ }
+ }
+
+ /**
+ * @param relPtr Relative pointer of the page to push back to the free list.
+ * @return Number of pages still borrowed from the pool after this release.
+ */
+ public int releaseFreePage(long relPtr) {
+ long absPtr = absolute(relPtr);
+
+ assert !PageHeader.isAcquired(absPtr) : "Release pinned page: " + PageHeader.fullPageId(absPtr);
+
+ // The counter field is final and always initialized, so no null guard is needed.
+ int resCntr = pagesCntr.decrementAndGet();
+
+ while (true) {
+ long freePageRelPtrMasked = GridUnsafe.getLong(null, freePageListPtr);
+
+ // Link the released page to the current head of the free list.
+ GridUnsafe.putLong(null, absPtr, freePageRelPtrMasked & PageMemoryImpl.RELATIVE_PTR_MASK);
+
+ // Keep the head's ABA counter: publishing a bare relPtr would reset it to zero.
+ long relPtrWithCnt = (relPtr & ADDRESS_MASK) | (freePageRelPtrMasked & COUNTER_MASK);
+
+ if (GridUnsafe.compareAndSwapLong(null, freePageListPtr, freePageRelPtrMasked, relPtrWithCnt))
+ return resCntr;
+ }
+ }
+
+ /**
+ * @param relativePtr Relative pointer.
+ * @return Absolute pointer.
+ */
+ long absolute(long relativePtr) {
+ int segIdx = (int)((relativePtr >> 40) & 0xFFFF);
+
+ assert segIdx == idx : "expected=" + idx + ", actual=" + segIdx + ", relativePtr=" + U.hexLong(relativePtr);
+
+ long pageIdx = relativePtr & ~SEGMENT_INDEX_MASK;
+
+ long off = pageIdx * sysPageSize;
+
+ return pagesBase + off;
+ }
+
+ /**
+ * @param pageIdx Page index in the pool.
+ * @return Relative pointer.
+ */
+ private long relative(long pageIdx) {
+ return pageIdx | ((long)idx) << 40;
+ }
+
+ /**
+ * @return Max number of pages in the pool.
+ */
+ public int pages() {
+ return (int)((region.size() - (pagesBase - region.address())) / sysPageSize);
+ }
+
+ /**
+ * @return Number of pages currently borrowed from the pool and not yet released.
+ */
+ public int size() {
+ return pagesCntr.get();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
index 573a602..2197c7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
@@ -1362,6 +1362,26 @@ public abstract class GridUnsafe {
}
/**
+ * Atomically increments value stored in an integer pointed by {@code ptr}.
+ *
+ * @param ptr Pointer to an integer.
+ * @return Updated value.
+ */
+ public static int incrementAndGetInt(long ptr) {
+ return UNSAFE.getAndAddInt(null, ptr, 1) + 1;
+ }
+
+ /**
+ * Atomically decrements value stored in an integer pointed by {@code ptr}.
+ *
+ * @param ptr Pointer to an integer.
+ * @return Updated value.
+ */
+ public static int decrementAndGetInt(long ptr) {
+ return UNSAFE.getAndAddInt(null, ptr, -1) - 1;
+ }
+
+ /**
* Gets byte value with volatile semantic.
*
* @param obj Object.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index f6d9150..34ef7be 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -8721,6 +8721,21 @@ public abstract class IgniteUtils {
}
/**
+ * Rounds up the argument to the closest power of 2 that is greater than or equal to it.
+ *
+ * @param v Value to round up.
+ * @return Next closest power of 2.
+ */
+ public static int nextPowerOf2(int v) {
+ A.ensure(v >= 0, "v must not be negative");
+ // Anything above 2^30 would round up to 2^31, which overflows int to Integer.MIN_VALUE.
+ A.ensure(v <= (1 << 30), "v must not exceed 2^30");
+
+ // Zero is handled explicitly instead of relying on the shift distance wrapping modulo 32.
+ return v == 0 ? 1 : 1 << (32 - Integer.numberOfLeadingZeros(v - 1));
+ }
+
+ /**
* Gets absolute value for integer. If integer is {@link Integer#MIN_VALUE}, then {@code 0} is returned.
*
* @param i Integer.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePoolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePoolTest.java
new file mode 100644
index 0000000..7d17d2a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePoolTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
+import org.apache.ignite.internal.util.OffheapReadWriteLock;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class PagePoolTest extends GridCommonAbstractTest {
+ /**
+ * @return Test parameters.
+ */
+ @Parameterized.Parameters(name = "PageSize={0}, segment={1}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(
+ new Object[] {1024 + PageMemoryImpl.PAGE_OVERHEAD, 0},
+ new Object[] {1024 + PageMemoryImpl.PAGE_OVERHEAD, 1},
+ new Object[] {1024 + PageMemoryImpl.PAGE_OVERHEAD, 2},
+ new Object[] {1024 + PageMemoryImpl.PAGE_OVERHEAD, 4},
+ new Object[] {1024 + PageMemoryImpl.PAGE_OVERHEAD, 8},
+ new Object[] {1024 + PageMemoryImpl.PAGE_OVERHEAD, 16},
+ new Object[] {1024 + PageMemoryImpl.PAGE_OVERHEAD, 31},
+
+ new Object[] {2048 + PageMemoryImpl.PAGE_OVERHEAD, 0},
+ new Object[] {2048 + PageMemoryImpl.PAGE_OVERHEAD, 1},
+ new Object[] {2048 + PageMemoryImpl.PAGE_OVERHEAD, 2},
+ new Object[] {2048 + PageMemoryImpl.PAGE_OVERHEAD, 4},
+ new Object[] {2048 + PageMemoryImpl.PAGE_OVERHEAD, 8},
+ new Object[] {2048 + PageMemoryImpl.PAGE_OVERHEAD, 16},
+ new Object[] {2048 + PageMemoryImpl.PAGE_OVERHEAD, 31},
+
+ new Object[] {4096 + PageMemoryImpl.PAGE_OVERHEAD, 0},
+ new Object[] {4096 + PageMemoryImpl.PAGE_OVERHEAD, 1},
+ new Object[] {4096 + PageMemoryImpl.PAGE_OVERHEAD, 2},
+ new Object[] {4096 + PageMemoryImpl.PAGE_OVERHEAD, 4},
+ new Object[] {4096 + PageMemoryImpl.PAGE_OVERHEAD, 8},
+ new Object[] {4096 + PageMemoryImpl.PAGE_OVERHEAD, 16},
+ new Object[] {4096 + PageMemoryImpl.PAGE_OVERHEAD, 31},
+
+ new Object[] {8192 + PageMemoryImpl.PAGE_OVERHEAD, 0},
+ new Object[] {8192 + PageMemoryImpl.PAGE_OVERHEAD, 1},
+ new Object[] {8192 + PageMemoryImpl.PAGE_OVERHEAD, 2},
+ new Object[] {8192 + PageMemoryImpl.PAGE_OVERHEAD, 4},
+ new Object[] {8192 + PageMemoryImpl.PAGE_OVERHEAD, 8},
+ new Object[] {8192 + PageMemoryImpl.PAGE_OVERHEAD, 16},
+ new Object[] {8192 + PageMemoryImpl.PAGE_OVERHEAD, 31},
+
+ new Object[] {16384 + PageMemoryImpl.PAGE_OVERHEAD, 0},
+ new Object[] {16384 + PageMemoryImpl.PAGE_OVERHEAD, 1},
+ new Object[] {16384 + PageMemoryImpl.PAGE_OVERHEAD, 2},
+ new Object[] {16384 + PageMemoryImpl.PAGE_OVERHEAD, 4},
+ new Object[] {16384 + PageMemoryImpl.PAGE_OVERHEAD, 8},
+ new Object[] {16384 + PageMemoryImpl.PAGE_OVERHEAD, 16},
+ new Object[] {16384 + PageMemoryImpl.PAGE_OVERHEAD, 31}
+ );
+ }
+
+ /** */
+ @Parameterized.Parameter
+ public int sysPageSize;
+
+ /** */
+ @Parameterized.Parameter(1)
+ public int segment;
+
+ /** */
+ private static final int PAGES = 100;
+
+ /** */
+ private OffheapReadWriteLock rwLock = new OffheapReadWriteLock(U.nextPowerOf2(Runtime.getRuntime().availableProcessors()));
+
+ /** */
+ private DirectMemoryProvider provider;
+
+ /** */
+ private DirectMemoryRegion region;
+
+ /** */
+ private PagePool pool;
+
+ /**
+ */
+ @Before
+ public void prepare() {
+ provider = new UnsafeMemoryProvider(log);
+ provider.initialize(new long[] {sysPageSize * PAGES + 16});
+
+ region = provider.nextRegion();
+
+ pool = new PagePool(segment, region, sysPageSize, rwLock);
+ }
+
+ /**
+ */
+ @After
+ public void cleanup() {
+ provider.shutdown(true);
+ }
+
+ /**
+ */
+ @Test
+ public void testSingleThreadedBorrowRelease() {
+ assertEquals(PAGES, pool.pages());
+ assertEquals(0, pool.size());
+
+ Set<Long> allocated = new LinkedHashSet<>();
+ LinkedList<Long> allocatedQueue = new LinkedList<>();
+
+ info("Region start: " + U.hexLong(region.address()));
+
+ int tag = 1;
+
+ for (int i = 0; i < PAGES; i++) {
+ long relPtr = pool.borrowOrAllocateFreePage(tag);
+
+ assertTrue("Failed for i=" + i, relPtr != PageMemoryImpl.INVALID_REL_PTR);
+
+ assertTrue(allocated.add(relPtr));
+ allocatedQueue.add(relPtr);
+
+ PageHeader.writeTimestamp(pool.absolute(relPtr), 0xFFFFFFFFFFFFFFFFL);
+
+ assertEquals(i + 1, pool.size());
+ }
+
+ info("Done allocating");
+
+ assertEquals(PageMemoryImpl.INVALID_REL_PTR, pool.borrowOrAllocateFreePage(tag));
+
+ assertEquals(PAGES, pool.size());
+
+ {
+ int i = 0;
+
+ for (Long relPtr : allocated) {
+ pool.releaseFreePage(relPtr);
+
+ i++;
+
+ assertEquals(PAGES - i, pool.size());
+ }
+ }
+
+ info("Done releasing");
+
+ assertEquals(0, pool.size());
+
+ {
+ Iterator<Long> it = allocatedQueue.descendingIterator();
+
+ int i = 0;
+
+ while (it.hasNext()) {
+ long relPtr = it.next();
+
+ long fromPool = pool.borrowOrAllocateFreePage(tag);
+
+ assertEquals(relPtr, fromPool);
+
+ i++;
+
+ assertEquals(i, pool.size());
+
+ PageHeader.writeTimestamp(pool.absolute(relPtr), 0xFFFFFFFFFFFFFFFFL);
+ }
+ }
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testMultithreadedConsistency() throws Exception {
+ assertEquals(PAGES, pool.pages());
+ assertEquals(0, pool.size());
+
+ ConcurrentMap<Long, Long> allocated = new ConcurrentHashMap<>();
+
+ {
+ long relPtr;
+
+ while ((relPtr = pool.borrowOrAllocateFreePage(1)) != PageMemoryImpl.INVALID_REL_PTR) {
+ assertNull(allocated.put(relPtr, relPtr));
+
+ PageHeader.writeTimestamp(pool.absolute(relPtr), 0xFFFFFFFFFFFFFFFFL);
+ }
+ }
+
+ assertEquals(PAGES, pool.size());
+ assertEquals(PAGES, allocated.size());
+
+ GridTestUtils.runMultiThreaded(() -> {
+ while (!allocated.isEmpty()) {
+ Long polled = pollRandom(allocated);
+
+ if (polled != null)
+ pool.releaseFreePage(polled);
+ }
+ }, Runtime.getRuntime().availableProcessors(), "load-runner");
+
+ assertTrue(allocated.isEmpty());
+ assertEquals(0, pool.size());
+
+ GridTestUtils.runMultiThreaded(() -> {
+ long polled;
+
+ while ((polled = pool.borrowOrAllocateFreePage(1)) != PageMemoryImpl.INVALID_REL_PTR) {
+ assertNull(allocated.put(polled, polled));
+
+ PageHeader.writeTimestamp(pool.absolute(polled), 0xFFFFFFFFFFFFFFFFL);
+ }
+ }, Runtime.getRuntime().availableProcessors(), "load-runner");
+
+ assertEquals(PAGES, pool.size());
+ assertEquals(PAGES, allocated.size());
+
+ GridTestUtils.runMultiThreaded(() -> {
+ boolean toPool = true;
+
+ for (int i = 0; i < 10_000; i++) {
+ if (toPool) {
+ if (allocated.size() < PAGES / 3) {
+ toPool = false;
+
+ log.info("Direction switched: " + toPool);
+ }
+ }
+ else {
+ if (allocated.size() > 2 * PAGES / 3) {
+ toPool = true;
+
+ log.info("Direction switched: " + toPool);
+ }
+ }
+
+ boolean inverse = ThreadLocalRandom.current().nextInt(3) == 0;
+
+ if (toPool ^ inverse) {
+ Long polled = pollRandom(allocated);
+
+ if (polled != null)
+ pool.releaseFreePage(polled);
+ }
+ else {
+ long polled = pool.borrowOrAllocateFreePage(1);
+
+ if (polled != PageMemoryImpl.INVALID_REL_PTR) {
+ long abs = pool.absolute(polled);
+
+ PageHeader.writeTimestamp(abs, 0xFFFFFFFFFFFFFFFFL);
+
+ assertNull(allocated.put(polled, polled));
+ }
+ }
+ }
+ }, Runtime.getRuntime().availableProcessors(), "load-runner");
+
+ {
+ long relPtr;
+
+ while ((relPtr = pool.borrowOrAllocateFreePage(1)) != PageMemoryImpl.INVALID_REL_PTR)
+ assertNull(allocated.put(relPtr, relPtr));
+
+ assertEquals(PAGES, allocated.size());
+ assertEquals(PAGES, pool.size());
+ }
+ }
+
+ /**
+ * @param allocated Map of allocated pages.
+ * @return Random page polled from the map.
+ */
+ private Long pollRandom(ConcurrentMap<Long, Long> allocated) {
+ int size = allocated.size();
+
+ if (size == 0)
+ return null;
+
+ int cnt = ThreadLocalRandom.current().nextInt(size);
+
+ Iterator<Long> it = allocated.keySet().iterator();
+
+ for (int i = 0; i < cnt; i++) {
+ if (it.hasNext())
+ it.next();
+ else
+ break;
+ }
+
+ if (it.hasNext()) {
+ Long key = it.next();
+
+ if (allocated.remove(key) != null)
+ return key;
+ }
+
+ return null;
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 7784602..7168e0b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -128,6 +128,25 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
}
/**
+ *
+ */
+ @Test
+ public void testNextPowOf2() {
+ assertEquals(1, U.nextPowerOf2(0));
+ assertEquals(1, U.nextPowerOf2(1));
+ assertEquals(2, U.nextPowerOf2(2));
+ assertEquals(4, U.nextPowerOf2(3));
+ assertEquals(4, U.nextPowerOf2(4));
+
+ assertEquals(8, U.nextPowerOf2(5));
+ assertEquals(8, U.nextPowerOf2(6));
+ assertEquals(8, U.nextPowerOf2(7));
+ assertEquals(8, U.nextPowerOf2(8));
+
+ assertEquals(32768, U.nextPowerOf2(32767));
+ }
+
+ /**
* @throws Exception If failed.
*/
@Test
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 658172e..927dd71 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.SetTxTimeoutOnPartitionMapExc
import org.apache.ignite.internal.processors.cache.distributed.IgniteRejectConnectOnNodeStopTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.DropCacheContextDuringEvictionTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictionTaskFailureHandlerTest;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PagePoolTest;
import org.apache.ignite.internal.processors.cache.query.continuous.DiscoveryDataDeserializationFailureHanderTest;
import org.apache.ignite.internal.processors.cache.transactions.AtomicOperationsInTxTest;
import org.apache.ignite.internal.processors.cache.transactions.TransactionIntegrityWithSystemWorkerDeathTest;
@@ -206,6 +207,7 @@ import org.junit.runners.Suite;
GridPeerDeploymentRetryModifiedTest.class,
// Basic DB data structures.
+ PagePoolTest.class,
BPlusTreeSelfTest.class,
BPlusTreeFakeReuseSelfTest.class,
BPlusTreeReuseSelfTest.class,