You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/14 11:00:46 UTC
ignite git commit: ignite-3477 Fixed leak on stripe add.
Repository: ignite
Updated Branches:
refs/heads/ignite-3477-fl [created] c304cb10e
ignite-3477 Fixed leak on stripe add.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c304cb10
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c304cb10
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c304cb10
Branch: refs/heads/ignite-3477-fl
Commit: c304cb10eb1b7eeac055b955205ed9998db33be5
Parents: 0314dec
Author: sboikov <sb...@gridgain.com>
Authored: Tue Feb 14 13:38:56 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Feb 14 13:58:43 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 2 +
.../IgniteCacheDatabaseSharedManager.java | 9 +
.../cache/database/freelist/FreeList.java | 6 +
.../cache/database/freelist/FreeListImpl.java | 28 +++
.../cache/database/freelist/PagesList.java | 227 ++++++++++++-------
.../cluster/IgniteChangeGlobalStateSupport.java | 2 +
6 files changed, 188 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c304cb10/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index a6a159c..c71ab2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1157,6 +1157,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
" ^-- Outbound messages queue [size=" + m.getOutboundMessagesQueueSize() + "]";
log.info(msg);
+
+ ctx.cache().context().database().dumpStatistics(log);
}
catch (IgniteClientDisconnectedException ignore) {
// No-op.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c304cb10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
index 9c10057..6370798 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.internal.GridKernalContext;
@@ -76,6 +77,14 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
}
/**
+ * @param log Logger.
+ */
+ public void dumpStatistics(IgniteLogger log) {
+ if (freeList != null)
+ freeList.dumpStatistics(log);
+ }
+
+ /**
* @throws IgniteCheckedException If failed.
*/
protected void initDataStructures() throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c304cb10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java
index d72c5b9..3266b95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.database.freelist;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
/**
@@ -42,4 +43,9 @@ public interface FreeList {
* @throws IgniteCheckedException If failed.
*/
public void removeDataRowByLink(long link) throws IgniteCheckedException;
+
+ /**
+ * @param log Logger.
+ */
+ public void dumpStatistics(IgniteLogger log);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c304cb10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
index 94fcc17..b44033f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.database.freelist;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.pagemem.Page;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
@@ -314,6 +315,33 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
init(metaPageId, initNew);
}
+ /** {@inheritDoc} */
+ @Override public void dumpStatistics(IgniteLogger log) {
+ long dataPages = 0;
+
+ final boolean dumpBucketsInfo = false;
+
+ for (int b = 0; b < BUCKETS; b++) {
+ long size = bucketsSize[b].longValue();
+
+ if (!isReuseBucket(b))
+ dataPages += size;
+
+ if (dumpBucketsInfo) {
+ Stripe[] stripes = getBucket(b);
+
+ log.info("Bucket [b=" + b + ", stripes=" + (stripes != null ? stripes.length : 0) + ']');
+ }
+ }
+
+ if (dataPages > 0) {
+ log.info("FreeList [name=" + name +
+ ", buckets=" + BUCKETS +
+ ", dataPages=" + dataPages +
+ ", reusePages=" + bucketsSize[REUSE_BUCKET].longValue() + "]");
+ }
+ }
+
/**
* @param freeSpace Page free space.
* @param allowReuse {@code True} if it is allowed to get reuse bucket.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c304cb10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java
index 48e0eb3..c2b272b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
+import org.jsr166.LongAdder8;
import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
@@ -74,6 +75,12 @@ public abstract class PagesList extends DataStructure {
IgniteSystemProperties.getInteger("IGNITE_PAGES_LIST_STRIPES_PER_BUCKET",
Math.min(8, Runtime.getRuntime().availableProcessors() * 2));
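+ /** Whether bucket sizes are tracked (system property {@code IGNITE_PAGES_LIST_TRACK_SIZE}, default {@code false}). */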
+ private final boolean trackBucketsSize = IgniteSystemProperties.getBoolean("IGNITE_PAGES_LIST_TRACK_SIZE", false);
+
+ /** Per-bucket page counters, one per bucket; updated when {@link #trackBucketsSize} is enabled. */
+ protected final LongAdder8[] bucketsSize;
+
/** Page ID to store list metadata. */
private final long metaPageId;
@@ -134,6 +141,11 @@ public abstract class PagesList extends DataStructure {
this.name = name;
this.buckets = buckets;
this.metaPageId = metaPageId;
+
+ bucketsSize = new LongAdder8[buckets];
+
+ for (int i = 0; i < buckets; i++)
+ bucketsSize[i] = new LongAdder8();
}
/**
@@ -340,6 +352,7 @@ public abstract class PagesList extends DataStructure {
* Adds stripe to the given bucket.
*
* @param bucket Bucket.
+ * @param reuse {@code True} if it is possible to use the reuse list.
* @throws IgniteCheckedException If failed.
* @return Tail page ID.
*/
@@ -402,7 +415,7 @@ public abstract class PagesList extends DataStructure {
return;
}
else {
- // It is safe to assign new tail since we do it only when write lock lock on tail is held.
+ // It is safe to assign new tail since we do it only when write lock on tail is held.
tails[idx].tailId = newTailId;
return;
@@ -494,13 +507,14 @@ public abstract class PagesList extends DataStructure {
/**
* @param bag Reuse bag.
- * @param dataPageBuf Data page buffer.
+ * @param dataPage Data page.
+ * @param dataPageAddr Data page address.
* @param bucket Bucket.
* @throws IgniteCheckedException If failed.
*/
- protected final void put(ReuseBag bag, Page dataPage, long dataPageBuf, int bucket)
+ protected final void put(ReuseBag bag, Page dataPage, long dataPageAddr, int bucket)
throws IgniteCheckedException {
- assert bag == null ^ dataPageBuf == 0L;
+ assert bag == null ^ dataPageAddr == 0L;
for (int lockAttempt = 0; ;) {
Stripe stripe = getPageForPut(bucket);
@@ -508,35 +522,39 @@ public abstract class PagesList extends DataStructure {
long tailId = stripe.tailId;
try (Page tail = page(tailId)) {
- long buf = writeLockPage(tail, bucket, lockAttempt++); // Explicit check.
+ long pageAddr = writeLockPage(tail, bucket, lockAttempt++); // Explicit check.
+
+ if (pageAddr == 0L) {
+ if (isReuseBucket(bucket) && lockAttempt == TRY_LOCK_ATTEMPTS)
+ addStripeForReuseBucket(bucket);
- if (buf == 0L)
continue;
+ }
- assert PageIO.getPageId(buf) == tailId : "bufPageId = " + PageIO.getPageId(buf) + ", tailId = " + tailId;
- assert PageIO.getType(buf) == PageIO.T_PAGE_LIST_NODE;
+ assert PageIO.getPageId(pageAddr) == tailId : "pageId = " + PageIO.getPageId(pageAddr) + ", tailId = " + tailId;
+ assert PageIO.getType(pageAddr) == PageIO.T_PAGE_LIST_NODE;
boolean ok = false;
try {
- PagesListNodeIO io = PageIO.getPageIO(buf);
+ PagesListNodeIO io = PageIO.getPageIO(pageAddr);
ok = bag != null ?
// Here we can always take pages from the bag to build our list.
- putReuseBag(tailId, tail, buf, io, bag, bucket) :
+ putReuseBag(tailId, tail, pageAddr, io, bag, bucket) :
// Here we can use the data page to build list only if it is empty and
// it is being put into reuse bucket. Usually this will be true, but there is
// a case when there is no reuse bucket in the free list, but then deadlock
// on node page allocation from separate reuse list is impossible.
// If the data page is not empty it can not be put into reuse bucket and thus
// the deadlock is impossible as well.
- putDataPage(tailId, tail, buf, io, dataPage, dataPageBuf, bucket);
+ putDataPage(tailId, tail, pageAddr, io, dataPage, dataPageAddr, bucket);
if (ok)
return;
}
finally {
- writeUnlock(tail, buf, ok);
+ writeUnlock(tail, pageAddr, ok);
}
}
}
@@ -545,10 +563,10 @@ public abstract class PagesList extends DataStructure {
/**
* @param pageId Page ID.
* @param page Page.
- * @param buf Byte buffer.
+ * @param pageAddr Page address.
* @param io IO.
* @param dataPage Data page.
- * @param dataPageBuf Data page buffer.
+ * @param dataPageAddr Data page address.
* @param bucket Bucket.
* @return {@code true} If succeeded.
* @throws IgniteCheckedException If failed.
@@ -556,27 +574,30 @@ public abstract class PagesList extends DataStructure {
private boolean putDataPage(
long pageId,
Page page,
- long buf,
+ long pageAddr,
PagesListNodeIO io,
Page dataPage,
- long dataPageBuf,
+ long dataPageAddr,
int bucket
) throws IgniteCheckedException {
- if (io.getNextId(buf) != 0L)
+ if (io.getNextId(pageAddr) != 0L)
return false; // Splitted.
long dataPageId = dataPage.id();
- int idx = io.addPage(buf, dataPageId, pageSize());
+ int idx = io.addPage(pageAddr, dataPageId, pageSize());
if (idx == -1)
- handlePageFull(pageId, page, buf, io, dataPage, dataPageBuf, bucket);
+ handlePageFull(pageId, page, pageAddr, io, dataPage, dataPageAddr, bucket);
else {
+ if (trackBucketsSize)
+ bucketsSize[bucket].increment();
+
if (isWalDeltaRecordNeeded(wal, page))
wal.log(new PagesListAddPageRecord(cacheId, pageId, dataPageId));
- DataPageIO dataIO = DataPageIO.VERSIONS.forPage(dataPageBuf);
- dataIO.setFreeListPageId(dataPageBuf, pageId);
+ DataPageIO dataIO = DataPageIO.VERSIONS.forPage(dataPageAddr);
+ dataIO.setFreeListPageId(dataPageAddr, pageId);
if (isWalDeltaRecordNeeded(wal, dataPage))
wal.log(new DataPageSetFreeListPageRecord(cacheId, dataPage.id(), pageId));
@@ -588,34 +609,34 @@ public abstract class PagesList extends DataStructure {
/**
* @param pageId Page ID.
* @param page Page.
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param io IO.
* @param dataPage Data page.
- * @param dataPageBuf Data page buffer.
+ * @param dataPageAddr Data page address.
* @param bucket Bucket index.
* @throws IgniteCheckedException If failed.
*/
private void handlePageFull(
long pageId,
Page page,
- long buf,
+ long pageAddr,
PagesListNodeIO io,
Page dataPage,
- long dataPageBuf,
+ long dataPageAddr,
int bucket
) throws IgniteCheckedException {
long dataPageId = dataPage.id();
- DataPageIO dataIO = DataPageIO.VERSIONS.forPage(dataPageBuf);
+ DataPageIO dataIO = DataPageIO.VERSIONS.forPage(dataPageAddr);
// Attempt to add page failed: the node page is full.
if (isReuseBucket(bucket)) {
// If we are on the reuse bucket, we can not allocate new page, because it may cause deadlock.
- assert dataIO.isEmpty(dataPageBuf); // We can put only empty data pages to reuse bucket.
+ assert dataIO.isEmpty(dataPageAddr); // We can put only empty data pages to reuse bucket.
// Change page type to index and add it as next node page to this list.
dataPageId = PageIdUtils.changeType(dataPageId, FLAG_IDX);
- setupNextPage(io, pageId, buf, dataPageId, dataPageBuf);
+ setupNextPage(io, pageId, pageAddr, dataPageId, dataPageAddr);
if (isWalDeltaRecordNeeded(wal, page))
wal.log(new PagesListSetNextRecord(cacheId, pageId, dataPageId));
@@ -641,7 +662,7 @@ public abstract class PagesList extends DataStructure {
assert nextPageAddr != 0L;
try {
- setupNextPage(io, pageId, buf, nextId, nextPageAddr);
+ setupNextPage(io, pageId, pageAddr, nextId, nextPageAddr);
if (isWalDeltaRecordNeeded(wal, page))
wal.log(new PagesListSetNextRecord(cacheId, pageId, nextId));
@@ -664,7 +685,10 @@ public abstract class PagesList extends DataStructure {
assert idx != -1;
- dataIO.setFreeListPageId(dataPageBuf, nextId);
+ if (trackBucketsSize)
+ bucketsSize[bucket].increment();
+
+ dataIO.setFreeListPageId(dataPageAddr, nextId);
if (isWalDeltaRecordNeeded(wal, dataPage))
wal.log(new DataPageSetFreeListPageRecord(cacheId, dataPageId, nextId));
@@ -681,7 +705,7 @@ public abstract class PagesList extends DataStructure {
/**
* @param pageId Page ID.
* @param page Page.
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param io IO.
* @param bag Reuse bag.
* @param bucket Bucket.
@@ -692,24 +716,24 @@ public abstract class PagesList extends DataStructure {
private boolean putReuseBag(
final long pageId,
Page page,
- final long buf,
+ final long pageAddr,
PagesListNodeIO io,
ReuseBag bag,
int bucket
) throws IgniteCheckedException {
- if (io.getNextId(buf) != 0L)
+ if (io.getNextId(pageAddr) != 0L)
return false; // Splitted.
long nextId;
- long prevBuf = buf;
+ long prevPageAddr = pageAddr;
long prevId = pageId;
List<Page> locked = null; // TODO may be unlock right away and do not keep all these pages locked?
- List<Long> lockedBufs = null;
+ List<Long> lockedAddrs = null;
try {
while ((nextId = bag.pollFreePage()) != 0L) {
- int idx = io.addPage(prevBuf, nextId, pageSize());
+ int idx = io.addPage(prevPageAddr, nextId, pageSize());
if (idx == -1) { // Attempt to add page failed: the node page is full.
try (Page next = page(nextId)) {
@@ -718,14 +742,14 @@ public abstract class PagesList extends DataStructure {
assert nextPageAddr != 0L;
if (locked == null) {
- lockedBufs = new ArrayList<>(2);
+ lockedAddrs = new ArrayList<>(2);
locked = new ArrayList<>(2);
}
locked.add(next);
- lockedBufs.add(nextPageAddr);
+ lockedAddrs.add(nextPageAddr);
- setupNextPage(io, prevId, prevBuf, nextId, nextPageAddr);
+ setupNextPage(io, prevId, prevPageAddr, nextId, nextPageAddr);
if (isWalDeltaRecordNeeded(wal, page))
wal.log(new PagesListSetNextRecord(cacheId, prevId, nextId));
@@ -746,12 +770,15 @@ public abstract class PagesList extends DataStructure {
// Switch to this new page, which is now a part of our list
// to add the rest of the bag to the new page.
- prevBuf = nextPageAddr;
+ prevPageAddr = nextPageAddr;
prevId = nextId;
page = next;
}
}
else {
+ if (trackBucketsSize)
+ bucketsSize[bucket].increment();
+
// TODO: use single WAL record for bag?
if (isWalDeltaRecordNeeded(wal, page))
wal.log(new PagesListAddPageRecord(cacheId, prevId, nextId));
@@ -765,7 +792,7 @@ public abstract class PagesList extends DataStructure {
// Release write.
for (int i = 0; i < locked.size(); i++)
- writeUnlock(locked.get(i), lockedBufs.get(i), true);
+ writeUnlock(locked.get(i), lockedAddrs.get(i), true);
}
}
@@ -803,7 +830,8 @@ public abstract class PagesList extends DataStructure {
Stripe[] stripes = getBucket(bucket);
if (stripes == null || stripes.length < MAX_STRIPES_PER_BUCKET) {
- addStripe(bucket, false);
+ if (!isReuseBucket(bucket))
+ addStripe(bucket, true);
return 0L;
}
@@ -813,6 +841,19 @@ public abstract class PagesList extends DataStructure {
}
/**
+ * @param bucket Bucket.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void addStripeForReuseBucket(int bucket) throws IgniteCheckedException {
+ assert isReuseBucket(bucket);
+
+ Stripe[] stripes = getBucket(bucket);
+
+ if (stripes == null || stripes.length < MAX_STRIPES_PER_BUCKET)
+ addStripe(bucket, false);
+ }
+
+ /**
* @param bucket Bucket index.
* @param initIoVers Optional IO to initialize page.
* @return Removed page ID.
@@ -828,27 +869,34 @@ public abstract class PagesList extends DataStructure {
long tailId = stripe.tailId;
try (Page tail = page(tailId)) {
- long tailBuf = writeLockPage(tail, bucket, lockAttempt++); // Explicit check.
+ long tailPageAddr = writeLockPage(tail, bucket, lockAttempt++); // Explicit check.
+
+ if (tailPageAddr == 0L) {
+ if (isReuseBucket(bucket) && lockAttempt == TRY_LOCK_ATTEMPTS)
+ addStripeForReuseBucket(bucket);
- if (tailBuf == 0L)
continue;
+ }
- assert PageIO.getPageId(tailBuf) == tailId : "tailId = " + tailId + ", tailBufId = " + PageIO.getPageId(tailBuf);
- assert PageIO.getType(tailBuf) == PageIO.T_PAGE_LIST_NODE;
+ assert PageIO.getPageId(tailPageAddr) == tailId : "tailId = " + tailId + ", tailPageId = " + PageIO.getPageId(tailPageAddr);
+ assert PageIO.getType(tailPageAddr) == PageIO.T_PAGE_LIST_NODE;
boolean dirty = false;
long ret = 0L;
long recycleId = 0L;
try {
- PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(tailBuf);
+ PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(tailPageAddr);
- if (io.getNextId(tailBuf) != 0)
+ if (io.getNextId(tailPageAddr) != 0)
continue;
- long pageId = io.takeAnyPage(tailBuf);
+ long pageId = io.takeAnyPage(tailPageAddr);
if (pageId != 0L) {
+ if (trackBucketsSize)
+ bucketsSize[bucket].decrement();
+
if (isWalDeltaRecordNeeded(wal, tail))
wal.log(new PagesListRemovePageRecord(cacheId, tailId, pageId));
@@ -858,8 +906,8 @@ public abstract class PagesList extends DataStructure {
// If we got an empty page in non-reuse bucket, move it back to reuse list
// to prevent empty page leak to data pages.
- if (io.isEmpty(tailBuf) && !isReuseBucket(bucket)) {
- long prevId = io.getPreviousId(tailBuf);
+ if (io.isEmpty(tailPageAddr) && !isReuseBucket(bucket)) {
+ long prevId = io.getPreviousId(tailPageAddr);
if (prevId != 0L) {
try (Page prev = page(prevId)) {
@@ -869,13 +917,13 @@ public abstract class PagesList extends DataStructure {
assert ok == TRUE : ok;
}
- recycleId = recyclePage(tailId, tail, tailBuf);
+ recycleId = recyclePage(tailId, tail, tailPageAddr);
}
}
}
else {
// The tail page is empty. We can unlink and return it if we have a previous page.
- long prevId = io.getPreviousId(tailBuf);
+ long prevId = io.getPreviousId(tailPageAddr);
if (prevId != 0L) {
// This can only happen if we are in the reuse bucket.
@@ -893,7 +941,7 @@ public abstract class PagesList extends DataStructure {
PageIO initIo = initIoVers.latest();
- initIo.initNewPage(tailBuf, tailId, pageSize());
+ initIo.initNewPage(tailPageAddr, tailId, pageSize());
if (isWalDeltaRecordNeeded(wal, tail)) {
wal.log(new InitNewPageRecord(cacheId, tail.id(), initIo.getType(),
@@ -901,7 +949,7 @@ public abstract class PagesList extends DataStructure {
}
}
else
- tailId = recyclePage(tailId, tail, tailBuf);
+ tailId = recyclePage(tailId, tail, tailPageAddr);
dirty = true;
@@ -915,7 +963,7 @@ public abstract class PagesList extends DataStructure {
// meta page.
}
finally {
- writeUnlock(tail, tailBuf, dirty);
+ writeUnlock(tail, tailPageAddr, dirty);
}
// Put recycled page (if any) to the reuse bucket after tail is unlocked.
@@ -932,22 +980,21 @@ public abstract class PagesList extends DataStructure {
/**
* @param dataPage Data page.
- * @param dataPageBuf Data page buffer.
+ * @param dataPageAddr Data page address.
* @param dataIO Data page IO.
* @param bucket Bucket index.
* @throws IgniteCheckedException If failed.
* @return {@code True} if page was removed.
*/
- protected final boolean removeDataPage(Page dataPage, long dataPageBuf, DataPageIO dataIO, int bucket)
+ protected final boolean removeDataPage(Page dataPage, long dataPageAddr, DataPageIO dataIO, int bucket)
throws IgniteCheckedException {
long dataPageId = dataPage.id();
- long pageId = dataIO.getFreeListPageId(dataPageBuf);
+ long pageId = dataIO.getFreeListPageId(dataPageAddr);
assert pageId != 0;
try (Page page = page(pageId)) {
- long prevId;
long nextId;
long recycleId = 0L;
@@ -967,11 +1014,14 @@ public abstract class PagesList extends DataStructure {
if (!rmvd)
return false;
+ if (trackBucketsSize)
+ bucketsSize[bucket].decrement();
+
if (isWalDeltaRecordNeeded(wal, page))
wal.log(new PagesListRemovePageRecord(cacheId, pageId, dataPageId));
// Reset free list page ID.
- dataIO.setFreeListPageId(dataPageBuf, 0L);
+ dataIO.setFreeListPageId(dataPageAddr, 0L);
if (isWalDeltaRecordNeeded(wal, dataPage))
wal.log(new DataPageSetFreeListPageRecord(cacheId, dataPageId, 0L));
@@ -981,12 +1031,14 @@ public abstract class PagesList extends DataStructure {
// If the page is empty, we have to try to drop it and link next and previous with each other.
nextId = io.getNextId(pageAddr);
- prevId = io.getPreviousId(pageAddr);
// If there are no next page, then we can try to merge without releasing current write lock,
// because if we will need to lock previous page, the locking order will be already correct.
- if (nextId == 0L)
+ if (nextId == 0L) {
+ long prevId = io.getPreviousId(pageAddr);
+
recycleId = mergeNoNext(pageId, page, pageAddr, prevId, bucket);
+ }
}
finally {
writeUnlock(page, pageAddr, rmvd);
@@ -1006,13 +1058,13 @@ public abstract class PagesList extends DataStructure {
/**
* @param page Page.
* @param pageId Page ID.
- * @param buf Page byte buffer.
+ * @param pageAddr Page address.
* @param prevId Previous page ID.
* @param bucket Bucket index.
* @return Page ID to recycle.
* @throws IgniteCheckedException If failed.
*/
- private long mergeNoNext(long pageId, Page page, long buf, long prevId, int bucket)
+ private long mergeNoNext(long pageId, Page page, long pageAddr, long prevId, int bucket)
throws IgniteCheckedException {
// If we do not have a next page (we are tail) and we are on reuse bucket,
// then we can leave as is as well, because it is normal to have an empty tail page here.
@@ -1029,7 +1081,7 @@ public abstract class PagesList extends DataStructure {
else // If we don't have a previous, then we are tail page of free list, just drop the stripe.
updateTail(bucket, pageId, 0L);
- return recyclePage(pageId, page, buf);
+ return recyclePage(pageId, page, pageAddr);
}
/**
@@ -1091,10 +1143,10 @@ public abstract class PagesList extends DataStructure {
* @param page Page.
* @param pageId Page ID.
* @param io IO.
- * @param buf Byte buffer.
+ * @param pageAddr Page address.
* @param next Next page.
* @param nextId Next page ID.
- * @param nextBuf Next buffer.
+ * @param nextPageAddr Next page address.
* @param bucket Bucket index.
* @return Page to recycle.
* @throws IgniteCheckedException If failed.
@@ -1102,35 +1154,35 @@ public abstract class PagesList extends DataStructure {
private long doMerge(
long pageId,
Page page,
- long buf,
+ long pageAddr,
PagesListNodeIO io,
Page next,
long nextId,
- long nextBuf,
+ long nextPageAddr,
int bucket
) throws IgniteCheckedException {
- long prevId = io.getPreviousId(buf);
+ long prevId = io.getPreviousId(pageAddr);
if (nextId == 0L)
- return mergeNoNext(pageId, page, buf, prevId, bucket);
+ return mergeNoNext(pageId, page, pageAddr, prevId, bucket);
else {
// No one must be able to merge it while we keep a reference.
- assert getPageId(nextBuf) == nextId;
+ assert getPageId(nextPageAddr) == nextId;
if (prevId == 0L) { // No previous page: we are at head.
// These references must be updated at the same time in write locks.
- assert PagesListNodeIO.VERSIONS.forPage(nextBuf).getPreviousId(nextBuf) == pageId;
+ assert PagesListNodeIO.VERSIONS.forPage(nextPageAddr).getPreviousId(nextPageAddr) == pageId;
- PagesListNodeIO nextIO = PagesListNodeIO.VERSIONS.forPage(nextBuf);
- nextIO.setPreviousId(nextBuf, 0);
+ PagesListNodeIO nextIO = PagesListNodeIO.VERSIONS.forPage(nextPageAddr);
+ nextIO.setPreviousId(nextPageAddr, 0);
if (isWalDeltaRecordNeeded(wal, next))
wal.log(new PagesListSetPreviousRecord(cacheId, nextId, 0L));
}
else // Do a fair merge: link previous and next to each other.
- fairMerge(prevId, pageId, nextId, next, nextBuf);
+ fairMerge(prevId, pageId, nextId, next, nextPageAddr);
- return recyclePage(pageId, page, buf);
+ return recyclePage(pageId, page, pageAddr);
}
}
@@ -1141,14 +1193,14 @@ public abstract class PagesList extends DataStructure {
* @param pageId Page ID.
* @param next Next page.
* @param nextId Next page ID.
- * @param nextBuf Next buffer.
+ * @param nextPageAddr Next page address.
* @throws IgniteCheckedException If failed.
*/
private void fairMerge(long prevId,
long pageId,
long nextId,
Page next,
- long nextBuf)
+ long nextPageAddr)
throws IgniteCheckedException {
try (Page prev = page(prevId)) {
long prevPageAddr = writeLock(prev); // No check, we keep a reference.
@@ -1157,18 +1209,18 @@ public abstract class PagesList extends DataStructure {
try {
PagesListNodeIO prevIO = PagesListNodeIO.VERSIONS.forPage(prevPageAddr);
- PagesListNodeIO nextIO = PagesListNodeIO.VERSIONS.forPage(nextBuf);
+ PagesListNodeIO nextIO = PagesListNodeIO.VERSIONS.forPage(nextPageAddr);
// These references must be updated at the same time in write locks.
assert prevIO.getNextId(prevPageAddr) == pageId;
- assert nextIO.getPreviousId(nextBuf) == pageId;
+ assert nextIO.getPreviousId(nextPageAddr) == pageId;
prevIO.setNextId(prevPageAddr, nextId);
if (isWalDeltaRecordNeeded(wal, prev))
wal.log(new PagesListSetNextRecord(cacheId, prevId, nextId));
- nextIO.setPreviousId(nextBuf, prevId);
+ nextIO.setPreviousId(nextPageAddr, prevId);
if (isWalDeltaRecordNeeded(wal, next))
wal.log(new PagesListSetPreviousRecord(cacheId, nextId, prevId));
@@ -1182,14 +1234,14 @@ public abstract class PagesList extends DataStructure {
/**
* @param page Page.
* @param pageId Page ID.
- * @param buf Byte buffer.
+ * @param pageAddr Page address.
* @return Rotated page ID.
* @throws IgniteCheckedException If failed.
*/
- private long recyclePage(long pageId, Page page, long buf) throws IgniteCheckedException {
+ private long recyclePage(long pageId, Page page, long pageAddr) throws IgniteCheckedException {
pageId = PageIdUtils.rotatePageId(pageId);
- PageIO.setPageId(buf, pageId);
+ PageIO.setPageId(pageAddr, pageId);
if (isWalDeltaRecordNeeded(wal, page))
wal.log(new RecycleRecord(cacheId, page.id(), pageId));
@@ -1238,6 +1290,9 @@ public abstract class PagesList extends DataStructure {
/** */
public volatile long tailId;
+ /** */
+ volatile boolean empty;
+
/**
* @param tailId Tail ID.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/c304cb10/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java
index 5f8aec9..3dd9911 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java
@@ -28,6 +28,7 @@ public interface IgniteChangeGlobalStateSupport {
* Called when cluster performing activation.
*
* @param kctx Kernal context.
+ * @throws IgniteCheckedException If failed.
*/
public void onActivate(GridKernalContext kctx) throws IgniteCheckedException;
@@ -35,6 +36,7 @@ public interface IgniteChangeGlobalStateSupport {
* Called when cluster performing deactivation.
*
* @param kctx Kernal context.
+ * @throws IgniteCheckedException If failed.
*/
public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException;
}