You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/17 08:24:12 UTC
[08/24] ignite git commit: Optimized snapshot progress tracking
Optimized snapshot progress tracking
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/21964fb5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/21964fb5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/21964fb5
Branch: refs/heads/ignite-5578
Commit: 21964fb5f6fb6fee891283332202cbc9ed5ac3f3
Parents: 3787181
Author: Dmitry Pavlov <dp...@gmail.com>
Authored: Wed Jul 12 18:59:10 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Jul 12 18:59:10 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/pagemem/FullPageId.java | 6 +-
.../ignite/internal/pagemem/PageIdUtils.java | 14 +-
.../pagemem/store/IgnitePageStoreManager.java | 1 +
.../internal/pagemem/store/PageStore.java | 2 +
.../delta/MetaPageUpdateLastAllocatedIndex.java | 2 +-
.../cache/persistence/DbCheckpointListener.java | 7 +-
.../FullPageIdIterableComparator.java | 51 -------
.../GridCacheDatabaseSharedManager.java | 63 +++++---
.../persistence/GridCacheOffheapManager.java | 56 +++----
.../cache/persistence/file/FilePageStore.java | 2 +-
.../persistence/file/FilePageStoreManager.java | 1 +
.../cache/persistence/pagemem/PageMemoryEx.java | 8 +-
.../persistence/pagemem/PageMemoryImpl.java | 10 +-
.../persistence/partstate/GroupPartitionId.java | 145 +++++++++++++++++++
.../partstate/PagesAllocationRange.java | 68 +++++++++
.../partstate/PartitionAllocationMap.java | 113 +++++++++++++++
.../snapshot/IgniteCacheSnapshotManager.java | 17 ++-
.../cache/persistence/tree/io/PageMetaIO.java | 27 ++--
.../persistence/tree/io/TrackingPageIO.java | 8 +-
.../persistence/tree/util/PageHandler.java | 3 +-
20 files changed, 466 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java
index 00f52c1..9e24943 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java
@@ -21,7 +21,7 @@ import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
- * Compound object used to address a page in the global page space.
+ * Compound object used to address a page in the global page space.
* <h3>Page ID structure</h3>
* <p>
* Generally, a full page ID consists of a cache ID and page ID. A page ID consists of
@@ -49,13 +49,13 @@ import org.apache.ignite.internal.util.typedef.internal.U;
* Effective page ID is page ID with zeroed bits used for page ID rotation.
*/
public class FullPageId {
- /** */
+ /** Page ID. */
private final long pageId;
/** */
private final long effectivePageId;
- /** */
+ /** Cache group ID. */
private final int grpId;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdUtils.java
index 92f427a..6f4ba93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdUtils.java
@@ -49,10 +49,10 @@ public final class PageIdUtils {
/** */
public static final long TAG_MASK = ~(-1L << TAG_SIZE);
- /** */
+ /** Mask for the partition ID bits of a page ID. (Page index, by contrast, is a monotonically growing number within each partition.) */
public static final long PART_ID_MASK = ~(-1L << PART_ID_SIZE);
- /** */
+ /** Flags mask. Flags consist of a number of reserved bits and the page type (data/index page). */
public static final long FLAG_MASK = ~(-1L << FLAG_SIZE);
/** */
@@ -92,10 +92,10 @@ public final class PageIdUtils {
}
/**
- * Extracts a page index from the given pageId.
+ * Extracts a page index from the given page ID.
*
- * @param pageId Page id.
- * @return Page ID.
+ * @param pageId Page ID.
+ * @return Page index.
*/
public static int pageIndex(long pageId) {
return (int)(pageId & PAGE_IDX_MASK); // 4 bytes
@@ -150,7 +150,9 @@ public final class PageIdUtils {
/**
* @param partId Partition ID.
- * @return Part ID constructed from the given cache ID and partition ID.
+ * @param flag Flags (a number of reserved bits, and page type (data/index page))
+ * @param pageIdx Page index, monotonically growing number within each partition
+ * @return Page ID constructed from the given pageIdx and partition ID, see {@link FullPageId}
*/
public static long pageId(int partId, byte flag, int pageIdx) {
long pageId = flag & FLAG_MASK;
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
index a1b766f..eaa85ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
@@ -77,6 +77,7 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh
*
* @param grpId Cache group ID of the evicted partition.
* @param partId Partition ID.
+ * @param tag Partition tag (growing 1-based partition file version).
* @throws IgniteCheckedException If failed to handle partition destroy callback.
*/
public void onPartitionDestroyed(int grpId, int partId, int tag) throws IgniteCheckedException;
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
index be83704..4698a6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
@@ -70,6 +70,8 @@ public interface PageStore {
*
* @param pageId Page ID.
* @param pageBuf Page buffer to write.
+ * @param tag Partition file version, 1-based incrementing counter. For outdated pages {@code tag} has a lower value,
+ * and the write does nothing.
* @throws IgniteCheckedException If page writing failed (IO error occurred).
*/
public void write(long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException;
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
index 60aebde..11b2a67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
@@ -44,7 +44,7 @@ public class MetaPageUpdateLastAllocatedIndex extends PageDeltaRecord {
PageMetaIO io = PageMetaIO.VERSIONS.forVersion(PageIO.getVersion(pageAddr));
- io.setLastPageCount(pageAddr, lastAllocatedIdx);
+ io.setLastAllocatedPageCount(pageAddr, lastAllocatedIdx);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
index 0b28b6a..1c438b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
@@ -17,9 +17,8 @@
package org.apache.ignite.internal.processors.cache.persistence;
-import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
/**
*
@@ -35,9 +34,9 @@ public interface DbCheckpointListener {
public boolean nextSnapshot();
/**
- *
+ * @return Partition allocation statistic map
*/
- public Map<T2<Integer, Integer>, T2<Integer, Integer>> partitionStatMap();
+ public PartitionAllocationMap partitionStatMap();
/**
* @param cacheOrGrpName Cache or group name.
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/FullPageIdIterableComparator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/FullPageIdIterableComparator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/FullPageIdIterableComparator.java
deleted file mode 100644
index c056c52..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/FullPageIdIterableComparator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence;
-
-import java.io.Serializable;
-import java.util.Comparator;
-import org.apache.ignite.internal.util.typedef.T2;
-
-/**
- *
- */
-public class FullPageIdIterableComparator implements Comparator<T2<Integer, Integer>>, Serializable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- public static final FullPageIdIterableComparator INSTANCE = new FullPageIdIterableComparator();
-
- /** {@inheritDoc} */
- @Override public int compare(T2<Integer, Integer> o1, T2<Integer, Integer> o2) {
- if (o1.get1() < o2.get1())
- return -1;
-
- if (o1.get1() > o2.get1())
- return 1;
-
- if (o1.get2() < o2.get2())
- return -1;
-
- if (o1.get2() > o2.get2())
- return 1;
-
- return 0;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 5136731..9f2067a 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -40,7 +40,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -52,6 +51,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
@@ -109,6 +109,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
@@ -263,8 +264,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** */
private boolean stopping;
- /** Checkpoint runner thread pool. */
- private ExecutorService asyncRunner;
+ /** Checkpoint runner thread pool. If {@code null}, tasks are run in a single thread. */
+ @Nullable private ExecutorService asyncRunner;
/** Buffer for the checkpoint threads. */
private ThreadLocal<ByteBuffer> threadBuf;
@@ -1916,6 +1917,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
asyncRunner == null ? 1 : chp.cpPages.collectionsSize());
tracker.onPagesWriteStart();
+ final AtomicInteger writtenPagesCtr = new AtomicInteger();
+ final int totalPagesToWriteCnt = chp.cpPages.size();
if (asyncRunner != null) {
for (int i = 0; i < chp.cpPages.collectionsSize(); i++) {
@@ -1923,7 +1926,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
tracker,
chp.cpPages.innerCollection(i),
updStores,
- doneWriteFut
+ doneWriteFut,
+ writtenPagesCtr,
+ totalPagesToWriteCnt
);
try {
@@ -1937,7 +1942,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
else {
// Single-threaded checkpoint.
- Runnable write = new WriteCheckpointPages(tracker, chp.cpPages, updStores, doneWriteFut);
+ Runnable write = new WriteCheckpointPages(tracker,
+ chp.cpPages,
+ updStores,
+ doneWriteFut,
+ writtenPagesCtr,
+ totalPagesToWriteCnt);
write.run();
}
@@ -2092,15 +2102,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
curCpProgress = curr;
}
- final NavigableMap<T2<Integer, Integer>, T2<Integer, Integer>> map =
- new TreeMap<>(FullPageIdIterableComparator.INSTANCE);
+ final PartitionAllocationMap map = new PartitionAllocationMap();
DbCheckpointListener.Context ctx0 = new DbCheckpointListener.Context() {
@Override public boolean nextSnapshot() {
return curr.nextSnapshot;
}
- @Override public Map<T2<Integer, Integer>, T2<Integer, Integer>> partitionStatMap() {
+ /** {@inheritDoc} */
+ @Override public PartitionAllocationMap partitionStatMap() {
return map;
}
@@ -2278,14 +2288,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
}
- /**
- *
- */
+ /** Pages write task */
private class WriteCheckpointPages implements Runnable {
/** */
private CheckpointMetricsTracker tracker;
- /** */
+ /** Collection of page IDs to write under this task. The overall number of pages to write may be greater than this collection. */
private Collection<FullPageId> writePageIds;
/** */
@@ -2294,19 +2302,34 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** */
private CountDownFuture doneFut;
+ /** Counter for all written pages. May be shared between several workers */
+ private AtomicInteger writtenPagesCntr;
+
+ /** Total number of pages to write; may be greater than the size of {@link #writePageIds}. */
+ private final int totalPagesToWrite;
+
/**
- * @param writePageIds Write page IDs.
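+ * Creates a task for writing pages.
+ * @param tracker Checkpoint metrics tracker.
+ * @param writePageIds Collection of page IDs to write.
+ * @param updStores Set of page stores (presumably collects the stores touched by this write; verify against usage).
+ * @param doneFut Count-down future (presumably signalled when this task finishes; verify against usage).
+ * @param writtenPagesCntr Counter of all written pages, may be shared between several write tasks.
+ * @param totalPagesToWrite Total number of pages to be written under this checkpoint.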
*/
private WriteCheckpointPages(
- CheckpointMetricsTracker tracker,
- Collection<FullPageId> writePageIds,
- GridConcurrentHashSet<PageStore> updStores,
- CountDownFuture doneFut
- ) {
+ final CheckpointMetricsTracker tracker,
+ final Collection<FullPageId> writePageIds,
+ final GridConcurrentHashSet<PageStore> updStores,
+ final CountDownFuture doneFut,
+ @NotNull final AtomicInteger writtenPagesCntr,
+ final int totalPagesToWrite) {
this.tracker = tracker;
this.writePageIds = writePageIds;
this.updStores = updStores;
this.doneFut = doneFut;
+ this.writtenPagesCntr = writtenPagesCntr;
+ this.totalPagesToWrite = totalPagesToWrite;
}
/** {@inheritDoc} */
@@ -2354,7 +2377,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
tmpWriteBuf.rewind();
}
- snapshotMgr.onPageWrite(fullId, tmpWriteBuf);
+ int curWrittenPages = writtenPagesCntr.incrementAndGet();
+
+ snapshotMgr.onPageWrite(fullId, tmpWriteBuf, curWrittenPages, totalPagesToWrite);
tmpWriteBuf.rewind();
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index bd902fb..6e6b7df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -52,6 +52,9 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeListImpl;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PagesAllocationRange;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionCountersIO;
@@ -63,7 +66,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -207,9 +209,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
try {
- long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
+ long partMetaPageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
- if (pageAddr == 0L) {
+ if (partMetaPageAddr == 0L) {
U.warn(log, "Failed to acquire write lock for meta page [metaPage=" + partMetaPage +
", saveMeta=" + saveMeta + ", beforeDestroy=" + beforeDestroy + ", size=" + size +
", updCntr=" + updCntr + ", state=" + state + ']');
@@ -220,21 +222,21 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
boolean changed = false;
try {
- PagePartitionMetaIO io = PageIO.getPageIO(pageAddr);
+ PagePartitionMetaIO io = PageIO.getPageIO(partMetaPageAddr);
- changed |= io.setUpdateCounter(pageAddr, updCntr);
- changed |= io.setGlobalRemoveId(pageAddr, rmvId);
- changed |= io.setSize(pageAddr, size);
+ changed |= io.setUpdateCounter(partMetaPageAddr, updCntr);
+ changed |= io.setGlobalRemoveId(partMetaPageAddr, rmvId);
+ changed |= io.setSize(partMetaPageAddr, size);
if (state != null)
- changed |= io.setPartitionState(pageAddr, (byte)state.ordinal());
+ changed |= io.setPartitionState(partMetaPageAddr, (byte)state.ordinal());
else
assert grp.isLocal() : grp.cacheOrGroupName();
long cntrsPageId;
if (grp.sharedGroup()) {
- cntrsPageId = io.getCountersPageId(pageAddr);
+ cntrsPageId = io.getCountersPageId(partMetaPageAddr);
byte[] data = serializeCacheSizes(store.cacheSizes());
@@ -247,7 +249,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
if (init && items > 0) {
cntrsPageId = pageMem.allocatePage(grpId, store.partId(), PageIdAllocator.FLAG_DATA);
- io.setCountersPageId(pageAddr, cntrsPageId);
+ io.setCountersPageId(partMetaPageAddr, cntrsPageId);
changed = true;
}
@@ -301,7 +303,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
if (needSnapshot) {
pageCnt = this.ctx.pageStore().pages(grpId, store.partId());
- io.setCandidatePageCount(pageAddr, pageCnt);
+ io.setCandidatePageCount(partMetaPageAddr, pageCnt);
if (saveMeta) {
long metaPageId = pageMem.metaPageId(grpId);
@@ -345,13 +347,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
if (partMap.containsKey(store.partId()) &&
partMap.get(store.partId()) == GridDhtPartitionState.OWNING)
- addPartition(ctx.partitionStatMap(), pageAddr, io, grpId, store.partId(),
+ addPartition(ctx.partitionStatMap(), partMetaPageAddr, io, grpId, store.partId(),
this.ctx.pageStore().pages(grpId, store.partId()));
changed = true;
}
else
- pageCnt = io.getCandidatePageCount(pageAddr);
+ pageCnt = io.getCandidatePageCount(partMetaPageAddr);
if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null))
wal.log(new MetaPageUpdatePartitionDataRecord(
@@ -397,27 +399,29 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/**
* @param map Map to add values to.
- * @param pageAddr page address
+ * @param metaPageAddr Meta page address
* @param io Page Meta IO
* @param cacheId Cache ID.
- * @param partition Partition ID.
- * @param pages Number of pages to add.
+ * @param partId Partition ID. Or {@link PageIdAllocator#INDEX_PARTITION} for index partition
+ * @param currAllocatedPageCnt Total number of pages allocated for partition <code>[partId, cacheId]</code>.
*/
private static void addPartition(
- Map<T2<Integer, Integer>, T2<Integer, Integer>> map,
- long pageAddr,
- PageMetaIO io,
- int cacheId,
- int partition,
- int pages
+ final PartitionAllocationMap map,
+ final long metaPageAddr,
+ final PageMetaIO io,
+ final int cacheId,
+ final int partId,
+ final int currAllocatedPageCnt
) {
- if (pages <= 1)
+ if (currAllocatedPageCnt <= 1)
return;
- assert PageIO.getPageId(pageAddr) != 0;
+ assert PageIO.getPageId(metaPageAddr) != 0;
- int lastAllocatedIdx = io.getLastPageCount(pageAddr);
- map.put(new T2<>(cacheId, partition), new T2<>(lastAllocatedIdx, pages));
+ int lastAllocatedPageCnt = io.getLastAllocatedPageCount(metaPageAddr);
+ map.put(
+ new GroupPartitionId(cacheId, partId),
+ new PagesAllocationRange(lastAllocatedPageCnt, currAllocatedPageCnt));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index c827e96..a7ca13c 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -77,7 +77,7 @@ public class FilePageStore implements PageStore {
/** */
private volatile boolean recover;
- /** */
+ /** Partition file version, 1-based incrementing counter. For outdated pages the tag has a lower value, and the write does nothing. */
private volatile int tag;
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index af20136..e2ad070 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -318,6 +318,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
* @param cacheId Cache ID to write.
* @param pageId Page ID.
* @param pageBuf Page buffer.
+ * @param tag Partition tag (growing 1-based partition file version). Used to validate page is not outdated
* @return PageStore to which the page has been written.
* @throws IgniteCheckedException If IO error occurred.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
index 7c63d41..53e21b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
@@ -115,19 +115,19 @@ public interface PageMemoryEx extends PageMemory {
*
* @param pageId Page ID to get byte buffer for. The page ID must be present in the collection returned by
* the {@link #beginCheckpoint()} method call.
- * @param tmpBuf Temporary buffer to write changes into.
+ * @param outBuf Temporary buffer to write changes into.
* @param tracker Checkpoint metrics tracker.
- * @return {@code True} if data were read, {@code false} otherwise (data already saved to storage).
+ * @return Partition tag if data was read, {@code null} otherwise (data already saved to storage).
* @throws IgniteException If failed to obtain page data.
*/
- @Nullable public Integer getForCheckpoint(FullPageId pageId, ByteBuffer tmpBuf, CheckpointMetricsTracker tracker);
+ @Nullable public Integer getForCheckpoint(FullPageId pageId, ByteBuffer outBuf, CheckpointMetricsTracker tracker);
/**
* Marks partition as invalid / outdated.
*
* @param cacheId Cache ID.
* @param partId Partition ID.
- * @return New partition tag.
+ * @return New partition tag (growing 1-based partition file version).
*/
public int invalidate(int cacheId, int partId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index 47381d7..1b4cf81 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -792,8 +792,8 @@ public class PageMemoryImpl implements PageMemoryEx {
}
/** {@inheritDoc} */
- @Override public Integer getForCheckpoint(FullPageId fullId, ByteBuffer tmpBuf, CheckpointMetricsTracker tracker) {
- assert tmpBuf.remaining() == pageSize();
+ @Override public Integer getForCheckpoint(FullPageId fullId, ByteBuffer outBuf, CheckpointMetricsTracker tracker) {
+ assert outBuf.remaining() == pageSize();
Segment seg = segment(fullId.groupId(), fullId.pageId());
@@ -876,7 +876,7 @@ public class PageMemoryImpl implements PageMemoryEx {
}
}
else
- return copyPageForCheckpoint(absPtr, fullId, tmpBuf, tmpBuffer, tracker) ? tag : null;
+ return copyPageForCheckpoint(absPtr, fullId, outBuf, tmpBuffer, tracker) ? tag : null;
}
/**
@@ -1565,7 +1565,7 @@ public class PageMemoryImpl implements PageMemoryEx {
/** */
private final int maxDirtyPages;
- /** */
+ /** Maps partition (cacheId, partId) to its tag. Tag is 1-based incrementing partition file counter */
private final Map<T2<Integer, Integer>, Integer> partTagMap = new HashMap<>();
/**
@@ -1903,7 +1903,7 @@ public class PageMemoryImpl implements PageMemoryEx {
/**
* @param grpId Cache group ID.
* @param partId Partition ID.
- * @return Partition tag.
+ * @return Partition tag (growing 1-based partition file version).
*/
private int partTag(int grpId, int partId) {
assert getReadHoldCount() > 0 || getWriteHoldCount() > 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java
new file mode 100644
index 0000000..dbdf670
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.partstate;
+
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Pair of cache group ID and partition ID. Immutable and comparable (by group ID, then partition ID); may be used as a key in maps.
+ */
+public class GroupPartitionId implements Comparable<GroupPartitionId> {
+ /** Index of the super (meta) page. Such a page always exists for an iterated cache partition. */
+ private static final int METAPAGE_IDX = 0;
+
+ /** Cache group ID. */
+ private final int grpId;
+
+ /** Partition ID. */
+ private final int partId;
+
+ /**
+ * Creates group-partition tuple. Both values are stored as given, without validation.
+ *
+ * @param grpId Cache group ID.
+ * @param partId Partition ID.
+ */
+ public GroupPartitionId(final int grpId, final int partId) {
+ this.grpId = grpId;
+ this.partId = partId;
+ }
+
+ /**
+ * @param partId Partition ID.
+ * @return {@code PageMemory.FLAG_IDX} for {@code PageIdAllocator.INDEX_PARTITION}, otherwise {@code PageMemory.FLAG_DATA}.
+ */
+ private static byte getFlagByPartId(final int partId) {
+ return partId == PageIdAllocator.INDEX_PARTITION ? PageMemory.FLAG_IDX : PageMemory.FLAG_DATA;
+ }
+
+ /**
+ * @return Cache group ID.
+ */
+ public int getGroupId() {
+ return grpId;
+ }
+
+ /**
+ * @return Partition ID.
+ */
+ public int getPartitionId() {
+ return partId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GroupPartitionId.class, this);
+ }
+
+ /** {@inheritDoc} Two instances are equal when they have the same class, group ID and partition ID. */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ GroupPartitionId key = (GroupPartitionId)o;
+
+ if (grpId != key.grpId)
+ return false;
+
+ return partId == key.partId;
+ }
+
+ /** {@inheritDoc} Computed from the same two fields that {@link #equals(Object)} compares. */
+ @Override public int hashCode() {
+ int result = grpId;
+
+ result = 31 * result + partId;
+
+ return result;
+ }
+
+ /** {@inheritDoc} Orders by group ID first, then by partition ID; explicit comparisons avoid subtraction overflow. */
+ @Override public int compareTo(@NotNull GroupPartitionId o) {
+ if (getGroupId() < o.getGroupId())
+ return -1;
+
+ if (getGroupId() > o.getGroupId())
+ return 1;
+
+ if (getPartitionId() < o.getPartitionId())
+ return -1;
+
+ if (getPartitionId() > o.getPartitionId())
+ return 1;
+ return 0;
+ }
+
+ /**
+ * @param pageIdx Page index, monotonically growing number within each partition.
+ * @return Page ID (64 bits) constructed from this partition ID, the flag for this partition and the given index.
+ */
+ private long createPageId(final int pageIdx) {
+ final int partId = getPartitionId();
+
+ return PageIdUtils.pageId(partId, getFlagByPartId(partId), pageIdx);
+ }
+
+ /**
+ * Returns full page ID for the given index. For index 0 this is the super-page (meta page) of this partition.
+ *
+ * @param pageIdx Page index, monotonically growing number within each partition.
+ * @return FullPageId consisting of the cache group ID (32 bits) and the page ID (64 bits).
+ */
+ @NotNull private FullPageId createFullPageId(final int pageIdx) {
+ return new FullPageId(createPageId(pageIdx), getGroupId());
+ }
+
+ /**
+ * @return Full page ID of the super-page (meta page, index {@link #METAPAGE_IDX}) of this partition.
+ */
+ @NotNull public FullPageId createFirstPageFullId() {
+ return createFullPageId(METAPAGE_IDX);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PagesAllocationRange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PagesAllocationRange.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PagesAllocationRange.java
new file mode 100644
index 0000000..e7170c3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PagesAllocationRange.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.partstate;
+
+/**
+ * Range of pages allocated. Immutable value holder.
+ * Contains currently allocated page count and previously observed page count.
+ * May be used for tracking history of recent allocation for partition <code>[partition, cacheId]</code>.
+ */
+public class PagesAllocationRange {
+ /**
+ * Previously observed total number of allocated pages. May be stored using PageMetaIO.
+ * Used to separate newly allocated pages from the previously observed state.
+ * Minimum value is 0. Expected not to exceed {@link #currAllocatedPageCnt} (not validated by the constructor).
+ */
+ private final int lastAllocatedPageCnt;
+
+ /** Total current number of pages allocated, minimum value is 0 (not validated by the constructor). */
+ private final int currAllocatedPageCnt;
+
+ /**
+ * Creates pages range. Arguments are stored as given; no range checks are performed here.
+ *
+ * @param lastAllocatedPageCnt Last (previously observed) allocated pages count.
+ * @param currAllocatedPageCnt Currently allocated pages count.
+ */
+ public PagesAllocationRange(final int lastAllocatedPageCnt, final int currAllocatedPageCnt) {
+ this.lastAllocatedPageCnt = lastAllocatedPageCnt;
+ this.currAllocatedPageCnt = currAllocatedPageCnt;
+ }
+
+ /**
+ * @return Total current number of pages allocated, minimum value is 0.
+ */
+ public int getCurrAllocatedPageCnt() {
+ return currAllocatedPageCnt;
+ }
+
+ /**
+ * @return Previously observed total number of allocated pages, as passed to the constructor.
+ */
+ public int getLastAllocatedPageCnt() {
+ return lastAllocatedPageCnt;
+ }
+
+ /** {@inheritDoc} Prints both counters; intended for logging and debugging. */
+ @Override public String toString() {
+ return "PagesAllocationRange{" +
+ "lastAllocatedPageCnt=" + lastAllocatedPageCnt +
+ ", currAllocatedPageCnt=" + currAllocatedPageCnt +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionAllocationMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionAllocationMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionAllocationMap.java
new file mode 100644
index 0000000..9ed4000
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionAllocationMap.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.partstate;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Information structure with partitions state.
+ * Page counts map: a thin wrapper over a sorted map keyed by {@link GroupPartitionId}. No synchronization is done here.
+ */
+public class PartitionAllocationMap {
+ /** Maps following pairs: (groupId, partId) -> (lastAllocatedCount, allocatedCount). Sorted by key in ascending order. */
+ private final NavigableMap<GroupPartitionId, PagesAllocationRange> map = new TreeMap<>();
+
+ /**
+ * Returns the value to which the specified key is mapped,
+ * or {@code null} if this map contains no mapping for the key.
+ *
+ * @param key Group-partition pair to look up.
+ * @return Allocation range for the key, or {@code null} if there is no mapping.
+ */
+ @Nullable public PagesAllocationRange get(GroupPartitionId key) {
+ return map.get(key);
+ }
+
+ /**
+ * Extracts partition information from full page ID. Static helper; does not read or modify any map.
+ *
+ * @param fullId Full ID of a page related to some cache group.
+ * @return Pair of cache group ID and partition ID taken from the given page ID.
+ */
+ @NotNull public static GroupPartitionId createCachePartId(@NotNull final FullPageId fullId) {
+ return new GroupPartitionId(fullId.groupId(), PageIdUtils.partId(fullId.pageId()));
+ }
+
+ /** @return <tt>true</tt> if this map contains no key-value mappings */
+ public boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ /** @return the number of key-value mappings in this map. */
+ public int size() {
+ return map.size();
+ }
+
+ /** @return Keys (all cache group partitions) in ascending order; a live view of the underlying map, not a copy. */
+ public Set<GroupPartitionId> keySet() {
+ return map.keySet();
+ }
+
+ /** @return Values (allocation ranges) in ascending order of their keys; a live view of the underlying map. */
+ public Iterable<PagesAllocationRange> values() {
+ return map.values();
+ }
+
+ /** @return The first (lowest) key currently in this map; {@code TreeMap.firstKey()} throws NoSuchElementException if empty. */
+ public GroupPartitionId firstKey() {
+ return map.firstKey();
+ }
+
+ /**
+ * Returns the next (strictly higher) key after the provided group-partition pair, or {@code null} if there is none.
+ *
+ * @param key Group-partition pair to search from; it does not have to be present in the map.
+ * @return Least key strictly greater than the provided one, or {@code null} if no such key exists.
+ */
+ @Nullable public GroupPartitionId nextKey(@NotNull final GroupPartitionId key) {
+ return map.navigableKeySet().higher(key);
+ }
+
+ /** @return Set view of the mappings contained in this map, sorted in ascending key order; backed by the underlying map. */
+ public Set<Map.Entry<GroupPartitionId, PagesAllocationRange>> entrySet() {
+ return map.entrySet();
+ }
+
+ /** @return <tt>true</tt> if this map contains a mapping for the specified key */
+ public boolean containsKey(GroupPartitionId key) {
+ return map.containsKey(key);
+ }
+
+ /**
+ * @param key Group-partition key with which the specified value is to be associated.
+ * @param val Allocation range to be associated with the specified key.
+ * @return the previous value associated with <tt>key</tt>, or <tt>null</tt> if there was no mapping for
+ * <tt>key</tt>.
+ */
+ public PagesAllocationRange put(GroupPartitionId key, PagesAllocationRange val) {
+ return map.put(key, val);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
index 0a27bcd..50e6515 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.nio.ByteBuffer;
-import java.util.NavigableMap;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.events.DiscoveryEvent;
@@ -29,8 +28,8 @@ import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
-import org.apache.ignite.internal.util.typedef.T2;
import org.jetbrains.annotations.Nullable;
/**
@@ -64,12 +63,13 @@ public class IgniteCacheSnapshotManager<T extends SnapshotOperation> extends Gri
/**
* @param snapshotOperation current snapshot operation.
+ * @param map Partition allocation map: (groupId, partId) -> (lastAllocatedCount, allocatedCount).
*
* @return {@code true} if next operation must be snapshot, {@code false} if checkpoint must be executed.
*/
public boolean onMarkCheckPointBegin(
T snapshotOperation,
- NavigableMap<T2<Integer, Integer>, T2<Integer, Integer>> map
+ PartitionAllocationMap map
) throws IgniteCheckedException {
return false;
}
@@ -107,9 +107,16 @@ public class IgniteCacheSnapshotManager<T extends SnapshotOperation> extends Gri
}
/**
- * @param fullId Full id.
+ * @param fullId Full page id.
+ * @param tmpWriteBuf buffer
+ * @param writtenPages Overall pages written, negative value means there is no progress tracked
+ * @param totalPages Overall pages count to be written, should be positive
*/
- public void onPageWrite(FullPageId fullId, ByteBuffer tmpWriteBuf) {
+ public void onPageWrite(
+ final FullPageId fullId,
+ final ByteBuffer tmpWriteBuf,
+ final int writtenPages,
+ final int totalPages) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageMetaIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageMetaIO.java
index ac482e8..becd3e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageMetaIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageMetaIO.java
@@ -43,10 +43,10 @@ public class PageMetaIO extends PageIO {
/** Last successful full snapshot tag offset. */
private static final int LAST_SUCCESSFUL_FULL_SNAPSHOT_TAG_OFF = NEXT_SNAPSHOT_TAG_OFF + 8;
- /** Last allocated index offset. */
+ /** Last allocated pages count offset. */
private static final int LAST_PAGE_COUNT_OFF = LAST_SUCCESSFUL_FULL_SNAPSHOT_TAG_OFF + 8;
- /** Candidate allocated index offset. */
+ /** Candidate allocated page count offset. */
private static final int CANDIDATE_PAGE_COUNT_OFF = LAST_PAGE_COUNT_OFF + 4;
/** End of page meta. */
@@ -82,7 +82,7 @@ public class PageMetaIO extends PageIO {
setLastSuccessfulSnapshotId(pageAddr, 0);
setNextSnapshotTag(pageAddr, 1);
setLastSuccessfulSnapshotTag(pageAddr, 0);
- setLastPageCount(pageAddr, 0);
+ setLastAllocatedPageCount(pageAddr, 0);
setCandidatePageCount(pageAddr, 0);
}
@@ -179,24 +179,31 @@ public class PageMetaIO extends PageIO {
}
/**
- * @param pageAddr Page address.
- * @param pageCnt Last allocated index.
+ * Sets last allocated pages count, used to save and observe previous allocated count
+ *
+ * @param pageAddr Meta Page address.
+ * @param pageCnt Last allocated pages count to set
*/
- public void setLastPageCount(long pageAddr, int pageCnt) {
+ public void setLastAllocatedPageCount(final long pageAddr, final int pageCnt) {
PageUtils.putInt(pageAddr, LAST_PAGE_COUNT_OFF, pageCnt);
}
/**
- * @param buf Buffer.
+ * Gets last allocated pages count from given buffer
+ *
+ * @param buf Buffer to read data from.
*/
- public int getLastPageCount(@NotNull ByteBuffer buf) {
+ public int getLastAllocatedPageCount(@NotNull final ByteBuffer buf) {
return buf.getInt(LAST_PAGE_COUNT_OFF);
}
/**
- * @param pageAddr Page address.
+ * Gets last allocated pages count by provided address
+ *
+ * @param pageAddr Meta page address.
+ * @return Last allocated page count
*/
- public int getLastPageCount(long pageAddr) {
+ public int getLastAllocatedPageCount(final long pageAddr) {
return PageUtils.getInt(pageAddr, LAST_PAGE_COUNT_OFF);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/TrackingPageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/TrackingPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/TrackingPageIO.java
index 2263130..2051778 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/TrackingPageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/TrackingPageIO.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.tree.io;
import java.nio.ByteBuffer;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import org.jetbrains.annotations.Nullable;
/**
* We use dedicated page for tracking pages updates.
@@ -182,6 +183,7 @@ public class TrackingPageIO extends PageIO {
* @param buf Buffer.
* @param pageId Page id.
* @param curSnapshotTag Snapshot tag.
+ * @param lastSuccessfulSnapshotTag Last successful snapshot tag.
* @param pageSize Page size.
*/
public boolean wasChanged(ByteBuffer buf, long pageId, long curSnapshotTag, long lastSuccessfulSnapshotTag, int pageSize) {
@@ -265,10 +267,12 @@ public class TrackingPageIO extends PageIO {
* @param buf Buffer.
* @param start Start.
* @param curSnapshotTag Snapshot id.
+ * @param lastSuccessfulSnapshotTag Last successful snapshot tag.
* @param pageSize Page size.
- * @return set pageId if it was changed or next closest one, if there is no changed page null will be returned
+ * @return set pageId if it was changed or next closest one, if there is no changed page {@code null} will be returned
*/
- public Long findNextChangedPage(ByteBuffer buf, long start, long curSnapshotTag, long lastSuccessfulSnapshotTag, int pageSize) {
+ @Nullable public Long findNextChangedPage(ByteBuffer buf, long start, long curSnapshotTag,
+ long lastSuccessfulSnapshotTag, int pageSize) {
validateSnapshotId(buf, curSnapshotTag + 1, lastSuccessfulSnapshotTag, pageSize);
int cntOfPage = countOfPageToTrack(pageSize);
http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java
index a87525a..3316980 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.tree.util;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.PageSupport;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
@@ -440,7 +441,7 @@ public abstract class PageHandler<X, R> {
* @return {@code true} If we need to make a delta WAL record for the change in this page.
*/
public static boolean isWalDeltaRecordNeeded(
- PageMemory pageMem,
+ PageSupport pageMem,
int cacheId,
long pageId,
long page,