You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2018/05/11 15:49:57 UTC
[1/2] ignite git commit: IGNITE-5874 Store TTL expire times in B+
tree on per-partition basis - Fixes #3231.
Repository: ignite
Updated Branches:
refs/heads/master 01f60542e -> 89c775737
Updated Tags: refs/tags/1.0.0-RELEASE-TEST-RC1 [created] a37da05fb
refs/tags/1.0.0-RELEASE-TEST-RC2 [created] 16ba1421d
refs/tags/1.0.0-RELEASE-TEST-RC3 [created] 1b3c8184d
refs/tags/1.0.0-RELEASE-TEST-RC4 [created] d9ab06450
refs/tags/1.0.1-RELEASE-TEST-RC5 [created] eee94fa28
refs/tags/1.0.1-RELEASE-TEST-RC6 [created] 4de9e8dc5
refs/tags/1.10.0.ea1 [created] 0dae7316d
refs/tags/1.10.0.ea2 [created] ab4808545
refs/tags/1.10.0.ea3 [created] 62df82653
refs/tags/1.10.0.ea5 [created] 30460d2a5
refs/tags/1.10.1.ea4 [created] 8c83d167d
refs/tags/1.5.1-QAVS1901 [created] 5c8283571
refs/tags/1.5.10 [created] 4bf9edd68
refs/tags/1.5.11 [created] 460f0078f
refs/tags/1.5.12 [created] fb675772d
refs/tags/1.5.14 [created] 1362553b4
refs/tags/1.5.15 [created] 78e671138
refs/tags/1.5.16 [created] b39f54f21
refs/tags/1.5.17 [created] d4de1cc0f
refs/tags/1.5.18 [created] 67b8c1ae2
refs/tags/1.5.19 [created] 953e9db0d
refs/tags/1.5.20 [created] fca3c78c7
refs/tags/1.5.21 [created] bd1916bfe
refs/tags/1.5.22 [created] 597ea0cf5
refs/tags/1.5.23 [created] f2832c6ff
refs/tags/1.5.24 [created] 692cdf3e4
refs/tags/1.5.25 [created] c1a354a7a
refs/tags/1.5.26 [created] 0d4dbc416
refs/tags/1.5.27 [created] 847b27814
refs/tags/1.5.28 [created] 55dbc8a62
refs/tags/1.5.29 [created] a1980c6f6
refs/tags/1.5.30 [created] ae7e6cc95
refs/tags/1.5.31 [created] 767f88bcf
refs/tags/1.5.32 [created] f57631d98
refs/tags/1.5.33 [created] 1986b93b2
refs/tags/1.5.4 [created] db62c7a78
refs/tags/1.5.5 [created] 8f3ae6bab
refs/tags/1.5.5-QATEST [created] 6b4e4be3a
refs/tags/1.5.6 [created] 75591a935
refs/tags/1.5.7 [created] c9020c405
refs/tags/1.5.7-QATEST [created] 80b21ebb8
refs/tags/1.5.7-TEST [created] 57c19c20a
refs/tags/1.5.8 [created] 210366eb7
refs/tags/1.5.9 [created] cc595929c
refs/tags/1.6.1 [created] b418cea04
refs/tags/1.6.10 [created] f1c424bd2
refs/tags/1.6.11 [created] babff41f0
refs/tags/1.6.12 [created] a22010d32
refs/tags/1.6.2 [created] 072e3b3ce
refs/tags/1.6.3 [created] 9d0212018
refs/tags/1.6.5 [created] 62c101cf3
refs/tags/1.6.6 [created] f14c6fb25
refs/tags/1.6.7 [created] 8e5ecdde0
refs/tags/1.6.8 [created] 4ef6b1743
refs/tags/1.6.8-QAVS1902 [created] 5739b6a1c
refs/tags/1.6.9 [created] 93723542e
refs/tags/1.7.1 [created] 3dd286282
refs/tags/1.7.2 [created] 9e67197e3
refs/tags/1.7.3 [created] 596479c91
refs/tags/1.7.4 [created] 3fae2e313
refs/tags/1.7.4-p1 [created] 0da3c2ed0
refs/tags/1.7.5 [created] ba3cccc88
refs/tags/1.8.0-b1 [created] 947266517
refs/tags/1.8.0.b2 [created] 5b0cedfe4
refs/tags/1.8.1 [created] 8fe1fc191
refs/tags/1.8.2 [created] f255ff094
refs/tags/2.2.4-test [created] e8e0d75d7
refs/tags/2.2.5-test [created] 36cb161e9
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java
new file mode 100644
index 0000000..6fa039d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.migration;
+
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IndexStorage;
+import org.apache.ignite.internal.processors.cache.persistence.RootPage;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
+import org.apache.ignite.internal.processors.cache.tree.PendingRow;
+import org.apache.ignite.internal.util.IgniteTree;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree.WITHOUT_KEY;
+
+/**
+ * Ignite native persistence migration task that upgrades existing PendingTrees to a per-partition basis. It ignores
+ * possible assertion errors when a pointer to an entry exists in the tree but the entry itself was removed for some
+ * reason (e.g. when the partition was evicted after restart).
+ *
+ * The task goes through persistent cache groups and copies pending entries to the corresponding partitions.
+ */
+public class UpgradePendingTreeToPerPartitionTask implements IgniteCallable<Boolean> {
+ /** */
+ private static final String PENDING_ENTRIES_TREE_NAME = "PendingEntries";
+
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ public static final int BATCH_SIZE = 500;
+
+ /** */
+ @IgniteInstanceResource
+ private IgniteEx node;
+
+ /** */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws IgniteException {
+ GridCacheSharedContext<Object, Object> sharedCtx = node.context().cache().context();
+
+ for (CacheGroupContext grp : sharedCtx.cache().cacheGroups()) {
+ if (!grp.persistenceEnabled() || !grp.affinityNode()) {
+ if (!grp.persistenceEnabled())
+ log.info("Skip pending tree upgrade for non-persistent cache group: [grpId=" + grp.groupId() +
+ ", grpName=" + grp.name() + ']');
+ else
+ log.info("Skip pending tree upgrade on non-affinity node for cache group: [grpId=" + grp.groupId() +
+ ", grpName=" + grp.name() + ']');
+
+ continue;
+ }
+
+ try {
+ processCacheGroup(grp);
+ }
+ catch (Exception ex) {
+ if (Thread.interrupted() || X.hasCause(ex, InterruptedException.class))
+ log.info("Upgrade pending tree has been cancelled.");
+ else
+ log.warning("Failed to upgrade pending tree for cache group: [grpId=" + grp.groupId() +
+ ", grpName=" + grp.name() + ']', ex);
+
+ return false;
+ }
+
+ if (Thread.interrupted()) {
+ log.info("Upgrade pending tree has been cancelled.");
+
+ return false;
+ }
+ }
+
+ log.info("All pending trees upgraded successfully.");
+
+ return true;
+ }
+
+ /**
+ * Converts CacheGroup pending tree to per-partition basis.
+ *
+ * @param grp Cache group.
+ * @throws IgniteCheckedException If error occurs.
+ */
+ private void processCacheGroup(CacheGroupContext grp) throws IgniteCheckedException {
+ assert grp.offheap() instanceof GridCacheOffheapManager;
+
+ PendingEntriesTree oldPendingTree;
+
+ final IgniteCacheDatabaseSharedManager db = grp.shared().database();
+
+ db.checkpointReadLock();
+ try {
+ IndexStorage indexStorage = ((GridCacheOffheapManager)grp.offheap()).getIndexStorage();
+
+ //TODO: IGNITE-5874: replace with some check-method to avoid unnecessary page allocation.
+ RootPage pendingRootPage = indexStorage.getOrAllocateForTree(PENDING_ENTRIES_TREE_NAME);
+
+ if (pendingRootPage.isAllocated()) {
+ log.info("No pending tree found for cache group: [grpId=" + grp.groupId() +
+ ", grpName=" + grp.name() + ']');
+
+ // Nothing to do here as just allocated tree is obviously empty.
+ indexStorage.dropRootPage(PENDING_ENTRIES_TREE_NAME);
+
+ return;
+ }
+
+ oldPendingTree = new PendingEntriesTree(
+ grp,
+ PENDING_ENTRIES_TREE_NAME,
+ grp.dataRegion().pageMemory(),
+ pendingRootPage.pageId().pageId(),
+ ((GridCacheOffheapManager)grp.offheap()).reuseListForIndex(null),
+ false
+ );
+ }
+ finally {
+ db.checkpointReadUnlock();
+ }
+
+ processPendingTree(grp, oldPendingTree);
+
+ if (Thread.currentThread().isInterrupted())
+ return;
+
+ db.checkpointReadLock();
+ try {
+ oldPendingTree.destroy();
+ }
+ finally {
+ db.checkpointReadUnlock();
+ }
+ }
+
+ /**
+ * Move pending rows for CacheGroup entries to per-partition PendingTree.
+ * Invalid pending rows will be ignored.
+ *
+ * @param grp Cache group.
+ * @param oldPendingEntries Old-style PendingTree.
+ * @throws IgniteCheckedException If error occurs.
+ */
+ private void processPendingTree(CacheGroupContext grp, PendingEntriesTree oldPendingEntries)
+ throws IgniteCheckedException {
+ final PageMemory pageMemory = grp.dataRegion().pageMemory();
+
+ final IgniteCacheDatabaseSharedManager db = grp.shared().database();
+
+ final Set<Integer> cacheIds = grp.cacheIds();
+
+ PendingRow row = null;
+
+ int processedEntriesCnt = 0;
+ int skippedEntries = 0;
+
+ // Re-acquire checkpoint lock for every next batch.
+ while (!Thread.currentThread().isInterrupted()) {
+ int cnt = 0;
+
+ db.checkpointReadLock();
+ try {
+ GridCursor<PendingRow> cursor = oldPendingEntries.find(row, null, WITHOUT_KEY);
+
+ while (cnt++ < BATCH_SIZE && cursor.next()) {
+ row = cursor.get();
+
+ assert row.link != 0 && row.expireTime != 0 : row;
+
+ GridCacheEntryEx entry;
+
+ // Lost cache or lost entry.
+ if (!cacheIds.contains(row.cacheId) || (entry = getEntry(grp, row)) == null) {
+ skippedEntries++;
+
+ oldPendingEntries.removex(row);
+
+ continue;
+ }
+
+ entry.lockEntry();
+ try {
+ if (processRow(pageMemory, grp, row))
+ processedEntriesCnt++;
+ else
+ skippedEntries++;
+ }
+ finally {
+ entry.unlockEntry();
+ }
+
+ oldPendingEntries.removex(row);
+ }
+
+ if (cnt < BATCH_SIZE)
+ break;
+ }
+ finally {
+ db.checkpointReadUnlock();
+ }
+ }
+
+ log.info("PendingTree upgraded: " +
+ "[grpId=" + grp.groupId() +
+ ", grpName=" + grp.name() +
+ ", processedEntries=" + processedEntriesCnt +
+ ", failedEntries=" + skippedEntries +
+ ']');
+ }
+
+ /**
+ * Returns the CacheEntry instance for the key referenced by the given pending row. The entry is needed only so the
+ * caller can lock it while the row is moved.
+ *
+ * Any failure while resolving the key is treated as "entry not found": a warning is logged and {@code null} is
+ * returned, unless the current thread was interrupted, in which case an {@link IgniteException} wrapping an
+ * {@link InterruptedException} is thrown.
+ *
+ * @param grp Cache group
+ * @param row Pending row.
+ * @return CacheEntry if found or null otherwise.
+ */
+ private GridCacheEntryEx getEntry(CacheGroupContext grp, PendingRow row) {
+ try {
+ CacheDataRowAdapter rowData = new CacheDataRowAdapter(row.link);
+
+ rowData.initFromLink(grp, CacheDataRowAdapter.RowData.KEY_ONLY);
+
+ GridCacheContext cctx = grp.shared().cacheContext(row.cacheId);
+
+ assert cctx != null;
+
+ return cctx.cache().entryEx(rowData.key());
+ }
+ catch (Throwable ex) {
+ if (Thread.currentThread().isInterrupted() || X.hasCause(ex, InterruptedException.class))
+ throw new IgniteException(new InterruptedException());
+
+ String msg = "Failed to move old-version pending entry " +
+ "to per-partition PendingTree: key not found (skipping): " +
+ "[grpId=" + grp.groupId() +
+ ", grpName=" + grp.name() +
+ ", pendingRow=" + row + "]";
+
+ // Do not lose the cause: attach it when debug logging is enabled, as processRow() does.
+ if (log.isDebugEnabled())
+ log.warning(msg, ex);
+ else
+ log.warning(msg);
+
+ return null;
+ }
+ }
+
+ /**
+ * Validates a PendingRow and adds it to the per-partition PendingTree of the partition encoded in the row link.
+ *
+ * The data page referenced by the row is acquired and read-locked while the row is inserted; it is unlocked and
+ * released again on every exit path. Unexpected errors (including assertion errors) are logged and reported as
+ * {@code false}, except when the thread was interrupted, in which case an {@link IgniteException} is thrown.
+ *
+ * @param pageMemory Page memory.
+ * @param grp Cache group.
+ * @param row Pending row.
+ * @return {@code True} if pending row successfully moved, {@code False} otherwise.
+ */
+ private boolean processRow(PageMemory pageMemory, CacheGroupContext grp, PendingRow row) {
+ final long pageId = PageIdUtils.pageId(row.link);
+
+ final int partition = PageIdUtils.partId(pageId);
+
+ assert partition >= 0;
+
+ try {
+ final long page = pageMemory.acquirePage(grp.groupId(), pageId);
+
+ try {
+ long pageAddr = pageMemory.readLock(grp.groupId(), pageId, page);
+
+ try {
+ assert PageIO.getType(pageAddr) != 0;
+ assert PageIO.getVersion(pageAddr) != 0;
+
+ IgniteCacheOffheapManager.CacheDataStore store =
+ ((GridCacheOffheapManager)grp.offheap()).dataStore(partition);
+
+ if (store == null) {
+ log.warning("Failed to move old-version pending entry " +
+ "to per-partition PendingTree: Node has no partition anymore (skipping): " +
+ "[grpId=" + grp.groupId() +
+ ", grpName=" + grp.name() +
+ ", partId=" + partition +
+ ", pendingRow=" + row + "]");
+
+ return false;
+ }
+
+ assert store instanceof GridCacheOffheapManager.GridCacheDataStore;
+ assert store.pendingTree() != null;
+
+ // Insert only if the row is not already present in the per-partition tree.
+ store.pendingTree().invoke(row, WITHOUT_KEY, new PutIfAbsentClosure(row));
+ }
+ finally {
+ pageMemory.readUnlock(grp.groupId(), pageId, page);
+ }
+ }
+ finally {
+ // Pair acquirePage() with releasePage(); without it the page reference acquired above is leaked.
+ pageMemory.releasePage(grp.groupId(), pageId, page);
+ }
+ }
+ catch (AssertionError | Exception ex) {
+ if (Thread.currentThread().isInterrupted() || X.hasCause(ex, InterruptedException.class)) {
+ // Keep the interrupt flag set so that callers can detect the cancellation.
+ Thread.currentThread().interrupt();
+
+ throw new IgniteException(ex);
+ }
+
+ String msg = "Unexpected error occurs while moving old-version pending entry " +
+ "to per-partition PendingTree. Seems page doesn't longer exists (skipping): " +
+ "[grpId=" + grp.groupId() +
+ ", grpName=" + grp.name() +
+ ", partId=" + partition +
+ ", pendingRow=" + row + ']';
+
+ // The stack trace is attached only when debug logging is enabled.
+ if (log.isDebugEnabled())
+ log.warning(msg, ex);
+ else
+ log.warning(msg);
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /** */
+ private static class PutIfAbsentClosure implements IgniteTree.InvokeClosure<PendingRow> {
+ /** */
+ private final PendingRow pendingRow;
+
+ /** */
+ private IgniteTree.OperationType op;
+
+ /** */
+ PutIfAbsentClosure(PendingRow pendingRow) {
+ this.pendingRow = pendingRow;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void call(@Nullable PendingRow oldRow) throws IgniteCheckedException {
+ op = (oldRow == null) ? IgniteTree.OperationType.PUT : IgniteTree.OperationType.NOOP;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PendingRow newRow() {
+ return pendingRow;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTree.OperationType operationType() {
+ return op;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
index a5236c2..c940c39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
@@ -191,7 +191,6 @@ public abstract class PageIO {
/** */
public static final short T_DATA_REF_METASTORAGE_LEAF = 23;
-
/** Index for payload == 1. */
public static final short T_H2_EX_REF_LEAF_START = 10000;
@@ -215,8 +214,8 @@ public abstract class PageIO {
* @param ver Page format version.
*/
protected PageIO(int type, int ver) {
- assert ver > 0 && ver < 65535: ver;
- assert type > 0 && type < 65535: type;
+ assert ver > 0 && ver < 65535 : ver;
+ assert type > 0 && type < 65535 : type;
this.type = type;
this.ver = ver;
@@ -245,7 +244,7 @@ public abstract class PageIO {
public static void setType(long pageAddr, int type) {
PageUtils.putShort(pageAddr, TYPE_OFF, (short)type);
- assert getType(pageAddr) == type;
+ assert getType(pageAddr) == type : getType(pageAddr);
}
/**
@@ -268,7 +267,7 @@ public abstract class PageIO {
* @param pageAddr Page address.
* @param ver Version.
*/
- private static void setVersion(long pageAddr, int ver) {
+ protected static void setVersion(long pageAddr, int ver) {
PageUtils.putShort(pageAddr, VER_OFF, (short)ver);
assert getVersion(pageAddr) == ver;
@@ -580,7 +579,7 @@ public abstract class PageIO {
* @param pageSize Page size.
* @param sb Sb.
*/
- protected abstract void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException ;
+ protected abstract void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException;
/**
* @param addr Address.
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
index 3d79884..fe6b7a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
@@ -42,9 +42,13 @@ public class PagePartitionMetaIO extends PageMetaIO {
/** */
private static final int NEXT_PART_META_PAGE_OFF = PARTITION_STATE_OFF + 1;
+ /** End of page partition meta. */
+ static final int END_OF_PARTITION_PAGE_META = NEXT_PART_META_PAGE_OFF + 8;
+
/** */
public static final IOVersions<PagePartitionMetaIO> VERSIONS = new IOVersions<>(
- new PagePartitionMetaIO(1)
+ new PagePartitionMetaIO(1),
+ new PagePartitionMetaIOV2(2)
);
/** {@inheritDoc} */
@@ -150,6 +154,7 @@ public class PagePartitionMetaIO extends PageMetaIO {
/**
* Returns partition counters page identifier, page with caches in cache group sizes.
+ *
* @param pageAddr Partition metadata page address.
* @return Next meta partial page ID or {@code 0} if it does not exist.
*/
@@ -167,19 +172,39 @@ public class PagePartitionMetaIO extends PageMetaIO {
PageUtils.putLong(pageAddr, NEXT_PART_META_PAGE_OFF, cntrsPageId);
}
+ /**
+ * Returns partition pending tree root. Pending tree is used for tracking expiring entries.
+ *
+ * @param pageAddr Page address.
+ * @return Pending Tree root page.
+ */
+ public long getPendingTreeRoot(long pageAddr) {
+ throw new UnsupportedOperationException("Per partition pending tree is not supported by " +
+ "this PagePartitionMetaIO version: ver=" + getVersion());
+ }
+
+ /**
+ * Sets new partition pending tree root.
+ *
+ * @param pageAddr Page address.
+ * @param treeRoot Pending Tree root
+ */
+ public void setPendingTreeRoot(long pageAddr, long treeRoot) {
+ throw new UnsupportedOperationException("Per partition pending tree is not supported by " +
+ "this PagePartitionMetaIO version: ver=" + getVersion());
+ }
+
/** {@inheritDoc} */
@Override protected void printPage(long pageAddr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException {
super.printPage(pageAddr, pageSize, sb);
byte state = getPartitionState(pageAddr);
- sb
- .a(",\nPagePartitionMeta[\n\tsize=").a(getSize(pageAddr))
+ sb.a(",\nPagePartitionMeta[\n\tsize=").a(getSize(pageAddr))
.a(",\n\tupdateCounter=").a(getUpdateCounter(pageAddr))
.a(",\n\tglobalRemoveId=").a(getGlobalRemoveId(pageAddr))
.a(",\n\tpartitionState=").a(state).a("(").a(GridDhtPartitionState.fromOrdinal(state)).a(")")
.a(",\n\tcountersPageId=").a(getCountersPageId(pageAddr))
- .a("\n]")
- ;
+ .a("\n]");
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java
new file mode 100644
index 0000000..70556a1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.tree.io;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.util.GridStringBuilder;
+
+/**
+ * IO for partition metadata pages.
+ * Persistent partition contains its own PendingTree.
+ */
+public class PagePartitionMetaIOV2 extends PagePartitionMetaIO {
+ /** */
+ private static final int PENDING_TREE_ROOT_OFF = PagePartitionMetaIO.END_OF_PARTITION_PAGE_META;
+
+ /**
+ * @param ver Version.
+ */
+ public PagePartitionMetaIOV2(int ver) {
+ super(ver);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initNewPage(long pageAddr, long pageId, int pageSize) {
+ super.initNewPage(pageAddr, pageId, pageSize);
+
+ setPendingTreeRoot(pageAddr, 0L);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getPendingTreeRoot(long pageAddr) {
+ return PageUtils.getLong(pageAddr, PENDING_TREE_ROOT_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setPendingTreeRoot(long pageAddr, long treeRoot) {
+ PageUtils.putLong(pageAddr, PENDING_TREE_ROOT_OFF, treeRoot);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Prints the partition meta fields, including the pending tree root stored by this IO version.
+ */
+ @Override protected void printPage(long pageAddr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException {
+ byte state = getPartitionState(pageAddr);
+
+ // NOTE(review): the "treeRoot" label is fed from getReuseListRoot(); confirm against the parent IO class.
+ sb.a("PagePartitionMeta[\n\ttreeRoot=").a(getReuseListRoot(pageAddr));
+ // Print the actual pending tree root rather than the full snapshot id.
+ sb.a(",\n\tpendingTreeRoot=").a(getPendingTreeRoot(pageAddr));
+ sb.a(",\n\tlastSuccessfulFullSnapshotId=").a(getLastSuccessfulFullSnapshotId(pageAddr));
+ sb.a(",\n\tlastSuccessfulSnapshotId=").a(getLastSuccessfulSnapshotId(pageAddr));
+ sb.a(",\n\tnextSnapshotTag=").a(getNextSnapshotTag(pageAddr));
+ sb.a(",\n\tlastSuccessfulSnapshotTag=").a(getLastSuccessfulSnapshotTag(pageAddr));
+ sb.a(",\n\tlastAllocatedPageCount=").a(getLastAllocatedPageCount(pageAddr));
+ sb.a(",\n\tcandidatePageCount=").a(getCandidatePageCount(pageAddr));
+ sb.a(",\n\tsize=").a(getSize(pageAddr));
+ sb.a(",\n\tupdateCounter=").a(getUpdateCounter(pageAddr));
+ sb.a(",\n\tglobalRemoveId=").a(getGlobalRemoveId(pageAddr));
+ sb.a(",\n\tpartitionState=").a(state).a("(").a(GridDhtPartitionState.fromOrdinal(state)).a(")");
+ sb.a(",\n\tcountersPageId=").a(getCountersPageId(pageAddr));
+ sb.a("\n]");
+ }
+
+ /**
+ * Upgrade page to PagePartitionMetaIOV2
+ *
+ * @param pageAddr Page address.
+ */
+ public void upgradePage(long pageAddr) {
+ assert PageIO.getType(pageAddr) == getType();
+ assert PageIO.getVersion(pageAddr) < 2;
+
+ PageIO.setVersion(pageAddr, getVersion());
+ setPendingTreeRoot(pageAddr, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
index 43a2303..ebe6f29 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
@@ -55,10 +55,11 @@ public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstrac
Mockito.when(topologyMock.partitions()).thenReturn(3);
List<GridDhtLocalPartition> localPartitions = Lists.newArrayList(
- partitionMock(0, 1, 1),
- partitionMock(1, 2, 2),
- partitionMock(2, 3, 3)
+ partitionMock(0, 1, 1),
+ partitionMock(1, 2, 2),
+ partitionMock(2, 3, 3)
);
+
Mockito.when(topologyMock.localPartitions()).thenReturn(localPartitions);
Mockito.when(topologyMock.currentLocalPartitions()).thenReturn(localPartitions);
}
@@ -82,10 +83,13 @@ public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstrac
*/
private GridDhtPartitionsSingleMessage from(@Nullable Map<Integer, T2<Long, Long>> countersMap, @Nullable Map<Integer, Long> sizesMap) {
GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage();
+
if (countersMap != null)
msg.addPartitionUpdateCounters(0, countersMap);
+
if (sizesMap != null)
msg.addPartitionSizes(0, sizesMap);
+
return msg;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
index 99614ed..a02ed11 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
@@ -17,32 +17,35 @@
package org.apache.ignite.internal.processors.cache.persistence;
+import java.io.Serializable;
import java.util.Map;
+import java.util.Objects;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
/**
- *
+ * Caused by https://issues.apache.org/jira/browse/IGNITE-7278
*/
public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
/** */
@@ -52,7 +55,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
private static final int ENTRIES_COUNT = 10_000;
/** */
- public static final String CACHE_NAME = "cache1";
+ protected static final String CACHE_NAME = "cache1";
/** Checkpoint delay. */
private volatile int checkpointDelay = -1;
@@ -79,21 +82,23 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
DataStorageConfiguration memCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
- new DataRegionConfiguration().setMaxSize(400 * 1024 * 1024).setPersistenceEnabled(true))
+ new DataRegionConfiguration()
+ .setMaxSize(400 * 1024 * 1024)
+ .setPersistenceEnabled(true))
.setWalMode(WALMode.LOG_ONLY)
.setCheckpointFrequency(checkpointDelay);
cfg.setDataStorageConfiguration(memCfg);
- CacheConfiguration ccfg1 = new CacheConfiguration();
+ CacheConfiguration ccfg = new CacheConfiguration();
- ccfg1.setName(CACHE_NAME);
- ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
- ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- ccfg1.setAffinity(new RendezvousAffinityFunction(false, 128));
- ccfg1.setBackups(2);
+ ccfg.setName(CACHE_NAME);
+ ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ ccfg.setAffinity(new RendezvousAffinityFunction(false, 128));
+ ccfg.setBackups(2);
- cfg.setCacheConfiguration(ccfg1);
+ cfg.setCacheConfiguration(ccfg);
return cfg;
}
@@ -197,7 +202,6 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
}
/**
- *
* @throws Exception if failed.
*/
public void testRebalncingDuringLoad_10_10_1_1() throws Exception {
@@ -205,7 +209,6 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
}
/**
- *
* @throws Exception if failed.
*/
public void testRebalncingDuringLoad_10_500_8_16() throws Exception {
@@ -227,7 +230,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
final Ignite load = ignite(0);
- load.active(true);
+ load.cluster().active(true);
try (IgniteDataStreamer<Object, Object> s = load.dataStreamer(CACHE_NAME)) {
s.allowOverwrite(true);
@@ -245,10 +248,13 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
Random rnd = ThreadLocalRandom.current();
while (!done.get()) {
- Map<Integer, Integer> map = new TreeMap<>();
+ Map<Integer, Person> map = new TreeMap<>();
- for (int i = 0; i < batch; i++)
- map.put(rnd.nextInt(ENTRIES_COUNT), rnd.nextInt());
+ for (int i = 0; i < batch; i++) {
+ int key = rnd.nextInt(ENTRIES_COUNT);
+
+ map.put(key, new Person("fn" + key, "ln" + key));
+ }
cache.putAll(map);
}
@@ -277,4 +283,51 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
busyFut.get();
}
+
+ /**
+ * Serializable cache value holding a first and a last name; both fields are annotated as indexed SQL query fields in the {@code full_name} group.
+ */
+ static class Person implements Serializable {
+ /** First name. */
+ @GridToStringInclude
+ @QuerySqlField(index = true, groups = "full_name")
+ private String fName;
+
+ /** Last name. */
+ @GridToStringInclude
+ @QuerySqlField(index = true, groups = "full_name")
+ private String lName;
+
+ /**
+ * @param fName First name.
+ * @param lName Last name.
+ */
+ public Person(String fName, String lName) {
+ this.fName = fName;
+ this.lName = lName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Person.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ Person person = (Person)o; // Cast to this class: the getClass() check above guarantees the type, a foreign Person class would throw ClassCastException.
+
+ return Objects.equals(fName, person.fName) && Objects.equals(lName, person.lName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(fName, lName);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java
deleted file mode 100644
index 66b2047..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence;
-
-import java.util.Map;
-import java.util.Random;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- * Cause by https://issues.apache.org/jira/browse/IGNITE-7278
- */
-public class IgnitePdsContinuousRestartTest2 extends GridCommonAbstractTest {
- /** */
- private static final int GRID_CNT = 4;
-
- /** */
- private static final int ENTRIES_COUNT = 10_000;
-
- /** */
- public static final String CACHE_NAME = "cache1";
-
- /** Checkpoint delay. */
- private volatile int checkpointDelay = -1;
-
- /** */
- private boolean cancel;
-
- /**
- * Default constructor.
- */
- public IgnitePdsContinuousRestartTest2() {
-
- }
-
- /**
- * @param cancel Cancel.
- */
- public IgnitePdsContinuousRestartTest2(boolean cancel) {
- this.cancel = cancel;
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- DataStorageConfiguration memCfg = new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(
- new DataRegionConfiguration()
- .setMaxSize(400 * 1024 * 1024)
- .setPersistenceEnabled(true))
- .setWalMode(WALMode.LOG_ONLY)
- .setCheckpointFrequency(checkpointDelay);
-
- cfg.setDataStorageConfiguration(memCfg);
-
- CacheConfiguration ccfg = new CacheConfiguration();
-
- ccfg.setName(CACHE_NAME);
- ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
- ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- ccfg.setAffinity(new RendezvousAffinityFunction(false, 128));
- ccfg.setBackups(2);
-
- cfg.setCacheConfiguration(ccfg);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- stopAllGrids();
-
- cleanPersistenceDir();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
-
- cleanPersistenceDir();
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testRebalancingDuringLoad_1000_500_1_1() throws Exception {
- checkRebalancingDuringLoad(1000, 500, 1, 1);
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testRebalancingDuringLoad_8000_500_1_1() throws Exception {
- checkRebalancingDuringLoad(8000, 500, 1, 1);
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testRebalancingDuringLoad_1000_20000_1_1() throws Exception {
- checkRebalancingDuringLoad(1000, 20000, 1, 1);
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testRebalancingDuringLoad_8000_8000_1_1() throws Exception {
- checkRebalancingDuringLoad(8000, 8000, 1, 1);
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testRebalancingDuringLoad_1000_500_8_1() throws Exception {
- checkRebalancingDuringLoad(1000, 500, 8, 1);
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testRebalancingDuringLoad_8000_500_8_1() throws Exception {
- checkRebalancingDuringLoad(8000, 500, 8, 1);
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testRebalancingDuringLoad_1000_20000_8_1() throws Exception {
- checkRebalancingDuringLoad(1000, 20000, 8, 1);
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testRebalancingDuringLoad_8000_8000_8_1() throws Exception {
- checkRebalancingDuringLoad(8000, 8000, 8, 1);
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testRebalancingDuringLoad_1000_500_8_16() throws Exception {
- checkRebalancingDuringLoad(1000, 500, 8, 16);
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testRebalancingDuringLoad_8000_500_8_16() throws Exception {
- checkRebalancingDuringLoad(8000, 500, 8, 16);
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testRebalancingDuringLoad_1000_20000_8_16() throws Exception {
- checkRebalancingDuringLoad(1000, 20000, 8, 16);
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testRebalancingDuringLoad_8000_8000_8_16() throws Exception {
- checkRebalancingDuringLoad(8000, 8000, 8, 16);
- }
-
- /**
- *
- * @throws Exception if failed.
- */
- public void testRebalncingDuringLoad_10_10_1_1() throws Exception {
- checkRebalancingDuringLoad(10, 10, 1, 1);
- }
-
- /**
- *
- * @throws Exception if failed.
- */
- public void testRebalncingDuringLoad_10_500_8_16() throws Exception {
- checkRebalancingDuringLoad(10, 500, 8, 16);
- }
-
- /**
- * @throws Exception if failed.
- */
- private void checkRebalancingDuringLoad(
- int restartDelay,
- int checkpointDelay,
- int threads,
- final int batch
- ) throws Exception {
- this.checkpointDelay = checkpointDelay;
-
- startGrids(GRID_CNT);
-
- final Ignite load = ignite(0);
-
- load.cluster().active(true);
-
- try (IgniteDataStreamer<Object, Object> s = load.dataStreamer(CACHE_NAME)) {
- s.allowOverwrite(true);
-
- for (int i = 0; i < ENTRIES_COUNT; i++)
- s.addData(i, i);
- }
-
- final AtomicBoolean done = new AtomicBoolean(false);
-
- IgniteInternalFuture<?> busyFut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
- /** {@inheritDoc} */
- @Override public Object call() throws Exception {
- IgniteCache<Object, Object> cache = load.cache(CACHE_NAME);
- Random rnd = ThreadLocalRandom.current();
-
- while (!done.get()) {
- Map<Integer, Integer> map = new TreeMap<>();
-
- for (int i = 0; i < batch; i++)
- map.put(rnd.nextInt(ENTRIES_COUNT), rnd.nextInt());
-
- cache.putAll(map);
- }
-
- return null;
- }
- }, threads, "updater");
-
- long end = System.currentTimeMillis() + 90_000;
-
- Random rnd = ThreadLocalRandom.current();
-
- while (System.currentTimeMillis() < end) {
- int idx = rnd.nextInt(GRID_CNT - 1) + 1;
-
- stopGrid(idx, cancel);
-
- U.sleep(restartDelay);
-
- startGrid(idx);
-
- U.sleep(restartDelay);
- }
-
- done.set(true);
-
- busyFut.get();
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java
new file mode 100644
index 0000000..d5b3f55
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+
+/**
+ * Variant of {@link IgnitePdsContinuousRestartTest} whose cache uses a one-second {@link CreatedExpiryPolicy}. Caused by https://issues.apache.org/jira/browse/IGNITE-5879
+ */
+public class IgnitePdsContinuousRestartTestWithExpiryPolicy extends IgnitePdsContinuousRestartTest {
+ /** Static IP finder instance set on the discovery SPI of every configuration built by this class. */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /**
+ * Default constructor; passes {@code false} to the parent constructor.
+ */
+ public IgnitePdsContinuousRestartTestWithExpiryPolicy() {
+ super(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+ discoverySpi.setIpFinder(ipFinder);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(CACHE_NAME);
+ ccfg.setGroupName("Group1");
+ ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ ccfg.setAffinity(new RendezvousAffinityFunction(false, 128));
+ ccfg.setBackups(2);
+ ccfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1))); // Expiry policy created with a duration of one second.
+
+ cfg.setCacheConfiguration(ccfg); // NOTE(review): presumably replaces the cache configuration set by the parent class - confirm.
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java
index 1825666..03dc445 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java
@@ -33,7 +33,7 @@ public abstract class IgniteBaselineAbstractFullApiSelfTest extends GridCacheAbs
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
- .setMaxSize(200 * 1024 * 1024)
+ .setMaxSize(256 * 1024 * 1024)
.setPersistenceEnabled(true))
.setWalMode(WALMode.LOG_ONLY));
@@ -41,6 +41,13 @@ public abstract class IgniteBaselineAbstractFullApiSelfTest extends GridCacheAbs
}
/** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
@Override protected int gridCount() {
return 4;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
new file mode 100644
index 0000000..be09e70
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import com.google.common.base.Strings;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests TTL expiration with persistence enabled: on a running node, after a node restart, and after a second node is started.
+ */
+public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
+ /** Name of the cache that is configured with the expiry policy. */
+ public static final String CACHE = "expirableCache";
+
+ /** Expiry duration in seconds; the wait timeout in {@link #waitAndCheckExpired} is derived from it. */
+ private static final int EXPIRATION_TIMEOUT = 10;
+
+ /** Number of entries put into the cache by {@link #fillCache}. */
+ public static final int ENTRIES = 7000;
+
+ /** IP finder set on the discovery SPI of every node configuration. */
+ private static final TcpDiscoveryVmIpFinder FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ // Protection in case the test failed to finish, e.g. because of an error.
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ final IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+ disco.setIpFinder(FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ final CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(CACHE);
+ ccfg.setAffinity(new RendezvousAffinityFunction(false, 128));
+ ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, EXPIRATION_TIMEOUT)));
+ ccfg.setEagerTtl(true);
+ ccfg.setGroupName("group1");
+
+ ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setMaxSize(256L * 1024 * 1024)
+ .setPersistenceEnabled(true)
+ ).setWalMode(WALMode.DEFAULT));
+
+ cfg.setCacheConfiguration(ccfg);
+ return cfg;
+ }
+
+ /** Fills the cache and waits for the entries to expire without restarting the node.
+ * @throws Exception if failed.
+ */
+ public void testTtlIsApplied() throws Exception {
+ loadAndWaitForCleanup(false);
+ }
+
+ /** Fills the cache, restarts the node, then waits for the entries to expire.
+ * @throws Exception if failed.
+ */
+ public void testTtlIsAppliedAfterRestart() throws Exception {
+ loadAndWaitForCleanup(true);
+ }
+
+ /** Starts grid 0, fills the cache, stops and restarts the grid when {@code restartGrid} is set, then waits for all entries to expire.
+ * @throws Exception if failed.
+ */
+ private void loadAndWaitForCleanup(boolean restartGrid) throws Exception {
+ IgniteEx srv = startGrid(0);
+ srv.cluster().active(true);
+
+ fillCache(srv.cache(CACHE));
+
+ if (restartGrid) {
+ stopGrid(0);
+ srv = startGrid(0);
+ srv.cluster().active(true);
+ }
+
+ final IgniteCache<Integer, String> cache = srv.cache(CACHE);
+
+ pringStatistics((IgniteCacheProxy)cache, "After restart from LFS"); // NOTE(review): this label is printed even when restartGrid is false.
+
+ waitAndCheckExpired(cache);
+
+ stopAllGrids();
+ }
+
+ /** Fills the cache on grid 0, starts grid 1, then waits for all entries to expire as seen from grid 1.
+ * @throws Exception if failed.
+ */
+ public void testRebalancingWithTtlExpirable() throws Exception {
+ IgniteEx srv = startGrid(0);
+ srv.cluster().active(true);
+
+ fillCache(srv.cache(CACHE));
+
+ // Starting a second node is expected to trigger rebalancing.
+ srv = startGrid(1);
+
+ final IgniteCache<Integer, String> cache = srv.cache(CACHE);
+
+ pringStatistics((IgniteCacheProxy)cache, "After rebalancing start");
+
+ waitAndCheckExpired(cache);
+
+ stopAllGrids();
+ }
+
+ /** Puts {@link #ENTRIES} string values into the cache with a single putAll, reads each key once and prints memory statistics. */
+ protected void fillCache(IgniteCache<Integer, String> cache) {
+ cache.putAll(new TreeMap<Integer, String>() {{
+ for (int i = 0; i < ENTRIES; i++)
+ put(i, Strings.repeat("Some value " + i, 125));
+ }});
+
+ // Read every entry once; the cache is configured with an AccessedExpiryPolicy.
+ for (int i = 0; i < ENTRIES; i++)
+ cache.get(i); // touch entries
+
+ pringStatistics((IgniteCacheProxy)cache, "After cache puts");
+ }
+
+ /** Waits via GridTestUtils.waitForCondition (timeout EXPIRATION_TIMEOUT + 1 seconds) for cache.size() == 0, then asserts every key reads as null. NOTE(review): the boolean result of waitForCondition is ignored. */
+ protected void waitAndCheckExpired(final IgniteCache<Integer, String> cache) throws IgniteInterruptedCheckedException {
+ GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return cache.size() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(EXPIRATION_TIMEOUT + 1));
+
+ pringStatistics((IgniteCacheProxy)cache, "After timeout");
+
+ for (int i = 0; i < ENTRIES; i++)
+ assertNull(cache.get(i));
+ }
+
+ /** Calls printMemoryStats() on the cache context, framed by two System.out lines that carry {@code msg}. */
+ private void pringStatistics(IgniteCacheProxy cache, String msg) {
+ System.out.println(msg + " {{");
+ cache.context().printMemoryStats();
+ System.out.println("}} " + msg);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 1e32320..ab81d8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDynamicC
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSingleNodePutGetPersistenceTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheRestoreTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsDataRegionMetricsTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlTest;
import org.apache.ignite.internal.processors.cache.persistence.db.file.DefaultPageSizeBackwardsCompatibilityTest;
import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCheckpointSimulationWithRealCpDisabledTest;
import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsPageReplacementTest;
@@ -111,6 +112,7 @@ public class IgnitePdsTestSuite extends TestSuite {
// TODO uncomment when https://issues.apache.org/jira/browse/IGNITE-7510 is fixed
// suite.addTestSuite(IgnitePdsClientNearCachePutGetTest.class);
suite.addTestSuite(IgniteDbPutGetWithCacheStoreTest.class);
+ suite.addTestSuite(IgnitePdsWithTtlTest.class);
suite.addTestSuite(IgniteClusterActivateDeactivateTestWithPersistence.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 3f6f713..76cfe4f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -20,7 +20,7 @@ package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.persistence.IgniteDataStorageMetricsSelfTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest;
-import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest2;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithExpiryPolicy;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedCacheDataTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedStoreTest;
@@ -90,7 +90,7 @@ public class IgnitePdsTestSuite2 extends TestSuite {
// Rebalancing test
suite.addTestSuite(IgnitePdsContinuousRestartTest.class);
- suite.addTestSuite(IgnitePdsContinuousRestartTest2.class);
+ suite.addTestSuite(IgnitePdsContinuousRestartTestWithExpiryPolicy.class);
suite.addTestSuite(IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes.class);
@@ -115,7 +115,6 @@ public class IgnitePdsTestSuite2 extends TestSuite {
suite.addTestSuite(IgnitePdsWholeClusterRestartTest.class);
-
// Rebalancing test
suite.addTestSuite(IgniteWalHistoryReservationsTest.class);
[2/2] ignite git commit: IGNITE-5874 Store TTL expire times in B+
tree on per-partition basis - Fixes #3231.
Posted by ir...@apache.org.
IGNITE-5874 Store TTL expire times in B+ tree on per-partition basis - Fixes #3231.
Signed-off-by: Ivan Rakov <ir...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/89c77573
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/89c77573
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/89c77573
Branch: refs/heads/master
Commit: 89c775737936645eaf739b494cc1740cd9605095
Parents: 01f6054
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Fri May 11 18:45:38 2018 +0300
Committer: Ivan Rakov <ir...@apache.org>
Committed: Fri May 11 18:45:38 2018 +0300
----------------------------------------------------------------------
.../JettyRestProcessorAbstractSelfTest.java | 21 -
.../PdsWithTtlCompatibilityTest.java | 191 +++++++++
.../IgniteCompatibilityBasicTestSuite.java | 3 +
.../apache/ignite/IgniteSystemProperties.java | 11 +
.../delta/MetaPageUpdateLastAllocatedIndex.java | 6 +-
.../processors/cache/CacheGroupContext.java | 16 +-
.../processors/cache/GridCacheMapEntry.java | 41 +-
.../cache/IgniteCacheOffheapManager.java | 21 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 128 +++---
.../distributed/dht/GridDhtLocalPartition.java | 156 +++----
.../dht/GridDhtPartitionsStateValidator.java | 1 +
.../GridDhtPartitionsExchangeFuture.java | 14 +-
.../persistence/GridCacheOffheapManager.java | 415 ++++++++++++++-----
.../UpgradePendingTreeToPerPartitionTask.java | 380 +++++++++++++++++
.../cache/persistence/tree/io/PageIO.java | 11 +-
.../tree/io/PagePartitionMetaIO.java | 35 +-
.../tree/io/PagePartitionMetaIOV2.java | 90 ++++
...idCachePartitionsStateValidatorSelfTest.java | 10 +-
.../IgnitePdsContinuousRestartTest.java | 89 +++-
.../IgnitePdsContinuousRestartTest2.java | 281 -------------
...dsContinuousRestartTestWithExpiryPolicy.java | 67 +++
.../IgniteBaselineAbstractFullApiSelfTest.java | 9 +-
.../persistence/db/IgnitePdsWithTtlTest.java | 197 +++++++++
.../ignite/testsuites/IgnitePdsTestSuite.java | 2 +
.../ignite/testsuites/IgnitePdsTestSuite2.java | 5 +-
25 files changed, 1582 insertions(+), 618 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index 0285f3a..96391e9 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -978,27 +978,10 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
assertCacheOperation(ret, true);
}
- /** */
- private void failIgnite_5874() {
- DataStorageConfiguration dsCfg = ignite(0).configuration().getDataStorageConfiguration();
-
- if (dsCfg.getDefaultDataRegionConfiguration().isPersistenceEnabled())
- fail("IGNITE-5874");
-
- if (!F.isEmpty(dsCfg.getDataRegionConfigurations())) {
- for (DataRegionConfiguration dataRegCfg : dsCfg.getDataRegionConfigurations()) {
- if (dataRegCfg.isPersistenceEnabled())
- fail("IGNITE-5874");
- }
- }
- }
-
/**
* @throws Exception If failed.
*/
public void testPutWithExpiration() throws Exception {
- failIgnite_5874();
-
String ret = content(DEFAULT_CACHE_NAME, GridRestCommand.CACHE_PUT,
"key", "putKey",
"val", "putVal",
@@ -1035,8 +1018,6 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
* @throws Exception If failed.
*/
public void testAddWithExpiration() throws Exception {
- failIgnite_5874();
-
String ret = content(DEFAULT_CACHE_NAME, GridRestCommand.CACHE_ADD,
"key", "addKey",
"val", "addVal",
@@ -1176,8 +1157,6 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
* @throws Exception If failed.
*/
public void testReplaceWithExpiration() throws Exception {
- failIgnite_5874();
-
jcache().put("replaceKey", "replaceVal");
assertEquals("replaceVal", jcache().get("replaceKey"));
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java
new file mode 100644
index 0000000..f3649f6
--- /dev/null
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compatibility;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import javax.cache.Cache;
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.compatibility.persistence.IgnitePersistenceCompatibilityAbstractTest;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
+import org.apache.ignite.internal.processors.cache.persistence.migration.UpgradePendingTreeToPerPartitionTask;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Tests PendingTree upgrade to per-partition basis. The test fills a cache with persistence enabled and an expiry policy
+ * configured on an ignite-2.1 node and checks that entries are correctly expired once a new version node is started.
+ *
+ * Note: Test for ignite-2.3 version will always fail because the entry ttl update fails with an assertion on checkpoint lock
+ * check.
+ */
+public class PdsWithTtlCompatibilityTest extends IgnitePersistenceCompatibilityAbstractTest {
+ /** Name of the cache under test. */
+ static final String TEST_CACHE_NAME = PdsWithTtlCompatibilityTest.class.getSimpleName();
+
+ /** Duration of the cache expiry policy, in seconds. */
+ static final int DURATION_SEC = 10;
+
+ /** Number of entries put into the cache. */
+ private static final int ENTRIES_CNT = 100;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setMaxSize(32L * 1024 * 1024)
+ .setPersistenceEnabled(true)
+ ).setWalMode(WALMode.LOG_ONLY));
+
+ return cfg;
+ }
+
+ /**
+ * Tests startup on persistence data written by Ignite version 2.1.0.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNodeStartByOldVersionPersistenceData_2_1() throws Exception {
+ doTestStartupWithOldVersion("2.1.0");
+ }
+
+ /**
+ * Starts a node of the given old version to fill the cache, then starts a current node and validates the cache data.
+ *
+ * @param igniteVer 3-digit version of Ignite used to start the old node.
+ * @throws Exception If failed.
+ */
+ protected void doTestStartupWithOldVersion(String igniteVer) throws Exception {
+ try {
+ startGrid(1, igniteVer, new ConfigurationClosure(), new PostStartupClosure());
+
+ stopAllGrids();
+
+ IgniteEx ignite = startGrid(0);
+
+ assertEquals(1, ignite.context().discovery().topologyVersion());
+
+ ignite.active(true);
+
+ validateResultingCacheData(ignite, ignite.cache(TEST_CACHE_NAME));
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @param cache Cache to be filled with keys and values and then read back. Results may be validated in {@link
+ * #validateResultingCacheData(Ignite, IgniteCache)}.
+ */
+ public static void saveCacheData(Cache<Object, Object> cache) {
+ for (int i = 0; i < ENTRIES_CNT; i++)
+ cache.put(i, "data-" + i);
+
+ // Touch: read every entry back.
+ for (int i = 0; i < ENTRIES_CNT; i++)
+ assertNotNull(cache.get(i));
+ }
+
+ /**
+ * Broadcasts the pending tree upgrade task, waits for the expire time to pass and asserts that all entries are gone.
+ * @param ignite Ignite instance used to broadcast {@link UpgradePendingTreeToPerPartitionTask}.
+ * @param cache Cache that should have been filled using {@link #saveCacheData(Cache)}.
+ */
+ public static void validateResultingCacheData(Ignite ignite,
+ IgniteCache<Object, Object> cache) throws IgniteInterruptedCheckedException {
+
+ final long expireTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(DURATION_SEC + 1);
+
+ final IgniteFuture<Collection<Boolean>> future = ignite.compute().broadcastAsync(new UpgradePendingTreeToPerPartitionTask());
+
+ GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return future.isDone() && expireTime < System.currentTimeMillis();
+ }
+ }, TimeUnit.SECONDS.toMillis(DURATION_SEC + 2));
+
+ for (Boolean res : future.get())
+ assertTrue(res);
+
+ for (int i = 0; i < ENTRIES_CNT; i++)
+ assertNull(cache.get(i));
+ }
+
+ /** Configuration closure for the old-version node: sets local host, discovery SPI and persistent store WAL mode. */
+ public static class ConfigurationClosure implements IgniteInClosure<IgniteConfiguration> {
+ /** {@inheritDoc} */
+ @Override public void apply(IgniteConfiguration cfg) {
+ cfg.setLocalHost("127.0.0.1");
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+ disco.setIpFinder(GridCacheAbstractFullApiSelfTest.LOCAL_IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration().setWalMode(WALMode.LOG_ONLY));
+ }
+ }
+
+ /** Post-startup closure for the old-version node: activates it, creates and fills the test cache, then deactivates. */
+ public static class PostStartupClosure implements IgniteInClosure<Ignite> {
+ /** {@inheritDoc} */
+ @Override public void apply(Ignite ignite) {
+ ignite.active(true);
+
+ CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>();
+ cacheCfg.setName(TEST_CACHE_NAME);
+ cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ cacheCfg.setBackups(1);
+ cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ cacheCfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, DURATION_SEC)));
+ cacheCfg.setEagerTtl(true);
+ cacheCfg.setGroupName("myGroup");
+
+ IgniteCache<Object, Object> cache = ignite.createCache(cacheCfg);
+
+ saveCacheData(cache);
+
+ ignite.active(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
index b526137..f6dd736 100644
--- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.compatibility.testsuites;
import junit.framework.TestSuite;
+import org.apache.ignite.compatibility.PdsWithTtlCompatibilityTest;
import org.apache.ignite.compatibility.persistence.FoldersReuseCompatibilityTest;
import org.apache.ignite.compatibility.persistence.MigratingToWalV2SerializerWithCompactionTest;
import org.apache.ignite.compatibility.persistence.PersistenceBasicCompatibilityTest;
@@ -35,6 +36,8 @@ public class IgniteCompatibilityBasicTestSuite {
suite.addTestSuite(PersistenceBasicCompatibilityTest.class);
+ suite.addTestSuite(PdsWithTtlCompatibilityTest.class);
+
suite.addTestSuite(FoldersReuseCompatibilityTest.class);
suite.addTestSuite(MigratingToWalV2SerializerWithCompactionTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 008974c..727e809 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -888,6 +888,17 @@ public final class IgniteSystemProperties {
public static final String IGNITE_DISABLE_WAL_DURING_REBALANCING = "IGNITE_DISABLE_WAL_DURING_REBALANCING";
/**
+ * When set to {@code true}, Ignite will skip the partition size check on partition validation after rebalance has finished.
+ * Partition sizes may differ between nodes when Expiry Policy is in use and it is ok due to lazy entry eviction mechanics.
+ *
+ * There is no need to disable partition size validation either in normal case or when expiry policy is configured for cache.
+ * But it should be disabled manually when policy is used on per entry basis to hint Ignite to skip this check.
+ *
+ * Default is {@code false}.
+ */
+ public static final String IGNITE_SKIP_PARTITION_SIZE_VALIDATION = "IGNITE_SKIP_PARTITION_SIZE_VALIDATION";
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
index 39f6a03..324227b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
@@ -41,9 +41,11 @@ public class MetaPageUpdateLastAllocatedIndex extends PageDeltaRecord {
/** {@inheritDoc} */
@Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
- assert PageIO.getType(pageAddr) == PageIO.T_META || PageIO.getType(pageAddr) == PageIO.T_PART_META;
+ int type = PageIO.getType(pageAddr);
- PageMetaIO io = PageMetaIO.VERSIONS.forVersion(PageIO.getVersion(pageAddr));
+ assert type == PageIO.T_META || type == PageIO.T_PART_META;
+
+ PageMetaIO io = PageIO.getPageIO(type, PageIO.getVersion(pageAddr));
io.setLastAllocatedPageCount(pageAddr, lastAllocatedIdx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 5f750d5..d1bdbb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -39,9 +39,9 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsEvictor;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsEvictor;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
@@ -313,7 +313,7 @@ public class CacheGroupContext {
drEnabled = true;
this.caches = caches;
- }
+ }
/**
* @param cctx Cache context.
@@ -372,8 +372,8 @@ public class CacheGroupContext {
List<GridCacheContext> caches = this.caches;
assert !sharedGroup() && caches.size() == 1 :
- "stopping=" + ctx.kernalContext().isStopping() + ", groupName=" + ccfg.getGroupName() +
- ", caches=" + caches;
+ "stopping=" + ctx.kernalContext().isStopping() + ", groupName=" + ccfg.getGroupName() +
+ ", caches=" + caches;
return caches.get(0);
}
@@ -434,6 +434,7 @@ public class CacheGroupContext {
}
}
}
+
/**
* Adds partition unload event.
*
@@ -514,13 +515,6 @@ public class CacheGroupContext {
}
/**
- * @return {@code True} if fast eviction is allowed.
- */
- public boolean allowFastEviction() {
- return persistenceEnabled() && !queriesEnabled();
- }
-
- /**
* @return {@code True} in case replication is enabled.
*/
public boolean isDrEnabled() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 9f3686a..767c314 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -34,6 +34,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.eviction.EvictableEntry;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.pagemem.wal.StorageException;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
@@ -401,7 +402,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
checkObsolete();
if (isStartVersion() && ((flags & IS_UNSWAPPED_MASK) == 0)) {
- assert row == null || row.key() == key: "Unexpected row key";
+ assert row == null || row.key() == key : "Unexpected row key";
CacheDataRow read = row == null ? cctx.offheap().read(this) : row;
@@ -1411,7 +1412,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (readThrough && needVal && old == null &&
(cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
- old0 = readThrough(null, key, false, subjId, taskName);
+ old0 = readThrough(null, key, false, subjId, taskName);
old = cctx.toCacheObject(old0);
@@ -2462,7 +2463,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
ttlAndExpireTimeExtras(ttl, expireTime);
- storeValue(val, expireTime, ver, null);
+ cctx.shared().database().checkpointReadLock();
+
+ try {
+ storeValue(val, expireTime, ver, null);
+ }
+ finally {
+ cctx.shared().database().checkpointReadUnlock();
+ }
}
/**
@@ -3108,7 +3116,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridCacheMvcc mvcc = mvccExtras();
return mvcc != null && mvcc.isLocallyOwnedByIdOrThread(lockVer, threadId);
- } finally {
+ }
+ finally {
unlockEntry();
}
}
@@ -3347,6 +3356,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
obsolete = true;
}
}
+ catch (NodeStoppingException ignore) {
+ if (log.isDebugEnabled())
+ log.warning("Node is stopping while removing expired value.", ignore);
+ }
catch (IgniteCheckedException e) {
U.error(log, "Failed to clean up expired cache entry: " + this, e);
}
@@ -3406,7 +3419,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (log.isTraceEnabled())
log.trace("onExpired clear [key=" + key + ", entry=" + System.identityHashCode(this) + ']');
- removeValue();
+ cctx.shared().database().checkpointReadLock();
+
+ try {
+ removeValue();
+ }
+ finally {
+ cctx.shared().database().checkpointReadUnlock();
+ }
if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) {
cctx.events().addEvent(partition(),
@@ -3586,8 +3606,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param ver New entry version.
* @param oldRow Old row if available.
* @param predicate Optional predicate.
- * @throws IgniteCheckedException If update failed.
+ *
* @return {@code True} if storage was modified.
+ * @throws IgniteCheckedException If update failed.
*/
protected boolean storeValue(
@Nullable CacheObject val,
@@ -3599,7 +3620,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate);
- cctx.offheap().invoke(cctx, key, localPartition(), closure);
+ cctx.offheap().invoke(cctx, key, localPartition(), closure);
return closure.treeOp != IgniteTree.OperationType.NOOP;
}
@@ -4051,7 +4072,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/**
- * Increments public size of map.
+ * Increments public size of map.
*/
protected void incrementMapPublicSize() {
GridDhtLocalPartition locPart = localPartition();
@@ -4782,7 +4803,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
needUpdate = true;
}
- else if (updateExpireTime && expiryPlc != null && entry.val != null){
+ else if (updateExpireTime && expiryPlc != null && entry.val != null) {
long ttl = expiryPlc.forAccess();
if (ttl != CU.TTL_NOT_CHANGED) {
@@ -4929,7 +4950,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (entry.val == null) {
boolean new0 = entry.isStartVersion();
- assert entry.deletedUnlocked() || new0 || entry.isInternal(): "Invalid entry [entry=" + entry +
+ assert entry.deletedUnlocked() || new0 || entry.isInternal() : "Invalid entry [entry=" + entry +
", locNodeId=" + cctx.localNodeId() + ']';
if (!new0 && !entry.isInternal())
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index a12c033..fa25412 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -21,12 +21,13 @@ import java.util.Map;
import javax.cache.Cache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
import org.apache.ignite.internal.processors.cache.persistence.RowStore;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
import org.apache.ignite.internal.util.GridAtomicLong;
@@ -47,7 +48,7 @@ public interface IgniteCacheOffheapManager {
* @param grp Cache group.
* @throws IgniteCheckedException If failed.
*/
- public void start(GridCacheSharedContext ctx, CacheGroupContext grp) throws IgniteCheckedException;;
+ public void start(GridCacheSharedContext ctx, CacheGroupContext grp) throws IgniteCheckedException;
/**
* @param cctx Cache context.
@@ -142,6 +143,8 @@ public interface IgniteCacheOffheapManager {
/**
* @param cctx Cache context.
* @param c Closure.
+ * @param amount Limit of processed entries by single call, {@code -1} for no limit.
+ * @return {@code True} if unprocessed expired entries remain.
* @throws IgniteCheckedException If failed.
*/
public boolean expire(GridCacheContext cctx, IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount)
@@ -167,9 +170,9 @@ public interface IgniteCacheOffheapManager {
/**
* @param cctx Cache context.
- * @param key Key.
- * @param val Value.
- * @param ver Version.
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
* @param expireTime Expire time.
* @param oldRow Old row if available.
* @param part Partition.
@@ -537,5 +540,13 @@ public interface IgniteCacheOffheapManager {
* @param rowCacheCleaner Rows cache cleaner.
*/
public void setRowCacheCleaner(GridQueryRowCacheCleaner rowCacheCleaner);
+
+ /**
+ * Return PendingTree for data store.
+ *
+ * @return PendingTree instance, or {@code null} if the data store
+ * has no pending tree.
+ */
+ PendingEntriesTree pendingTree();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 5c78eb5..bf0de02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -101,16 +101,16 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
protected final ConcurrentMap<Integer, CacheDataStore> partDataStores = new ConcurrentHashMap<>();
/** */
- protected PendingEntriesTree pendingEntries;
+ private PendingEntriesTree pendingEntries;
/** */
- private volatile boolean hasPendingEntries;
+ protected volatile boolean hasPendingEntries;
/** */
private final GridAtomicLong globalRmvId = new GridAtomicLong(U.currentTimeMillis() * 1000_000);
/** */
- private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+ protected final GridSpinBusyLock busyLock = new GridSpinBusyLock();
/** */
private int updateValSizeThreshold;
@@ -148,19 +148,29 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/** {@inheritDoc} */
public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException {
+ initPendingTree(cctx);
+ }
+
+ /**
+ * @param cctx Cache context.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected void initPendingTree(GridCacheContext cctx) throws IgniteCheckedException {
+ assert !cctx.group().persistenceEnabled();
+
if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) {
String name = "PendingEntries";
- long rootPage = allocateForTree();
+ long rootPage = allocateForTree();
- pendingEntries = new PendingEntriesTree(
- grp,
- name,
- grp.dataRegion().pageMemory(),
- rootPage,
- grp.reuseList(),
- true);
- }
+ pendingEntries = new PendingEntriesTree(
+ grp,
+ name,
+ grp.dataRegion().pageMemory(),
+ rootPage,
+ grp.reuseList(),
+ true);
+ }
}
/**
@@ -204,11 +214,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
try {
if (grp.sharedGroup()) {
assert cacheId != CU.UNDEFINED_CACHE_ID;
- assert ctx.database().checkpointLockIsHeldByThread();
for (CacheDataStore store : cacheDataStores())
store.clear(cacheId);
+ // Clear non-persistent pending tree if needed.
if (pendingEntries != null) {
PendingRow row = new PendingRow(cacheId);
@@ -241,6 +251,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
}
+ /**
+ * @param part Partition.
+ * @return Data store for the given partition (the local cache data store if the group is local).
+ */
+ public CacheDataStore dataStore(int part) {
+ return grp.isLocal() ? locCacheDataStore : partDataStores.get(part);
+ }
+
/** {@inheritDoc} */
@Override public long cacheEntriesCount(int cacheId) {
long size = 0;
@@ -1011,51 +1029,56 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
) throws IgniteCheckedException {
assert !cctx.isNear() : cctx.name();
- if (hasPendingEntries && pendingEntries != null) {
- GridCacheVersion obsoleteVer = null;
+ if (!hasPendingEntries || pendingEntries == null)
+ return false;
- long now = U.currentTimeMillis();
+ GridCacheVersion obsoleteVer = null;
- GridCursor<PendingRow> cur;
+ long now = U.currentTimeMillis();
- if (grp.sharedGroup())
- cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0));
- else
- cur = pendingEntries.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
+ GridCursor<PendingRow> cur;
- if (!cur.next())
- return false;
+ if (grp.sharedGroup())
+ cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0));
+ else
+ cur = pendingEntries.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
- int cleared = 0;
+ if (!cur.next())
+ return false;
- cctx.shared().database().checkpointReadLock();
+ int cleared = 0;
- try {
- do {
- PendingRow row = cur.get();
+ if (!busyLock.enterBusy())
+ return false;
- if (amount != -1 && cleared > amount)
- return true;
+ try {
+ do {
+ if (amount != -1 && cleared > amount)
+ return true;
- if (row.key.partition() == -1)
- row.key.partition(cctx.affinity().partition(row.key));
+ PendingRow row = cur.get();
- assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
+ if (row.key.partition() == -1)
+ row.key.partition(cctx.affinity().partition(row.key));
- if (pendingEntries.removex(row)) {
- if (obsoleteVer == null)
- obsoleteVer = ctx.versions().next();
+ assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
- c.apply(cctx.cache().entryEx(row.key), obsoleteVer);
- }
+ if (pendingEntries.removex(row)) {
+ if (obsoleteVer == null)
+ obsoleteVer = ctx.versions().next();
+
+ GridCacheEntryEx entry = cctx.cache().entryEx(row.key);
- cleared++;
+ if (entry != null)
+ c.apply(entry, obsoleteVer);
}
- while (cur.next());
- }
- finally {
- cctx.shared().database().checkpointReadUnlock();
+
+ cleared++;
}
+ while (cur.next());
+ }
+ finally {
+ busyLock.leaveBusy();
}
return false;
@@ -1395,15 +1418,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (oldRow != null) {
assert oldRow.link() != 0 : oldRow;
- if (pendingEntries != null && oldRow.expireTime() != 0)
- pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
+ if (pendingTree() != null && oldRow.expireTime() != 0)
+ pendingTree().removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
if (newRow.link() != oldRow.link())
rowStore.removeRow(oldRow.link());
}
- if (pendingEntries != null && expireTime != 0) {
- pendingEntries.putx(new PendingRow(cacheId, expireTime, newRow.link()));
+ if (pendingTree() != null && expireTime != 0) {
+ pendingTree().putx(new PendingRow(cacheId, expireTime, newRow.link()));
hasPendingEntries = true;
}
@@ -1444,8 +1467,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
assert cacheId == CU.UNDEFINED_CACHE_ID || oldRow.cacheId() == cacheId :
"Incorrect cache ID [expected=" + cacheId + ", actual=" + oldRow.cacheId() + "].";
- if (pendingEntries != null && oldRow.expireTime() != 0)
- pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
+ if (pendingTree() != null && oldRow.expireTime() != 0)
+ pendingTree().removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
decrementSize(cctx.cacheId());
}
@@ -1543,7 +1566,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/** {@inheritDoc} */
@Override public void clear(int cacheId) throws IgniteCheckedException {
assert cacheId != CU.UNDEFINED_CACHE_ID;
- assert ctx.database().checkpointLockIsHeldByThread();
if (cacheSize(cacheId) == 0)
return;
@@ -1624,6 +1646,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
}
+ /** {@inheritDoc} */
+ @Override public PendingEntriesTree pendingTree() {
+ return pendingEntries;
+ }
+
/**
* @param cctx Cache context.
* @param key Key.
@@ -1676,5 +1703,4 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return 0;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index be74eff..a199f6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -47,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -57,9 +56,9 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.util.deque.FastSizeDeque;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.apache.ignite.util.deque.FastSizeDeque;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
@@ -342,9 +341,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @return {@code True} if partition is empty.
*/
public boolean isEmpty() {
- if (grp.allowFastEviction())
- return internalSize() == 0;
-
return store.fullSize() == 0 && internalSize() == 0;
}
@@ -981,78 +977,76 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
long cleared = 0;
- if (!grp.allowFastEviction()) {
- CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
+ CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
- try {
- GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id);
+ try {
+ GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id);
- while (it0.hasNext()) {
- ctx.database().checkpointReadLock();
+ while (it0.hasNext()) {
+ ctx.database().checkpointReadLock();
- try {
- CacheDataRow row = it0.next();
-
- // Do not clear fresh rows in case of single partition clearing.
- if (row.version().compareTo(clearVer) >= 0 && (state() == MOVING && clear))
- continue;
-
- if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId()))
- hld = cacheMapHolder(ctx.cacheContext(row.cacheId()));
-
- assert hld != null;
-
- GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(
- hld,
- hld.cctx,
- grp.affinity().lastVersion(),
- row.key(),
- true,
- false);
-
- if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
- removeEntry(cached);
-
- if (rec && !hld.cctx.config().isEventsDisabled()) {
- hld.cctx.events().addEvent(cached.partition(),
- cached.key(),
- ctx.localNodeId(),
- (IgniteUuid)null,
- null,
- EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
- null,
- false,
- cached.rawGet(),
- cached.hasValue(),
- null,
- null,
- null,
- false);
- }
-
- cleared++;
+ try {
+ CacheDataRow row = it0.next();
+
+ // Do not clear fresh rows in case of single partition clearing.
+ if (row.version().compareTo(clearVer) >= 0 && (state() == MOVING && clear))
+ continue;
+
+ if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId()))
+ hld = cacheMapHolder(ctx.cacheContext(row.cacheId()));
+
+ assert hld != null;
+
+ GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(
+ hld,
+ hld.cctx,
+ grp.affinity().lastVersion(),
+ row.key(),
+ true,
+ false);
+
+ if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
+ removeEntry(cached);
+
+ if (rec && !hld.cctx.config().isEventsDisabled()) {
+ hld.cctx.events().addEvent(cached.partition(),
+ cached.key(),
+ ctx.localNodeId(),
+ (IgniteUuid)null,
+ null,
+ EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+ null,
+ false,
+ cached.rawGet(),
+ cached.hasValue(),
+ null,
+ null,
+ null,
+ false);
}
- }
- catch (GridDhtInvalidPartitionException e) {
- assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
- break; // Partition is already concurrently cleared and evicted.
- }
- finally {
- ctx.database().checkpointReadUnlock();
+ cleared++;
}
}
- }
- catch (NodeStoppingException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to get iterator for evicted partition: " + id);
+ catch (GridDhtInvalidPartitionException e) {
+ assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
- throw e;
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to get iterator for evicted partition: " + id, e);
+ break; // Partition is already concurrently cleared and evicted.
+ }
+ finally {
+ ctx.database().checkpointReadUnlock();
+ }
}
}
+ catch (NodeStoppingException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to get iterator for evicted partition: " + id);
+
+ throw e;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to get iterator for evicted partition: " + id, e);
+ }
return cleared;
}
@@ -1406,37 +1400,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
/**
- * Recreate cache data store after successful clearing and allowed fast eviction.
- */
- private void recreateCacheDataStore() {
- assert grp.offheap() instanceof GridCacheOffheapManager;
-
- try {
- CacheDataStore store0 = store;
-
- store = ((GridCacheOffheapManager) grp.offheap()).recreateCacheDataStore(store0);
-
- // Inject row cache cleaner on store creation
- // Used in case the cache with enabled SqlOnheapCache is single cache at the cache group
- if (ctx.kernalContext().query().moduleEnabled()) {
- GridQueryRowCacheCleaner cleaner = ctx.kernalContext().query().getIndexing()
- .rowCacheCleaner(grp.groupId());
-
- if (store != null && cleaner != null)
- store.setRowCacheCleaner(cleaner);
- }
- } catch (IgniteCheckedException e) {
- finish(e);
- }
- }
-
- /**
* Successfully finishes the future.
*/
public void finish() {
- if (state() == MOVING && clear && grp.allowFastEviction())
- recreateCacheDataStore();
-
synchronized (this) {
onDone();
finished = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
index cc0542c..866c513 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
@@ -95,6 +95,7 @@ public class GridDhtPartitionsStateValidator {
// Validate cache sizes.
result = validatePartitionsSizes(top, messages, ignoringNodes);
+
if (!result.isEmpty())
throw new IgniteCheckedException("Partitions cache sizes are inconsistent for " + fold(topVer, result));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 39f4ed1..1b79b76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -136,6 +136,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** */
private static final IgniteProductVersion FORCE_AFF_REASSIGNMENT_SINCE = IgniteProductVersion.fromString("2.4.3");
+ /**
+ * This may be useful when a per-entry (not per-cache) expiry policy is in use.
+ * See {@link IgniteSystemProperties#IGNITE_SKIP_PARTITION_SIZE_VALIDATION} for details.
+ * Default value is {@code false}.
+ */
+ private static final boolean SKIP_PARTITION_SIZE_VALIDATION = Boolean.getBoolean(IgniteSystemProperties.IGNITE_SKIP_PARTITION_SIZE_VALIDATION);
+
/** */
@GridToStringExclude
private final Object mux = new Object();
@@ -2755,13 +2762,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
grpCtx.topology() :
cctx.exchange().clientTopology(grpId, events().discoveryCache());
- // Do not validate read or write through caches or caches with disabled rebalance.
+ // Do not validate read or write through caches or caches with disabled rebalance
+ // or ExpiryPolicy is set or validation is disabled.
if (grpCtx == null
|| grpCtx.config().isReadThrough()
|| grpCtx.config().isWriteThrough()
|| grpCtx.config().getCacheStoreFactory() != null
|| grpCtx.config().getRebalanceDelay() == -1
- || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE)
+ || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE
+ || grpCtx.config().getExpiryPolicyFactory() == null // NOTE(review): comment above says "ExpiryPolicy is set", but this skips when NO factory is set - confirm intent.
+ || SKIP_PARTITION_SIZE_VALIDATION)
continue;
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 5feaa25..d7cc623 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId;
@@ -49,6 +50,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheTtlManager;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -56,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator;
import org.apache.ignite.internal.processors.cache.persistence.freelist.CacheFreeListImpl;
+import org.apache.ignite.internal.processors.cache.persistence.migration.UpgradePendingTreeToPerPartitionTask;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.partstate.PagesAllocationRange;
@@ -64,6 +68,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionCountersIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIOV2;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl;
import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
@@ -71,10 +76,13 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointe
import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
+import org.apache.ignite.internal.processors.cache.tree.PendingRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -95,7 +103,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
private ReuseListImpl reuseList;
/** {@inheritDoc} */
+ @Override protected void initPendingTree(GridCacheContext cctx) throws IgniteCheckedException {
+ // No-op: no pending tree is created at this level. Each partition data store creates its own PendingEntriesTree.
+ }
+
+ /** {@inheritDoc} */
@Override protected void initDataStructures() throws IgniteCheckedException {
+ assert ctx.database().checkpointLockIsHeldByThread();
+
Metas metas = getOrAllocateCacheMetas();
RootPage reuseListRoot = metas.reuseListRoot;
@@ -122,29 +137,12 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
((GridCacheDatabaseSharedManager)ctx.database()).addCheckpointListener(this);
}
- /** {@inheritDoc} */
- @Override public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException {
- if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) {
- ctx.database().checkpointReadLock();
-
- try {
- final String name = "PendingEntries";
-
- RootPage pendingRootPage = indexStorage.getOrAllocateForTree(name);
-
- pendingEntries = new PendingEntriesTree(
- grp,
- name,
- grp.dataRegion().pageMemory(),
- pendingRootPage.pageId().pageId(),
- reuseList,
- pendingRootPage.isAllocated()
- );
- }
- finally {
- ctx.database().checkpointReadUnlock();
- }
- }
+ /**
+ * Get internal IndexStorage.
+ * See {@link UpgradePendingTreeToPerPartitionTask} for details.
+ *
+ * @return Index storage held by this manager.
+ */
+ public IndexStorage getIndexStorage() {
+ return indexStorage;
}
/** {@inheritDoc} */
@@ -218,8 +216,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
int grpId = grp.groupId();
long partMetaId = pageMem.partitionMetaPageId(grpId, store.partId());
- long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
+ long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
try {
long partMetaPageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
@@ -274,7 +272,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
if (needSnapshot) {
pageCnt = this.ctx.pageStore().pages(grpId, store.partId());
- io.setCandidatePageCount(partMetaPageAddr, size == 0 ? 0: pageCnt);
+ io.setCandidatePageCount(partMetaPageAddr, size == 0 ? 0 : pageCnt);
if (saveMeta) {
saveMeta(ctx);
@@ -285,7 +283,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
if (state == OWNING) {
assert part != null;
- if(!addPartition(
+ if (!addPartition(
part,
ctx.partitionStatMap(),
partMetaPageAddr,
@@ -295,8 +293,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
this.ctx.pageStore().pages(grpId, store.partId()),
store.fullSize()
))
- U.warn(log,"Partition was concurrently evicted grpId=" + grpId +
- ", partitionId=" + part.id());
+ U.warn(log, "Partition was concurrently evicted grpId=" + grpId +
+ ", partitionId=" + part.id());
}
else if (state == MOVING || state == RENTING) {
if (ctx.partitionStatMap().forceSkipIndexPartition(grpId)) {
@@ -333,7 +331,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
}
else if (needSnapshot)
- tryAddEmptyPartitionToSnapshot(store, ctx);;
+ tryAddEmptyPartitionToSnapshot(store, ctx);
}
else if (needSnapshot)
tryAddEmptyPartitionToSnapshot(store, ctx);
@@ -350,8 +348,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
private void tryAddEmptyPartitionToSnapshot(CacheDataStore store, Context ctx) {
if (getPartition(store).state() == OWNING) {
ctx.partitionStatMap().put(
- new GroupPartitionId(grp.groupId(), store.partId()),
- new PagesAllocationRange(0, 0));
+ new GroupPartitionId(grp.groupId(), store.partId()),
+ new PagesAllocationRange(0, 0));
}
}
@@ -362,7 +360,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
*/
private GridDhtLocalPartition getPartition(CacheDataStore store) {
return grp.topology().localPartition(store.partId(),
- AffinityTopologyVersion.NONE, false, true);
+ AffinityTopologyVersion.NONE, false, true);
}
/**
@@ -385,7 +383,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
long nextId = cntrsPageId;
- while (true){
+ while (true) {
final long curId = nextId;
final long curPage = pageMem.acquirePage(grpId, curId);
@@ -542,19 +540,19 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
* @param currAllocatedPageCnt total number of pages allocated for partition <code>[partition, grpId]</code>
*/
private static boolean addPartition(
- GridDhtLocalPartition part,
- final PartitionAllocationMap map,
- final long metaPageAddr,
- final PageMetaIO io,
- final int grpId,
- final int partId,
- final int currAllocatedPageCnt,
- final long partSize
+ GridDhtLocalPartition part,
+ final PartitionAllocationMap map,
+ final long metaPageAddr,
+ final PageMetaIO io,
+ final int grpId,
+ final int partId,
+ final int currAllocatedPageCnt,
+ final long partSize
) {
if (part != null) {
boolean reserved = part.reserve();
- if(!reserved)
+ if (!reserved)
return false;
}
else
@@ -596,43 +594,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
}
- /**
- * Destroys given {@code store} and creates new with the same update counters as in given.
- *
- * @param store Store to destroy.
- * @return New cache data store.
- * @throws IgniteCheckedException If failed.
- */
- public CacheDataStore recreateCacheDataStore(CacheDataStore store) throws IgniteCheckedException {
- long updCounter = store.updateCounter();
- long initUpdCounter = store.initialUpdateCounter();
-
- int p = store.partId();
-
- PageMemoryEx pageMemory = (PageMemoryEx)grp.dataRegion().pageMemory();
-
- int tag = pageMemory.invalidate(grp.groupId(), p);
-
- ctx.pageStore().onPartitionDestroyed(grp.groupId(), p, tag);
-
- CacheDataStore store0;
-
- partStoreLock.lock(p);
-
- try {
- store0 = createCacheDataStore0(p);
- store0.updateCounter(updCounter);
- store0.updateInitialCounter(initUpdCounter);
-
- partDataStores.put(p, store0);
- }
- finally {
- partStoreLock.unlock(p);
- }
-
- return store0;
- }
-
/** {@inheritDoc} */
@Override public void onPartitionCounterUpdated(int part, long cntr) {
CacheDataStore store = partDataStores.get(part);
@@ -743,7 +704,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
return new Metas(
new RootPage(new FullPageId(metastoreRoot, grpId), allocated),
- new RootPage(new FullPageId(reuseListRoot, grpId), allocated));
+ new RootPage(new FullPageId(reuseListRoot, grpId), allocated),
+ null);
}
finally {
pageMem.writeUnlock(grpId, metaId, metaPage, null, allocated);
@@ -787,6 +749,47 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
return iterator;
}
+ /** {@inheritDoc} */
+ @Override public boolean expire(
+ GridCacheContext cctx,
+ IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
+ int amount
+ ) throws IgniteCheckedException {
+ assert !cctx.isNear() : cctx.name();
+
+ // Nothing to do when no partition has reported pending entries.
+ if (!hasPendingEntries)
+ return false;
+
+ if (!busyLock.enterBusy())
+ return false;
+
+ try {
+ int cleared = 0;
+
+ for (CacheDataStore store : cacheDataStores()) {
+ // -1 means "no limit" and must be passed through unchanged: subtracting the running total
+ // from it would hand purgeExpired a negative value that is neither a limit nor -1.
+ int remaining = amount == -1 ? -1 : amount - cleared;
+
+ cleared += ((GridCacheDataStore)store).purgeExpired(cctx, c, remaining);
+
+ // Limit reached: report that this call stopped early.
+ if (amount != -1 && cleared >= amount)
+ return true;
+ }
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long expiredSize() throws IgniteCheckedException {
+ long total = 0;
+
+ // Sum the per-partition pending entry counts.
+ for (CacheDataStore dataStore : cacheDataStores()) {
+ GridCacheDataStore partStore = (GridCacheDataStore)dataStore;
+
+ total += partStore.expiredSize();
+ }
+
+ return total;
+ }
+
/**
* Calculates free space of all partition data stores - number of bytes available for use in allocated pages.
*
@@ -1098,13 +1101,18 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
@GridToStringInclude
private final RootPage treeRoot;
+ /** Pending entries tree root; {@code null} when the caller supplies none. */
+ @GridToStringInclude
+ private final RootPage pendingTreeRoot;
+
/**
* @param treeRoot Metadata storage root.
* @param reuseListRoot Reuse list root.
+ * @param pendingTreeRoot Pending entries tree root, may be {@code null}.
*/
- Metas(RootPage treeRoot, RootPage reuseListRoot) {
+ Metas(RootPage treeRoot, RootPage reuseListRoot, RootPage pendingTreeRoot) {
this.treeRoot = treeRoot;
this.reuseListRoot = reuseListRoot;
+ this.pendingTreeRoot = pendingTreeRoot;
}
/** {@inheritDoc} */
@@ -1116,7 +1124,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/**
*
*/
- private class GridCacheDataStore implements CacheDataStore {
+ public class GridCacheDataStore implements CacheDataStore {
/** */
private final int partId;
@@ -1127,6 +1135,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
private volatile CacheFreeListImpl freeList;
/** */
+ private PendingEntriesTree pendingTree;
+
+ /** */
private volatile CacheDataStore delegate;
/** */
@@ -1164,11 +1175,10 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
return null;
}
- IgniteCacheDatabaseSharedManager dbMgr = ctx.database();
-
- dbMgr.checkpointReadLock();
-
if (init.compareAndSet(false, true)) {
+ IgniteCacheDatabaseSharedManager dbMgr = ctx.database();
+
+ dbMgr.checkpointReadLock();
try {
Metas metas = getOrAllocatePartitionMetas();
@@ -1183,6 +1193,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
ctx.wal(),
reuseRoot.pageId().pageId(),
reuseRoot.isAllocated()) {
+ /** {@inheritDoc} */
@Override protected long allocatePageNoReuse() throws IgniteCheckedException {
assert grp.shared().database().checkpointLockIsHeldByThread();
@@ -1201,6 +1212,24 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
rowStore,
treeRoot.pageId().pageId(),
treeRoot.isAllocated()) {
+ /** {@inheritDoc} */
+ @Override protected long allocatePageNoReuse() throws IgniteCheckedException {
+ assert grp.shared().database().checkpointLockIsHeldByThread();
+
+ return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA);
+ }
+ };
+
+ RootPage pendingTreeRoot = metas.pendingTreeRoot;
+
+ final PendingEntriesTree pendingTree0 = new PendingEntriesTree(
+ grp,
+ "PendingEntries-" + partId,
+ grp.dataRegion().pageMemory(),
+ pendingTreeRoot.pageId().pageId(),
+ reuseList,
+ pendingTreeRoot.isAllocated()) {
+ /** {@inheritDoc} */
@Override protected long allocatePageNoReuse() throws IgniteCheckedException {
assert grp.shared().database().checkpointLockIsHeldByThread();
@@ -1210,7 +1239,17 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
- delegate0 = new CacheDataStoreImpl(partId, name, rowStore, dataTree);
+ delegate0 = new CacheDataStoreImpl(partId, name, rowStore, dataTree) {
+ /** {@inheritDoc} */
+ @Override public PendingEntriesTree pendingTree() {
+ return pendingTree0;
+ }
+ };
+
+ pendingTree = pendingTree0;
+
+ if (!hasPendingEntries && pendingTree0.size() > 0)
+ hasPendingEntries = true;
int grpId = grp.groupId();
long partMetaId = pageMem.partitionMetaPageId(grpId, partId);
@@ -1258,8 +1297,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
}
else {
- dbMgr.checkpointReadUnlock();
-
U.await(latch);
delegate0 = delegate;
@@ -1280,13 +1317,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
int grpId = grp.groupId();
long partMetaId = pageMem.partitionMetaPageId(grpId, partId);
+
long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
try {
boolean allocated = false;
- long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
+ boolean pendingTreeAllocated = false;
+ long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
try {
- long treeRoot, reuseListRoot;
+ long treeRoot, reuseListRoot, pendingTreeRoot;
// Initialize new page.
if (PageIO.getType(pageAddr) != PageIO.T_PART_META) {
@@ -1296,22 +1335,18 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
treeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
reuseListRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
+ pendingTreeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA;
assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA;
+ assert PageIdUtils.flag(pendingTreeRoot) == PageMemory.FLAG_DATA;
io.setTreeRoot(pageAddr, treeRoot);
io.setReuseListRoot(pageAddr, reuseListRoot);
+ io.setPendingTreeRoot(pageAddr, pendingTreeRoot);
if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null))
- wal.log(new MetaPageInitRecord(
- grpId,
- partMetaId,
- io.getType(),
- io.getVersion(),
- treeRoot,
- reuseListRoot
- ));
+ wal.log(new PageSnapshot(new FullPageId(partMetaId, grpId), pageAddr, pageMem.pageSize()));
allocated = true;
}
@@ -1321,6 +1356,33 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
treeRoot = io.getTreeRoot(pageAddr);
reuseListRoot = io.getReuseListRoot(pageAddr);
+ int pageVersion = PagePartitionMetaIO.getVersion(pageAddr);
+
+ if (pageVersion < 2) {
+ assert pageVersion == 1;
+
+ // Switch to the latest IO first so the message below reports the version being upgraded to.
+ io = PagePartitionMetaIO.VERSIONS.latest();
+
+ if (log.isDebugEnabled())
+ log.debug("Upgrade partition meta page version: [part=" + partId +
+ ", grpId=" + grpId + ", oldVer=" + pageVersion +
+ ", newVer=" + io.getVersion()
+ );
+
+ ((PagePartitionMetaIOV2)io).upgradePage(pageAddr);
+
+ // An upgraded page has no pending tree yet: allocate its root and record it in the meta page.
+ pendingTreeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
+
+ io.setPendingTreeRoot(pageAddr, pendingTreeRoot);
+
+ if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null))
+ wal.log(new PageSnapshot(new FullPageId(partMetaId, grpId), pageAddr, pageMem.pageSize()));
+
+ pendingTreeAllocated = true;
+ }
+ else
+ pendingTreeRoot = io.getPendingTreeRoot(pageAddr);
+
if (PageIdUtils.flag(treeRoot) != PageMemory.FLAG_DATA)
throw new StorageException("Wrong tree root page id flag: treeRoot="
+ U.hexLong(treeRoot) + ", part=" + partId + ", grpId=" + grpId);
@@ -1328,14 +1390,19 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
if (PageIdUtils.flag(reuseListRoot) != PageMemory.FLAG_DATA)
throw new StorageException("Wrong reuse list root page id flag: reuseListRoot="
+ U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId);
+
+ // The pending tree root must be a data page, like the tree and reuse list roots checked above.
+ if (PageIdUtils.flag(pendingTreeRoot) != PageMemory.FLAG_DATA)
+ throw new StorageException("Wrong pending tree root page id flag: pendingTreeRoot="
+ + U.hexLong(pendingTreeRoot) + ", part=" + partId + ", grpId=" + grpId);
}
return new Metas(
new RootPage(new FullPageId(treeRoot, grpId), allocated),
- new RootPage(new FullPageId(reuseListRoot, grpId), allocated));
+ new RootPage(new FullPageId(reuseListRoot, grpId), allocated),
+ new RootPage(new FullPageId(pendingTreeRoot, grpId), allocated || pendingTreeAllocated));
}
finally {
- pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, allocated);
+ pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, allocated || pendingTreeAllocated);
}
}
finally {
@@ -1485,6 +1552,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
long expireTime,
@Nullable CacheDataRow oldRow
) throws IgniteCheckedException {
+ assert ctx.database().checkpointLockIsHeldByThread();
+
CacheDataStore delegate = init0(false);
delegate.update(cctx, key, val, ver, expireTime, oldRow);
@@ -1498,6 +1567,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
GridCacheVersion ver,
long expireTime,
@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+ assert ctx.database().checkpointLockIsHeldByThread();
+
CacheDataStore delegate = init0(false);
return delegate.createRow(cctx, key, val, ver, expireTime, oldRow);
@@ -1506,6 +1577,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/** {@inheritDoc} */
@Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c)
throws IgniteCheckedException {
+ assert ctx.database().checkpointLockIsHeldByThread();
+
CacheDataStore delegate = init0(false);
delegate.invoke(cctx, key, c);
@@ -1514,6 +1587,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/** {@inheritDoc} */
@Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId)
throws IgniteCheckedException {
+ assert ctx.database().checkpointLockIsHeldByThread();
+
CacheDataStore delegate = init0(false);
delegate.remove(cctx, key, partId);
@@ -1583,10 +1658,142 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/** {@inheritDoc} */
@Override public void clear(int cacheId) throws IgniteCheckedException {
- CacheDataStore delegate = init0(true);
+ CacheDataStore delegate0 = init0(true);
- if (delegate != null)
- delegate.clear(cacheId);
+ // No delegate was returned by init0(true): nothing to clear.
+ if (delegate0 == null)
+ return;
+
+ ctx.database().checkpointReadLock();
+
+ try {
+ // Clear persistent pendingTree
+ if (pendingTree != null) {
+ // The same row is used as both lower and upper bound, so the search is limited to this cacheId.
+ PendingRow row = new PendingRow(cacheId);
+
+ GridCursor<PendingRow> cursor = pendingTree.find(row, row, PendingEntriesTree.WITHOUT_KEY);
+
+ while (cursor.next()) {
+ PendingRow row0 = cursor.get();
+
+ // Report the row that actually failed the check, not the search bound.
+ assert row0.link != 0 : row0;
+
+ boolean res = pendingTree.removex(row0);
+
+ assert res;
+ }
+ }
+
+ delegate0.clear(cacheId);
+ }
+ finally {
+ ctx.database().checkpointReadUnlock();
+ }
+ }
+
+ /**
+ * Gets the number of rows currently stored in the pending (TTL) tree of this partition store.
+ *
+ * @return Number of pending entries, or {@code 0} if {@code init0(true)} yields no delegate store or there is no pending tree.
+ * @throws IgniteCheckedException If failed to get number of pending entries.
+ */
+ public long expiredSize() throws IgniteCheckedException {
+ CacheDataStore delegate0 = init0(true);
+
+ return delegate0 == null || pendingTree == null ? 0 : pendingTree.size(); // pendingTree is treated as nullable in clear() and purgeExpired() as well.
+ }
+
+ /**
+ * Removes rows whose expire time is not later than the current time from the pending tree and applies the closure to their entries.
+ *
+ * @param cctx Cache context; supplies the topology, the cache id used for shared groups and the entry lookup.
+ * @param c Expiry closure that should be applied to expired entry. See {@link GridCacheTtlManager} for details.
+ * @param amount Limit of processed entries by single call, {@code -1} for no limit; the check is {@code cleared > amount}, so up to {@code amount + 1} rows may be processed.
+ * @return Number of pending rows processed by this call (counted even when the row removal fails); {@code 0} if there was nothing to do or the partition could not be used.
+ * @throws IgniteCheckedException If failed.
+ */
+ public int purgeExpired(GridCacheContext cctx,
+ IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
+ int amount) throws IgniteCheckedException {
+ CacheDataStore delegate0 = init0(true);
+
+ if (delegate0 == null || pendingTree == null)
+ return 0;
+
+ GridDhtLocalPartition part = cctx.topology().localPartition(partId, AffinityTopologyVersion.NONE, false, false);
+
+ // Skip partitions that are missing or not OWNING, and stores with an empty pending tree (checked before taking any lock).
+ if (part == null || part.state() != OWNING || pendingTree.size() == 0)
+ return 0;
+
+ cctx.shared().database().checkpointReadLock();
+ try {
+ if (!part.reserve()) // Reservation failed: leave the partition alone.
+ return 0;
+
+ try {
+ if (part.state() != OWNING) // Re-check the state now that the partition is reserved.
+ return 0;
+
+ long now = U.currentTimeMillis();
+
+ GridCursor<PendingRow> cur;
+
+ if (grp.sharedGroup()) // Shared group: restrict the range to this cache's id, up to expire time 'now'.
+ cur = pendingTree.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0));
+ else // NOTE(review): a null lower bound presumably means "from the first row" - confirm against the tree's find().
+ cur = pendingTree.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
+
+ if (!cur.next())
+ return 0;
+
+ GridCacheVersion obsoleteVer = null;
+
+ int cleared = 0;
+
+ do {
+ PendingRow row = cur.get();
+
+ if (amount != -1 && cleared > amount) // Fires only once 'cleared' exceeds 'amount', i.e. after amount + 1 rows.
+ return cleared;
+
+ assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
+
+ row.key.partition(partId); // NOTE(review): presumably the key read from the row carries no partition yet - confirm.
+
+ if (pendingTree.removex(row)) {
+ if (obsoleteVer == null) // Obsolete version is created lazily and reused for the rest of this call.
+ obsoleteVer = ctx.versions().next();
+
+ GridCacheEntryEx e1 = cctx.cache().entryEx(row.key);
+
+ if (e1 != null)
+ c.apply(e1, obsoleteVer);
+ }
+
+ cleared++; // Counted regardless of whether removex() succeeded.
+ }
+ while (cur.next());
+
+ return cleared;
+ }
+ finally {
+ part.release();
+ }
+ }
+ finally {
+ cctx.shared().database().checkpointReadUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public PendingEntriesTree pendingTree() {
+ try {
+ CacheDataStore store = init0(true);
+
+ return store != null ? pendingTree : null; // The tree is exposed only when init0(true) yields a delegate store.
+ }
+ catch (IgniteCheckedException ex) {
+ throw new IgniteException(ex); // Rethrown unchecked with the original exception as the cause.
+ }
}
}