You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/03/19 09:35:25 UTC
[ignite] branch master updated: IGNITE-7792 Generalize logic for
checkpointing machinery - Fixes #7513.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2261487 IGNITE-7792 Generalize logic for checkpointing machinery - Fixes #7513.
2261487 is described below
commit 2261487c8504fd5d00d1f624c43b95c45f7c05b4
Author: zstan <st...@gmail.com>
AuthorDate: Thu Mar 19 11:15:33 2020 +0300
IGNITE-7792 Generalize logic for checkpointing machinery - Fixes #7513.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../cache/persistence/CheckpointProgress.java | 15 +
.../GridCacheDatabaseSharedManager.java | 416 ++++++++++-----------
.../cache/persistence/metastorage/MetaStorage.java | 2 +-
.../cache/persistence/pagemem/PageMemoryImpl.java | 12 +-
.../util/GridConcurrentMultiPairQueue.java | 202 ++++++++++
.../internal/util/GridReadOnlyArrayView.java | 80 ----
.../persistence/IgnitePdsCorruptedStoreTest.java | 3 +-
.../util/GridConcurrentMultiPairQueueTest.java | 155 ++++++++
.../ignite/internal/util/IgniteUtilsSelfTest.java | 5 +-
.../ignite/testsuites/IgniteUtilSelfTestSuite.java | 2 +
10 files changed, 587 insertions(+), 305 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointProgress.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointProgress.java
index 52db794b..17a0b58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointProgress.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointProgress.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.NotNull;
/**
* Represents information of a progress of a given checkpoint and
@@ -29,4 +30,18 @@ public interface CheckpointProgress {
/** */
public GridFutureAdapter futureFor(CheckpointState state);
+
+ /**
+ * Marks this checkpoint execution as failed.
+ *
+ * @param error Error that caused the failure.
+ */
+ public void fail(Throwable error);
+
+ /**
+ * Changes the checkpoint state if the new state comes later in the state order than the current one.
+ *
+ * @param newState New checkpoint state.
+ */
+ public void transitTo(@NotNull CheckpointState newState);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index ea7af29..8bb2bcc 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -48,10 +48,10 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
@@ -68,6 +68,7 @@ import java.util.function.ToLongFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+
import org.apache.ignite.DataRegionMetricsProvider;
import org.apache.ignite.DataStorageMetrics;
import org.apache.ignite.IgniteCheckedException;
@@ -155,8 +156,8 @@ import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridCountDownCallback;
+import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue;
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
-import org.apache.ignite.internal.util.GridReadOnlyArrayView;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.TimeBag;
@@ -539,6 +540,36 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
+ * @param allowToReplace Future that serves as the signal allowing the page replacer to replace pages from a checkpoint.
+ *
+ * @return Holder of FullPageIds obtained from each PageMemory, the overall number of dirty
+ * pages, and a flag indicating whether at least one user page became dirty since the last checkpoint.
+ */
+ private CheckpointPagesInfoHolder beginAllCheckpoints(IgniteInternalFuture<?> allowToReplace) {
+ Collection<Map.Entry<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>>> res =
+ new ArrayList<>(dataRegions().size());
+
+ int pagesNum = 0;
+
+ boolean hasUserDirtyPages = false;
+
+ for (DataRegion reg : dataRegions()) {
+ if (!reg.config().isPersistenceEnabled())
+ continue;
+
+ GridMultiCollectionWrapper<FullPageId> nextCpPages = ((PageMemoryEx)reg.pageMemory()).beginCheckpoint(allowToReplace);
+
+ pagesNum += nextCpPages.size();
+
+ res.add(new T2<>((PageMemoryEx)reg.pageMemory(), nextCpPages));
+ }
+
+ currCheckpointPagesCnt = pagesNum;
+
+ return new CheckpointPagesInfoHolder(res, pagesNum, hasUserDirtyPages);
+ }
+
+ /**
* Create metastorage data region configuration with enabled persistence by default.
*
* @param storageCfg Data storage configuration.
@@ -2640,6 +2671,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @return PageMemoryEx instance.
* @throws IgniteCheckedException if no DataRegion is configured for a name obtained from cache descriptor.
*/
+ // TODO IGNITE-12722: Get rid of GridCacheDatabaseSharedManager#getPageMemoryForCacheGroup functionality.
private PageMemoryEx getPageMemoryForCacheGroup(int grpId) throws IgniteCheckedException {
if (grpId == MetaStorage.METASTORAGE_CACHE_ID)
return (PageMemoryEx)dataRegion(METASTORE_DATA_REGION_NAME).pageMemory();
@@ -3056,27 +3088,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
Collection<DataRegion> regions = dataRegions();
- Collection<GridMultiCollectionWrapper<FullPageId>> res = new ArrayList(regions.size());
-
- int pagesNum = 0;
-
- GridFinishedFuture finishedFuture = new GridFinishedFuture();
-
- // Collect collection of dirty pages from all regions.
- for (DataRegion memPlc : regions) {
- if (memPlc.config().isPersistenceEnabled()){
- GridMultiCollectionWrapper<FullPageId> nextCpPagesCol =
- ((PageMemoryEx)memPlc.pageMemory()).beginCheckpoint(finishedFuture);
-
- pagesNum += nextCpPagesCol.size();
-
- res.add(nextCpPagesCol);
- }
- }
+ CheckpointPagesInfoHolder cpPagesHolder = beginAllCheckpoints(new GridFinishedFuture<>());
// Sort and split all dirty pages set to several stripes.
- GridMultiCollectionWrapper<FullPageId> pages = splitAndSortCpPagesIfNeeded(
- new IgniteBiTuple<>(res, pagesNum), exec.stripesCount());
+ GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> pages =
+ splitAndSortCpPagesIfNeeded(cpPagesHolder);
// Identity stores set for future fsync.
Collection<PageStore> updStores = new GridConcurrentHashSet<>();
@@ -3086,13 +3102,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
// Shared refernce for tracking exception during write pages.
AtomicReference<IgniteCheckedException> writePagesError = new AtomicReference<>();
- for (int i = 0; i < pages.collectionsSize(); i++) {
- // Calculate stripe index.
- int stripeIdx = i % exec.stripesCount();
-
- // Inner collection index.
- int innerIdx = i;
-
+ for (int stripeIdx = 0; stripeIdx < exec.stripesCount(); stripeIdx++) {
exec.execute(stripeIdx, () -> {
PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> {
assert tag != PageMemoryImpl.TRY_AGAIN_TAG : "Lock is held by other thread for page " + fullPageId;
@@ -3112,34 +3122,34 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
writePageBuf.order(ByteOrder.nativeOrder());
- Collection<FullPageId> pages0 = pages.innerCollection(innerIdx);
+ GridConcurrentMultiPairQueue.Result<PageMemoryEx, FullPageId> res =
+ new GridConcurrentMultiPairQueue.Result<>();
- FullPageId fullPageId = null;
+ int pagesWritten = 0;
try {
- for (FullPageId fullId : pages0) {
+ while (pages.next(res)) {
// Fail-fast break if some exception occurred.
if (writePagesError.get() != null)
break;
- // Save pageId to local variable for future using if exception occurred.
- fullPageId = fullId;
-
- PageMemoryEx pageMem = getPageMemoryForCacheGroup(fullId.groupId());
+ PageMemoryEx pageMem = res.getKey();
// Write page content to page store via pageStoreWriter.
// Tracker is null, because no need to track checkpoint metrics on recovery.
- pageMem.checkpointWritePage(fullId, writePageBuf, pageStoreWriter, null);
- }
+ pageMem.checkpointWritePage(res.getValue(), writePageBuf, pageStoreWriter, null);
- // Add number of handled pages.
- cpPagesCnt.addAndGet(pages0.size());
+ // Add number of handled pages.
+ pagesWritten++;
+ }
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to write page to pageStore, pageId=" + fullPageId);
+ U.error(log, "Failed to write page to pageStore: " + res);
writePagesError.compareAndSet(null, e);
}
+
+ cpPagesCnt.addAndGet(pagesWritten);
});
}
@@ -3769,26 +3779,21 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
ConcurrentLinkedHashMap<PageStore, LongAdder> updStores = new ConcurrentLinkedHashMap<>();
CountDownFuture doneWriteFut = new CountDownFuture(
- asyncRunner == null ? 1 : chp.cpPages.collectionsSize());
+ asyncRunner == null ? 1 : persistenceCfg.getCheckpointThreads());
tracker.onPagesWriteStart();
- final int totalPagesToWriteCnt = chp.cpPages.size();
+ final int totalPagesToWriteCnt = chp.pagesSize;
if (asyncRunner != null) {
- for (int i = 0; i < chp.cpPages.collectionsSize(); i++) {
+ for (int i = 0; i < persistenceCfg.getCheckpointThreads(); i++) {
Runnable write = new WriteCheckpointPages(
tracker,
- chp.cpPages.innerCollection(i),
+ chp.cpPages,
updStores,
doneWriteFut,
totalPagesToWriteCnt,
- new Runnable() {
- @Override public void run() {
- updateHeartbeat();
- }
- },
- asyncRunner
+ () -> updateHeartbeat()
);
try {
@@ -3812,12 +3817,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
updStores,
doneWriteFut,
totalPagesToWriteCnt,
- new Runnable() {
- @Override public void run() {
- updateHeartbeat();
- }
- },
- null);
+ () -> updateHeartbeat());
write.run();
}
@@ -4118,9 +4118,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
IgniteFuture snapFut = null;
- IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple;
+ CheckpointPagesInfoHolder cpPagesHolder;
+
+ int dirtyPagesCount;
- boolean hasPages, hasPartitionsToDestroy;
+ boolean hasUserPages, hasPartitionsToDestroy;
DbCheckpointContextImpl ctx0 = new DbCheckpointContextImpl(curr, new PartitionAllocationMap());
@@ -4161,15 +4163,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
fillCacheGroupState(cpRec);
//There are allowable to replace pages only after checkpoint entry was stored to disk.
- cpPagesTuple = beginAllCheckpoints(curr.futureFor(MARKER_STORED_TO_DISK));
+ cpPagesHolder = beginAllCheckpoints(curr.futureFor(MARKER_STORED_TO_DISK));
+
+ dirtyPagesCount = cpPagesHolder.pagesNum();
- hasPages = hasPageForWrite(cpPagesTuple.get1());
+ hasUserPages = !cpPagesHolder.onlySystemPages();
hasPartitionsToDestroy = !curr.destroyQueue.pendingReqs.isEmpty();
WALPointer cpPtr = null;
- if (hasPages || curr.nextSnapshot || hasPartitionsToDestroy) {
+ if (dirtyPagesCount > 0 || curr.nextSnapshot || hasPartitionsToDestroy) {
// No page updates for this checkpoint are allowed from now on.
cpPtr = cctx.wal().log(cpRec);
@@ -4177,7 +4181,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
cpPtr = CheckpointStatus.NULL_PTR;
}
- if (hasPages || hasPartitionsToDestroy) {
+ if (dirtyPagesCount > 0 || hasPartitionsToDestroy) {
cp = prepareCheckpointEntry(
tmpWriteBuf,
cpTs,
@@ -4195,7 +4199,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
tracker.onLockRelease();
}
- DbCheckpointListener.Context ctx = createOnCheckpointBeginContext(ctx0, hasPages);
+ DbCheckpointListener.Context ctx = createOnCheckpointBeginContext(ctx0, hasUserPages);
curr.transitTo(LOCK_RELEASED);
@@ -4212,7 +4216,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
}
- if (hasPages || hasPartitionsToDestroy) {
+ if (dirtyPagesCount > 0 || hasPartitionsToDestroy) {
assert cp != null;
assert cp.checkpointMark() != null;
@@ -4229,31 +4233,32 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
tracker.onSplitAndSortCpPagesStart();
- GridMultiCollectionWrapper<FullPageId> cpPages = splitAndSortCpPagesIfNeeded(
- cpPagesTuple, persistenceCfg.getCheckpointThreads());
+ GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages =
+ splitAndSortCpPagesIfNeeded(cpPagesHolder);
tracker.onSplitAndSortCpPagesEnd();
if (printCheckpointStats && log.isInfoEnabled()) {
long possibleJvmPauseDur = possibleLongJvmPauseDuration(tracker);
- log.info(
- String.format(
- CHECKPOINT_STARTED_LOG_FORMAT,
- cpRec.checkpointId(),
- cp.checkpointMark(),
- tracker.beforeLockDuration(),
- tracker.lockWaitDuration(),
- tracker.listenersExecuteDuration(),
- tracker.lockHoldDuration(),
- tracker.walCpRecordFsyncDuration(),
- tracker.writeCheckpointEntryDuration(),
- tracker.splitAndSortCpPagesDuration(),
- possibleJvmPauseDur > 0 ? "possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms," : "",
- cpPages.size(),
- curr.reason
- )
- );
+ if (log.isInfoEnabled())
+ log.info(
+ String.format(
+ CHECKPOINT_STARTED_LOG_FORMAT,
+ cpRec.checkpointId(),
+ cp.checkpointMark(),
+ tracker.beforeLockDuration(),
+ tracker.lockWaitDuration(),
+ tracker.listenersExecuteDuration(),
+ tracker.lockHoldDuration(),
+ tracker.walCpRecordFsyncDuration(),
+ tracker.writeCheckpointEntryDuration(),
+ tracker.splitAndSortCpPagesDuration(),
+ possibleJvmPauseDur > 0 ? "possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms," : "",
+ dirtyPagesCount,
+ curr.reason
+ )
+ );
}
return new Checkpoint(cp, cpPages, curr);
@@ -4274,7 +4279,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
curr.reason));
}
- return new Checkpoint(null, new GridMultiCollectionWrapper<>(new Collection[0]), curr);
+ return new Checkpoint(null, GridConcurrentMultiPairQueue.EMPTY, curr);
}
}
@@ -4456,53 +4461,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
- * Check that at least one collection is not empty.
- *
- * @param cpPagesCollWrapper Collection of {@link GridMultiCollectionWrapper} checkpoint pages.
- */
- private boolean hasPageForWrite(Collection<GridMultiCollectionWrapper<FullPageId>> cpPagesCollWrapper) {
- boolean hasPages = false;
-
- for (Collection c : cpPagesCollWrapper)
- if (!c.isEmpty()) {
- hasPages = true;
-
- break;
- }
-
- return hasPages;
- }
-
- /**
- * @return tuple with collections of FullPageIds obtained from each PageMemory, overall number of dirty
- * pages, and flag defines at least one user page became a dirty since last checkpoint.
- * @param allowToReplace The sign which allows to replace pages from a checkpoint by page replacer.
- */
- private IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> beginAllCheckpoints(
- IgniteInternalFuture allowToReplace
- ) {
- Collection<GridMultiCollectionWrapper<FullPageId>> res = new ArrayList(dataRegions().size());
-
- int pagesNum = 0;
-
- for (DataRegion memPlc : dataRegions()) {
- if (!memPlc.config().isPersistenceEnabled())
- continue;
-
- GridMultiCollectionWrapper<FullPageId> nextCpPagesCol = ((PageMemoryEx)memPlc.pageMemory())
- .beginCheckpoint(allowToReplace);
-
- pagesNum += nextCpPagesCol.size();
-
- res.add(nextCpPagesCol);
- }
-
- currCheckpointPagesCnt = pagesNum;
-
- return new IgniteBiTuple<>(res, pagesNum);
- }
-
- /**
* @param chp Checkpoint snapshot.
*/
private void markCheckpointEnd(Checkpoint chp) throws IgniteCheckedException {
@@ -4650,68 +4608,60 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
- * Reorders list of checkpoint pages and splits them into needed number of sublists according to
+ * Reorders list of checkpoint pages and splits them into appropriate number of sublists according to
* {@link DataStorageConfiguration#getCheckpointThreads()} and
* {@link DataStorageConfiguration#getCheckpointWriteOrder()}.
*
- * @param cpPagesTuple Checkpoint pages tuple.
- * @param threads Checkpoint runner threads.
+ * @param cpPages Checkpoint pages with overall count and user pages info.
*/
- private GridMultiCollectionWrapper<FullPageId> splitAndSortCpPagesIfNeeded(
- IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple,
- int threads
+ private GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> splitAndSortCpPagesIfNeeded(
+ CheckpointPagesInfoHolder cpPages
) throws IgniteCheckedException {
- FullPageId[] pagesArr = new FullPageId[cpPagesTuple.get2()];
+ Set<T2<PageMemoryEx, FullPageId[]>> cpPagesPerRegion = new HashSet<>();
int realPagesArrSize = 0;
- for (GridMultiCollectionWrapper<FullPageId> colWrapper : cpPagesTuple.get1()) {
- for (int i = 0; i < colWrapper.collectionsSize(); i++)
- for (FullPageId page : colWrapper.innerCollection(i)) {
- if (realPagesArrSize == pagesArr.length)
- throw new AssertionError("Incorrect estimated dirty pages number: " + pagesArr.length);
-
- pagesArr[realPagesArrSize++] = page;
- }
- }
+ int totalPagesCnt = cpPages.pagesNum();
- FullPageId fakeMaxFullPageId = new FullPageId(Long.MAX_VALUE, Integer.MAX_VALUE);
+ for (Map.Entry<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>> regPages : cpPages.cpPages()) {
+ FullPageId[] pages = new FullPageId[regPages.getValue().size()];
- // Some pages may have been replaced, need to fill end of array with fake ones to prevent NPE during sort.
- for (int i = realPagesArrSize; i < pagesArr.length; i++)
- pagesArr[i] = fakeMaxFullPageId;
+ int pagePos = 0;
- if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) {
- Comparator<FullPageId> cmp = new Comparator<FullPageId>() {
- @Override public int compare(FullPageId o1, FullPageId o2) {
- int cmp = Long.compare(o1.groupId(), o2.groupId());
- if (cmp != 0)
- return cmp;
+ for (int i = 0; i < regPages.getValue().collectionsSize(); i++) {
+ for (FullPageId page : regPages.getValue().innerCollection(i)) {
+ if (realPagesArrSize++ == totalPagesCnt)
+ throw new AssertionError("Incorrect estimated dirty pages number: " + totalPagesCnt);
- return Long.compare(o1.effectivePageId(), o2.effectivePageId());
+ pages[pagePos++] = page;
}
- };
+ }
- if (pagesArr.length >= parallelSortThreshold)
- parallelSortInIsolatedPool(pagesArr, cmp);
+ // Some pages may have been already replaced.
+ if (pagePos != pages.length)
+ cpPagesPerRegion.add(new T2<>(regPages.getKey(), Arrays.copyOf(pages, pagePos)));
else
- Arrays.sort(pagesArr, cmp);
+ cpPagesPerRegion.add(new T2<>(regPages.getKey(), pages));
}
- int pagesSubLists = threads == 1 ? 1 : threads * 4;
- // Splitting pages to (threads * 4) subtasks. If any thread will be faster, it will help slower threads.
-
- Collection[] pagesSubListArr = new Collection[pagesSubLists];
+ if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) {
+ Comparator<FullPageId> cmp = Comparator.comparingInt(FullPageId::groupId)
+ .thenComparingLong(FullPageId::effectivePageId);
- for (int i = 0; i < pagesSubLists; i++) {
- int from = (int)((long)realPagesArrSize * i / pagesSubLists);
+ ForkJoinPool pool = null;
- int to = (int)((long)realPagesArrSize * (i + 1) / pagesSubLists);
+ for (T2<PageMemoryEx, FullPageId[]> pagesPerReg : cpPagesPerRegion) {
+ if (pagesPerReg.getValue().length >= parallelSortThreshold)
+ pool = parallelSortInIsolatedPool(pagesPerReg.get2(), cmp, pool);
+ else
+ Arrays.sort(pagesPerReg.get2(), cmp);
+ }
- pagesSubListArr[i] = new GridReadOnlyArrayView(pagesArr, from, to);
+ if (pool != null)
+ pool.shutdown();
}
- return new GridMultiCollectionWrapper<FullPageId>(pagesSubListArr);
+ return new GridConcurrentMultiPairQueue<>(cpPagesPerRegion);
}
/**
@@ -4719,10 +4669,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
*
* @param pagesArr Pages array.
* @param cmp Cmp.
+ *
+ * @return ForkJoinPool instance, check {@link ForkJoinTask#fork()} implementation.
*/
- private static void parallelSortInIsolatedPool(
+ private static ForkJoinPool parallelSortInIsolatedPool(
FullPageId[] pagesArr,
- Comparator<FullPageId> cmp
+ Comparator<FullPageId> cmp,
+ ForkJoinPool pool
) throws IgniteCheckedException {
ForkJoinPool.ForkJoinWorkerThreadFactory factory = new ForkJoinPool.ForkJoinWorkerThreadFactory() {
@Override public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
@@ -4734,9 +4687,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
};
- ForkJoinPool forkJoinPool = new ForkJoinPool(PARALLEL_SORT_THREADS + 1, factory, null, false);
+ ForkJoinPool execPool = pool == null ?
+ new ForkJoinPool(PARALLEL_SORT_THREADS + 1, factory, null, false) : pool;
- ForkJoinTask sortTask = forkJoinPool.submit(() -> Arrays.parallelSort(pagesArr, cmp));
+ Future<?> sortTask = execPool.submit(() -> Arrays.parallelSort(pagesArr, cmp));
try {
sortTask.get();
@@ -4748,7 +4702,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
throw new IgniteCheckedException("Failed to perform pages array parallel sort", e.getCause());
}
- forkJoinPool.shutdown();
+ return execPool;
}
/** Pages write task */
@@ -4757,7 +4711,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private final CheckpointMetricsTracker tracker;
/** Collection of page IDs to write under this task. Overall pages to write may be greater than this collection */
- private final Collection<FullPageId> writePageIds;
+ private final GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> writePageIds;
/** */
private final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores;
@@ -4771,9 +4725,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** */
private final Runnable beforePageWrite;
- /** If any pages were skipped, new task with remaining pages will be submitted here. */
- private final ExecutorService retryWriteExecutor;
-
/**
* Creates task for write pages
*
@@ -4783,16 +4734,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @param doneFut
* @param totalPagesToWrite total pages to be written under this checkpoint
* @param beforePageWrite Action to be performed before every page write.
- * @param retryWriteExecutor Retry write executor.
*/
private WriteCheckpointPages(
final CheckpointMetricsTracker tracker,
- final Collection<FullPageId> writePageIds,
+ final GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> writePageIds,
final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores,
final CountDownFuture doneFut,
final int totalPagesToWrite,
- final Runnable beforePageWrite,
- final ExecutorService retryWriteExecutor
+ final Runnable beforePageWrite
) {
this.tracker = tracker;
this.writePageIds = writePageIds;
@@ -4800,22 +4749,21 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
this.doneFut = doneFut;
this.totalPagesToWrite = totalPagesToWrite;
this.beforePageWrite = beforePageWrite;
- this.retryWriteExecutor = retryWriteExecutor;
}
/** {@inheritDoc} */
@Override public void run() {
snapshotMgr.beforeCheckpointPageWritten();
- Collection<FullPageId> writePageIds = this.writePageIds;
+ GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> writePageIds = this.writePageIds;
try {
- List<FullPageId> pagesToRetry = writePages(writePageIds);
+ GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> pagesToRetry = writePages(writePageIds);
if (pagesToRetry.isEmpty())
doneFut.onDone();
else {
- LT.warn(log, pagesToRetry.size() + " checkpoint pages were not written yet due to unsuccessful " +
+ LT.warn(log, pagesToRetry.initialSize() + " checkpoint pages were not written yet due to unsuccessful " +
"page write lock acquisition and will be retried");
while (!pagesToRetry.isEmpty())
@@ -4833,8 +4781,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @param writePageIds Collections of pages to write.
* @return pagesToRetry Pages which should be retried.
*/
- private List<FullPageId> writePages(Collection<FullPageId> writePageIds) throws IgniteCheckedException {
- List<FullPageId> pagesToRetry = new ArrayList<>();
+ private GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> writePages(
+ GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> writePageIds
+ ) throws IgniteCheckedException {
+ Map<PageMemoryEx, List<FullPageId>> pagesToRetry = new HashMap<>();
CheckpointMetricsTracker tracker = persStoreMetrics.metricsEnabled() ? this.tracker : null;
@@ -4844,31 +4794,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
boolean throttlingEnabled = resolveThrottlingPolicy() != PageMemoryImpl.ThrottlingPolicy.DISABLED;
- for (FullPageId fullId : writePageIds) {
+ GridConcurrentMultiPairQueue.Result<PageMemoryEx, FullPageId> res =
+ new GridConcurrentMultiPairQueue.Result<>();
+
+ while (writePageIds.next(res)) {
if (checkpointer.shutdownNow)
break;
beforePageWrite.run();
- int grpId = fullId.groupId();
-
- PageMemoryEx pageMem;
-
- // TODO IGNITE-7792 add generic mapping.
- if (grpId == MetaStorage.METASTORAGE_CACHE_ID)
- pageMem = (PageMemoryEx)metaStorage.pageMemory();
- else if (grpId == TxLog.TX_LOG_CACHE_ID)
- pageMem = (PageMemoryEx)dataRegion(TxLog.TX_LOG_CACHE_NAME).pageMemory();
- else {
- CacheGroupContext grp = context().cache().cacheGroup(grpId);
+ FullPageId fullId = res.getValue();
- DataRegion region = grp != null ? grp.dataRegion() : null;
-
- if (region == null || !region.config().isPersistenceEnabled())
- continue;
-
- pageMem = (PageMemoryEx)region.pageMemory();
- }
+ PageMemoryEx pageMem = res.getKey();
snapshotMgr.beforePageWrite(fullId);
@@ -4892,7 +4829,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
}
- return pagesToRetry;
+ return pagesToRetry.isEmpty() ?
+ GridConcurrentMultiPairQueue.EMPTY :
+ new GridConcurrentMultiPairQueue<>(pagesToRetry);
}
/**
@@ -4901,12 +4840,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @param pagesToRetry List pages for retry.
* @return Checkpoint page write context.
*/
- private PageStoreWriter createPageStoreWriter(List<FullPageId> pagesToRetry) {
+ private PageStoreWriter createPageStoreWriter(Map<PageMemoryEx, List<FullPageId>> pagesToRetry) {
return new PageStoreWriter() {
/** {@inheritDoc} */
@Override public void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteCheckedException {
if (tag == PageMemoryImpl.TRY_AGAIN_TAG) {
- pagesToRetry.add(fullPageId);
+ PageMemoryEx pageMem = getPageMemoryForCacheGroup(fullPageId.groupId());
+
+ pagesToRetry.computeIfAbsent(pageMem, k -> new ArrayList<>()).add(fullPageId);
return;
}
@@ -4942,10 +4883,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
@Nullable private final CheckpointEntry cpEntry;
/** Checkpoint pages. */
- private final GridMultiCollectionWrapper<FullPageId> cpPages;
+ private final GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages;
/** */
- private final CheckpointProgressImpl progress;
+ private final CheckpointProgress progress;
/** Number of deleted WAL files. */
private int walFilesDeleted;
@@ -4963,14 +4904,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
*/
private Checkpoint(
@Nullable CheckpointEntry cpEntry,
- @NotNull GridMultiCollectionWrapper<FullPageId> cpPages,
- CheckpointProgressImpl progress
+ @NotNull GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages,
+ CheckpointProgress progress
) {
this.cpEntry = cpEntry;
this.cpPages = cpPages;
this.progress = progress;
- pagesSize = cpPages.size();
+ pagesSize = cpPages.initialSize();
}
/**
@@ -5118,7 +5059,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
*
* @param error Causal error of fail.
*/
- public void fail(Throwable error) {
+ @Override public void fail(Throwable error) {
failCause = error;
transitTo(FINISHED);
@@ -5129,7 +5070,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
*
* @param newState New checkpoint state.
*/
- public void transitTo(@NotNull CheckpointState newState) {
+ @Override public void transitTo(@NotNull CheckpointState newState) {
CheckpointState state = this.state.get();
if (state.ordinal() < newState.ordinal()) {
@@ -5895,4 +5836,41 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
super(msg);
}
}
+
+ /** Current checkpoint pages information. */
+ private static class CheckpointPagesInfoHolder {
+ /** If {@code true} there are user pages in checkpoint. */
+ private final boolean hasUserDirtyPages;
+
+ /** Total pages count in cp. */
+ private final int pagesNum;
+
+ /** Collection of pages per PageMemory distribution. */
+ private final Collection<Map.Entry<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>>> cpPages;
+
+ /** */
+ private CheckpointPagesInfoHolder(
+ Collection<Map.Entry<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>>> pages,
+ int num,
+ boolean hasUserPages) {
+ cpPages = pages;
+ pagesNum = num;
+ hasUserDirtyPages = hasUserPages;
+ }
+
+ /** @return {@code true} if there are no user dirty pages in the checkpoint (only system pages). */
+ private boolean onlySystemPages() {
+ return !hasUserDirtyPages;
+ }
+
+ /** Total pages count in cp. */
+ private int pagesNum() {
+ return pagesNum;
+ }
+
+ /** Collection of pages per PageMemory distribution. */
+ private Collection<Map.Entry<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>>> cpPages() {
+ return cpPages;
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index c9e9ec4..caccbbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -655,7 +655,7 @@ public class MetaStorage implements DbCheckpointListener, ReadWriteMetastorage {
* @throws IgniteCheckedException If failed.
*/
private void saveStoreMetadata() throws IgniteCheckedException {
- PageMemoryEx pageMem = (PageMemoryEx) pageMemory();
+ PageMemoryEx pageMem = (PageMemoryEx)pageMemory();
long partMetaId = pageMem.partitionMetaPageId(METASTORAGE_CACHE_ID, partId);
long partMetaPage = pageMem.acquirePage(METASTORAGE_CACHE_ID, partMetaId);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index d2a8562..32c235d 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -1133,8 +1133,7 @@ public class PageMemoryImpl implements PageMemoryEx {
seg.checkpointPages = new CheckpointPages(dirtyPages, allowToReplace);
- seg.dirtyPages = new GridConcurrentHashSet<>();
- seg.dirtyPagesCntr.set(0);
+ seg.resetDirtyPages();
}
safeToUpdate.set(true);
@@ -2101,6 +2100,15 @@ public class PageMemoryImpl implements PageMemoryEx {
}
/**
+ * Clear dirty pages collection and reset counter.
+ */
+ private void resetDirtyPages() {
+ dirtyPages = new GridConcurrentHashSet<>();
+
+ dirtyPagesCntr.set(0);
+ }
+
+ /**
* Prepares a page removal for page replacement, if needed.
*
* @param fullPageId Candidate page full ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueue.java
new file mode 100644
index 0000000..7ea9e79
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueue.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Concurrent queue that wraps a collection of {@code Pair<K, V[]>}.
+ * The only guarantee {@link #next} provides is that the values of each key's array are emptied sequentially.
+ * i.e. input like: <br>
+ * p1 = new Pair<1, [1, 3, 5, 7]> <br>
+ * p2 = new Pair<2, [2, 3]> <br>
+ * p3 = new Pair<3, [200, 100]> <br>
+ * and a further sequence of {@link #next} calls may produce output like: <br>
+ * [3, 200], [3, 100], [1, 1], [1, 3], [1, 5], [1, 7], [2, 2], [2, 3]
+ *
+ * @param <K> The type of key in input pair collection.
+ * @param <V> The type of value array.
+ */
+public class GridConcurrentMultiPairQueue<K, V> {
+ /** */
+ public static final GridConcurrentMultiPairQueue EMPTY =
+ new GridConcurrentMultiPairQueue<>(Collections.emptyMap());
+
+ /** Inner holder. */
+ private final V[][] vals;
+
+ /** Absolute position of the last element of every array (cumulative lengths minus one). */
+ private final int[] lenSeq;
+
+ /** Current absolute position. */
+ private final AtomicInteger pos = new AtomicInteger();
+
+ /** Precalculated max position. */
+ private final int maxPos;
+
+ /** Keys array. */
+ private final K[] keysArr;
+
+ /** */
+ public GridConcurrentMultiPairQueue(Map<K, ? extends Collection<V>> items) {
+ int pairCnt = (int)items.entrySet().stream().map(Map.Entry::getValue).filter(k -> k.size() > 0).count();
+
+ vals = (V[][])new Object[pairCnt][];
+
+ keysArr = (K[])new Object[pairCnt];
+
+ lenSeq = new int[pairCnt];
+
+ int keyPos = 0;
+
+ int size = -1;
+
+ for (Map.Entry<K, ? extends Collection<V>> p : items.entrySet()) {
+ if (p.getValue().isEmpty())
+ continue;
+
+ keysArr[keyPos] = p.getKey();
+
+ lenSeq[keyPos] = size += p.getValue().size();
+
+ vals[keyPos++] = (V[])p.getValue().toArray();
+ }
+
+ maxPos = size + 1;
+ }
+
+ /** */
+ public GridConcurrentMultiPairQueue(Collection<T2<K, V[]>> items) {
+ int pairCnt = (int)items.stream().map(Map.Entry::getValue).filter(k -> k.length > 0).count();
+
+ vals = (V[][])new Object[pairCnt][];
+
+ keysArr = (K[])new Object[pairCnt];
+
+ lenSeq = new int[pairCnt];
+
+ int keyPos = 0;
+
+ int size = -1;
+
+ for (Map.Entry<K, V[]> p : items) {
+ if (p.getValue().length == 0)
+ continue;
+
+ keysArr[keyPos] = p.getKey();
+
+ lenSeq[keyPos] = size += p.getValue().length;
+
+ vals[keyPos++] = p.getValue();
+ }
+
+ maxPos = size + 1;
+ }
+
+ /**
+ * Retrieves and removes the head of this queue, storing its key and value in {@code res},
+ * or returns {@code false} (and clears {@code res}) if this queue is empty.
+ *
+ * @return {@code true} if {@code res} was filled with the next pair, or {@code false} if this queue is empty.
+ */
+ public boolean next(Result<K, V> res) {
+ int absPos = pos.getAndIncrement();
+
+ if (absPos >= maxPos) {
+ res.set(null, null, 0);
+
+ return false;
+ }
+
+ int segment = res.getSegment();
+
+ if (absPos > lenSeq[segment]) {
+ segment = Arrays.binarySearch(lenSeq, segment, lenSeq.length - 1, absPos);
+
+ segment = segment < 0 ? -segment - 1 : segment;
+ }
+
+ int relPos = segment == 0 ? absPos : (absPos - lenSeq[segment - 1] - 1);
+
+ K key = keysArr[segment];
+
+ res.set(key, vals[segment][relPos], segment);
+
+ return true;
+ }
+
+ /**
+ * @return {@code true} if empty.
+ */
+ public boolean isEmpty() {
+ return pos.get() >= maxPos;
+ }
+
+ /**
+ * @return Total number of values the queue was created with (does not change as values are consumed).
+ */
+ public int initialSize() {
+ return maxPos;
+ }
+
+ /** State holder. */
+ public static class Result<K, V> {
+ /** Current segment. */
+ private int segment;
+
+ /** Key holder. */
+ private K key;
+
+ /** Value holder. */
+ private V val;
+
+ /** Current state setter. */
+ public void set(K k, V v, int seg) {
+ key = k;
+ val = v;
+ segment = seg;
+ }
+
+ /** Current segment. */
+ private int getSegment() {
+ return segment;
+ }
+
+ /** Current key. */
+ public K getKey() {
+ return key;
+ }
+
+ /** Current value. */
+ public V getValue() {
+ return val;
+ }
+
+ /** */
+ @Override public String toString() {
+ return S.toString(Result.class, this);
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridReadOnlyArrayView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridReadOnlyArrayView.java
deleted file mode 100644
index 14ae0d5..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridReadOnlyArrayView.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.ignite.internal.util;
-
-import java.util.AbstractCollection;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import org.apache.ignite.internal.util.typedef.internal.A;
-
-/**
- * Provides read-only collection interface to objects subarray.
- */
-public class GridReadOnlyArrayView<T> extends AbstractCollection<T> {
- /** Array. */
- private final T[] arr;
-
- /** Array index view starts from (inclusive). */
- private final int from;
-
- /** Array index view ends with (exclusive). */
- private final int to;
-
- /**
- * @param arr Array.
- * @param from Array index view starts from (inclusive).
- * @param to Array index view ends with (exclusive).
- */
- public GridReadOnlyArrayView(T[] arr, int from, int to) {
- A.ensure(from <= to, "Lower bound is greater than upper bound [from=" + from + ", to=" + to + ']');
-
- this.arr = arr;
- this.from = from;
- this.to = to;
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return to - from;
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<T> iterator() {
- return new Itr();
- }
-
- /**
- * Iterator.
- */
- private class Itr implements Iterator<T> {
- /** Cursor index. */
- int cursor = from;
-
- /** {@inheritDoc} */
- @Override public boolean hasNext() {
- return cursor < to;
- }
-
- /** {@inheritDoc} */
- @Override public T next() {
- if (cursor >= to)
- throw new NoSuchElementException();
-
- return arr[cursor++];
- }
- }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
index 318cb61..691c35f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
@@ -245,7 +245,8 @@ public class IgnitePdsCorruptedStoreTest extends GridCommonAbstractTest {
MetaStorage metaStorage = ignite.context().cache().context().database().metaStorage();
- corruptTreeRoot(ignite, (PageMemoryEx)metaStorage.pageMemory(), METASTORAGE_CACHE_ID, PageIdAllocator.METASTORE_PARTITION);
+ corruptTreeRoot(ignite, (PageMemoryEx)metaStorage.pageMemory(), METASTORAGE_CACHE_ID,
+ PageIdAllocator.METASTORE_PARTITION);
stopGrid(0);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueueTest.java
new file mode 100644
index 0000000..743ed0b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueueTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * GridConcurrentMultiPairQueue test.
+ **/
+public class GridConcurrentMultiPairQueueTest extends GridCommonAbstractTest {
+ /** */
+ GridConcurrentMultiPairQueue<Integer, Integer> queue;
+
+ /** */
+ GridConcurrentMultiPairQueue<Integer, Integer> queue2;
+
+ /** */
+ Map<Integer, Collection<Integer>> mapForCheck;
+
+ /** */
+ Map<Integer, Collection<Integer>>mapForCheck2;
+
+ /** */
+ Integer[] arr2 = {2, 4};
+
+ /** */
+ Integer[] arr1 = {1, 3, 5, 7, 9, 11, 13, 15, 17, 19};
+
+ /** */
+ Integer[] arr4 = {};
+
+ /** */
+ Integer[] arr5 = {};
+
+ /** */
+ Integer[] arr3 = {100, 200, 300, 400, 500, 600, 600, 700};
+
+ /** */
+ Integer[] arr6 = {};
+
+ /** */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ Collection<T2<Integer, Integer[]>> keyWithArr = new HashSet<>();
+
+ mapForCheck = new ConcurrentHashMap<>();
+
+ mapForCheck2 = new ConcurrentHashMap<>();
+
+ keyWithArr.add(new T2<>(10, arr2));
+ keyWithArr.add(new T2<>(20, arr1));
+ keyWithArr.add(new T2<>(30, arr4));
+ keyWithArr.add(new T2<>(40, arr5));
+ keyWithArr.add(new T2<>(50, arr3));
+ keyWithArr.add(new T2<>(60, arr6));
+
+ mapForCheck.put(10, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr2))));
+ mapForCheck.put(20, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr1))));
+ mapForCheck.put(50, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr3))));
+
+ mapForCheck2.put(10, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr2))));
+ mapForCheck2.put(20, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr1))));
+ mapForCheck2.put(50, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr3))));
+
+ queue = new GridConcurrentMultiPairQueue<>(keyWithArr);
+
+ Map<Integer, Collection<Integer>> keyWithColl = new HashMap<>();
+
+ keyWithColl.put(10, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr2))));
+ keyWithColl.put(20, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr1))));
+ keyWithColl.put(30, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr4))));
+ keyWithColl.put(40, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr5))));
+ keyWithColl.put(50, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr3))));
+ keyWithColl.put(60, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr6))));
+
+ queue2 = new GridConcurrentMultiPairQueue<>(keyWithColl);
+ }
+
+ /** */
+ @Test
+ public void testGridConcurrentMultiPairQueueCorrectness() throws Exception {
+ GridTestUtils.runMultiThreaded(() -> {
+ GridConcurrentMultiPairQueue.Result<Integer, Integer> res =
+ new GridConcurrentMultiPairQueue.Result<>();
+
+ while (queue.next(res)) {
+ assertTrue(mapForCheck.containsKey(res.getKey()));
+
+ assertTrue(mapForCheck.get(res.getKey()).remove(res.getValue()));
+
+ Collection<Integer> coll = mapForCheck.get(res.getKey());
+
+ if (coll != null && coll.isEmpty())
+ mapForCheck.remove(res.getKey(), coll);
+ }
+ }, ThreadLocalRandom.current().nextInt(1, 20), "GridConcurrentMultiPairQueue arr test");
+
+ assertTrue(mapForCheck.isEmpty());
+
+ assertTrue(queue.isEmpty());
+
+ assertTrue(queue.initialSize() == arr1.length + arr2.length + arr3.length + arr4.length);
+
+ GridTestUtils.runMultiThreaded(() -> {
+ GridConcurrentMultiPairQueue.Result<Integer, Integer> res =
+ new GridConcurrentMultiPairQueue.Result<>();
+
+ while (queue2.next(res)) {
+ assertTrue(mapForCheck2.containsKey(res.getKey()));
+
+ assertTrue(mapForCheck2.get(res.getKey()).remove(res.getValue()));
+
+ Collection<Integer> coll = mapForCheck2.get(res.getKey());
+
+ if (coll != null && coll.isEmpty())
+ mapForCheck2.remove(res.getKey(), coll);
+ }
+ }, ThreadLocalRandom.current().nextInt(1, 20), "GridConcurrentMultiPairQueue coll test");
+
+ assertTrue(mapForCheck2.isEmpty());
+
+ assertTrue(queue2.isEmpty());
+
+ assertTrue(queue2.initialSize() == arr1.length + arr2.length + arr3.length + arr4.length);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 7168e0b..772d234 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -373,7 +374,7 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
ref = this;
- arr = new SelfReferencedJob[] {this, this};
+ arr = new SelfReferencedJob[]{this, this};
col = asList(this, this, this);
@@ -1197,7 +1198,7 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
if (Integer.valueOf(1).equals(i))
throw new IgniteCheckedException(expectedException);
- return null;
+ return null;
}
);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
index ba8f3fe..c6d3705 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtilsSelfTest;
import org.apache.ignite.internal.util.GridArraysSelfTest;
import org.apache.ignite.internal.util.GridCountDownCallbackTest;
import org.apache.ignite.internal.util.IgniteDevOnlyLogTest;
+import org.apache.ignite.internal.util.GridConcurrentMultiPairQueueTest;
import org.apache.ignite.internal.util.IgniteExceptionRegistrySelfTest;
import org.apache.ignite.internal.util.IgniteUtilsSelfTest;
import org.apache.ignite.internal.util.nio.GridNioDelimitedBufferSelfTest;
@@ -100,6 +101,7 @@ import org.junit.runners.Suite;
GridTopologyHeapSizeSelfTest.class,
GridTransientTest.class,
IgniteDevOnlyLogTest.class,
+ GridConcurrentMultiPairQueueTest.class,
// Sensitive toString.
IncludeSensitiveAtomicTest.class,