You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/10 07:23:35 UTC
[14/43] ignite git commit: IGNITE-4712 Memory leaks in PageMemory
IGNITE-4712 Memory leaks in PageMemory
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fd19998e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fd19998e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fd19998e
Branch: refs/heads/ignite-4712
Commit: fd19998e042a43ef1fadd526d7a9693abe1c78d1
Parents: d376ea9
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Wed Feb 22 10:59:06 2017 +0300
Committer: Igor Seliverstov <gv...@gmail.com>
Committed: Wed Feb 22 10:59:06 2017 +0300
----------------------------------------------------------------------
.../cache/database/freelist/PagesList.java | 178 +++++++++----------
1 file changed, 85 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd19998e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java
index 7c88041..a5fb617 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java
@@ -201,26 +201,43 @@ public abstract class PagesList extends DataStructure {
for (int i = 0; i < upd.length; i++) {
long tailId = upd[i];
- // TODO: need get full bucket size.
- try (Page tail = page(tailId)) {
- long tailAddr = readLock(tail);
+ List<Page> locked = new ArrayList<>(2);
+ List<Long> lockedAddrs = new ArrayList<>(2);
- assert tailAddr != 0L;
+ try {
+ long pageId = tailId;
+ int count = 0;
+
+ while (pageId != 0L) {
+ try (Page page = page(pageId)) {
+ long pageAddr = readLock(page);
- try {
- PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(tailAddr);
+ assert pageAddr != 0L;
- int count = io.getCount(tailAddr);
+ locked.add(page);
+ lockedAddrs.add(pageAddr);
- Stripe stripe = new Stripe(tailId);
- stripe.empty = count == 0;
+ PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(pageAddr);
- tails[i] = stripe;
+ count += io.getCount(pageAddr);
+ pageId = io.getPreviousId(pageAddr);
- bucketSize += count;
+ // In reuse bucket the page itself can be used as a free page
+ if(isReuseBucket(bucket) && pageId != 0L)
+ count++;
+ }
}
- finally {
- readUnlock(tail, tailAddr);
+
+ Stripe stripe = new Stripe(tailId);
+ stripe.empty = count == 0;
+
+ tails[i] = stripe;
+
+ bucketSize += count;
+ }
+ finally {
+ for (int j = 0; j < locked.size(); j++) {
+ readUnlock(locked.get(j), lockedAddrs.get(j));
}
}
}
@@ -389,6 +406,7 @@ public abstract class PagesList extends DataStructure {
}
Stripe stripe = new Stripe(pageId);
+ stripe.empty = true;
for (;;) {
Stripe[] old = getBucket(bucket);
@@ -449,7 +467,6 @@ public abstract class PagesList extends DataStructure {
else {
// It is safe to assign new tail since we do it only when write lock on tail is held.
tails[idx].tailId = newTailId;
- tails[idx].empty = false;
return true;
}
@@ -501,41 +518,11 @@ public abstract class PagesList extends DataStructure {
}
/**
- * !!! For tests only, does not provide any correctness guarantees for concurrent access.
- *
* @param bucket Bucket index.
* @return Number of pages stored in this list.
- * @throws IgniteCheckedException If failed.
*/
- protected final long storedPagesCount(int bucket) throws IgniteCheckedException {
- long res = 0;
-
- Stripe[] tails = getBucket(bucket);
-
- if (tails != null) {
- for (Stripe tail : tails) {
- long pageId = tail.tailId;
-
- try (Page page = page(pageId)) {
- long pageAddr = readLock(page); // No correctness guaranties.
-
- try {
- PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(pageAddr);
-
- int cnt = io.getCount(pageAddr);
-
- assert cnt >= 0;
-
- res += cnt;
- }
- finally {
- readUnlock(page, pageAddr);
- }
- }
- }
- }
-
- return res;
+ protected final long storedPagesCount(int bucket) {
+ return bucketsSize[bucket].get();
}
/**
@@ -685,6 +672,8 @@ public abstract class PagesList extends DataStructure {
dataPageId,
pageId, 0L));
+ bucketsSize[bucket].incrementAndGet();
+
updateTail(bucket, pageId, dataPageId);
}
else {
@@ -720,13 +709,13 @@ public abstract class PagesList extends DataStructure {
assert idx != -1;
- bucketsSize[bucket].incrementAndGet();
-
dataIO.setFreeListPageId(dataPageAddr, nextId);
if (isWalDeltaRecordNeeded(wal, dataPage))
wal.log(new DataPageSetFreeListPageRecord(cacheId, dataPageId, nextId));
+ bucketsSize[bucket].incrementAndGet();
+
updateTail(bucket, pageId, nextId);
}
finally {
@@ -802,6 +791,9 @@ public abstract class PagesList extends DataStructure {
0L
));
+ if(isReuseBucket(bucket))
+ bucketsSize[bucket].incrementAndGet();
+
// Switch to this new page, which is now a part of our list
// to add the rest of the bag to the new page.
prevPageAddr = nextPageAddr;
@@ -810,11 +802,11 @@ public abstract class PagesList extends DataStructure {
}
}
else {
- bucketsSize[bucket].incrementAndGet();
-
// TODO: use single WAL record for bag?
if (isWalDeltaRecordNeeded(wal, page))
wal.log(new PagesListAddPageRecord(cacheId, prevId, nextId));
+
+ bucketsSize[bucket].incrementAndGet();
}
}
}
@@ -923,13 +915,12 @@ public abstract class PagesList extends DataStructure {
continue;
}
- // TODO: condition !isReuseBucket(bucket) is not correct.
- if (!isReuseBucket(bucket) && stripe.empty) {
+ if (stripe.empty) {
// Another thread took the last page.
writeUnlock(tail, tailPageAddr, false);
if (bucketsSize[bucket].get() > 0) {
- lockAttempt = 0;
+ lockAttempt--; // Ignore current attempt.
continue;
}
@@ -941,13 +932,14 @@ public abstract class PagesList extends DataStructure {
assert PageIO.getType(tailPageAddr) == PageIO.T_PAGE_LIST_NODE;
boolean dirty = false;
- long ret = 0L;
+ long ret;
long recycleId = 0L;
try {
PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(tailPageAddr);
if (io.getNextId(tailPageAddr) != 0)
+ // It is not a tail anymore, retry.
continue;
long pageId = io.takeAnyPage(tailPageAddr);
@@ -962,64 +954,64 @@ public abstract class PagesList extends DataStructure {
ret = pageId;
- boolean empty = io.isEmpty(tailPageAddr);
-
- // TODO: add comment, it seems flag is not set to correct value for reuse bucket.
- stripe.empty = empty;
-
- // If we got an empty page in non-reuse bucket, move it back to reuse list
- // to prevent empty page leak to data pages.
- if (empty && !isReuseBucket(bucket)) {
+ if (io.isEmpty(tailPageAddr)) {
long prevId = io.getPreviousId(tailPageAddr);
- if (prevId != 0L) {
- try (Page prev = page(prevId)) {
- // Lock pages from next to previous.
- Boolean ok = writePage(pageMem, prev, this, cutTail, null, bucket, FALSE);
+ // If we got an empty page in non-reuse bucket, move it back to reuse list
+ // to prevent empty page leak to data pages.
+ if (!isReuseBucket(bucket)) {
+ if (prevId != 0L) {
+ try (Page prev = page(prevId)) {
+ // Lock pages from next to previous.
+ Boolean ok = writePage(pageMem, prev, this, cutTail, null, bucket, FALSE);
- assert ok == TRUE : ok;
- }
+ assert ok == TRUE : ok;
+ }
- recycleId = recyclePage(tailId, tail, tailPageAddr);
+ recycleId = recyclePage(tailId, tail, tailPageAddr);
+ }
+ else
+ stripe.empty = true;
}
+ else
+ stripe.empty = prevId == 0L;
}
}
else {
- // The tail page is empty. We can unlink and return it if we have a previous page.
+ // The tail page is empty, but stripe is not. It might
+ // happen only if we are in reuse bucket and it has
+ // a previous page, so, the current page can be collected
+ assert isReuseBucket(bucket);
+
long prevId = io.getPreviousId(tailPageAddr);
- if (prevId != 0L) {
- // This can only happen if we are in the reuse bucket.
- assert isReuseBucket(bucket);
+ assert prevId != 0L;
- try (Page prev = page(prevId)) {
- // Lock pages from next to previous.
- Boolean ok = writePage(pageMem, prev, this, cutTail, null, bucket, FALSE);
+ try (Page prev = page(prevId)) {
+ // Lock pages from next to previous.
+ Boolean ok = writePage(pageMem, prev, this, cutTail, null, bucket, FALSE);
- assert ok == TRUE : ok;
- }
+ assert ok == TRUE : ok;
+ }
- if (initIoVers != null) {
- tailId = PageIdUtils.changeType(tailId, FLAG_DATA);
+ if (initIoVers != null) {
+ tailId = PageIdUtils.changeType(tailId, FLAG_DATA);
- PageIO initIo = initIoVers.latest();
+ PageIO initIo = initIoVers.latest();
- initIo.initNewPage(tailPageAddr, tailId, pageSize());
+ initIo.initNewPage(tailPageAddr, tailId, pageSize());
- if (isWalDeltaRecordNeeded(wal, tail)) {
- wal.log(new InitNewPageRecord(cacheId, tail.id(), initIo.getType(),
- initIo.getVersion(), tailId));
- }
+ if (isWalDeltaRecordNeeded(wal, tail)) {
+ wal.log(new InitNewPageRecord(cacheId, tail.id(), initIo.getType(),
+ initIo.getVersion(), tailId));
}
- else
- tailId = recyclePage(tailId, tail, tailPageAddr);
-
- dirty = true;
-
- ret = tailId;
}
else
- stripe.empty = true;
+ tailId = recyclePage(tailId, tail, tailPageAddr);
+
+ dirty = true;
+
+ ret = tailId;
}
// If we do not have a previous page (we are at head), then we still can return