You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/08/08 13:10:19 UTC
[ignite] branch master updated: IGNITE-12048 Bugs & tests fixes -
Fixes #6756.
This is an automated email from the ASF dual-hosted git repository.
dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1f8cf04 IGNITE-12048 Bugs & tests fixes - Fixes #6756.
1f8cf04 is described below
commit 1f8cf042f67f523e23f795571f609a9c81726258
Author: Dmitriy Govorukhin <dm...@gmail.com>
AuthorDate: Thu Aug 8 16:09:41 2019 +0300
IGNITE-12048 Bugs & tests fixes - Fixes #6756.
Page replacement can reload invalid page during checkpoint
There is a race between writeCheckpointPages and page replacement process:
Checkpointer thread begins a checkpoint
Checkpointer thread calls getPageForCheckpoint(), which will copy page content and clear dirty flag
Page replacement tries to find a page for replacement and chooses this page, the page is thrown away
Before the page is written back to the store, the page is acquired again.
As a result, an older copy of the page is brought back to memory, which causes all kinds of corruption exceptions and assertions.
checkpointReadLock() may hang during node stop
I got this hang during one of PDS (Indexing) runs (thread-dump is attached).
The following code hangs:
checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut
.getUninterruptibly();
It looks like wakeupForCheckpoint can be called after the checkpointer is stopped, in which case cpBeginFut will never be completed.
Fixed ZookeeperDiscoveryCommunicationFailureTest.testCommunicationFailureResolve_CachesInfo1
Fixed *.testFailAfterStart
Reduce test execution time (scale factor for long-running tests)
---
.../internal/pagemem/wal/record/PageSnapshot.java | 5 -
.../dht/topology/GridDhtPartitionTopologyImpl.java | 23 +-
.../GridCacheDatabaseSharedManager.java | 96 ++++----
.../cache/persistence/PageStoreWriter.java | 40 ++++
...eWrite.java => DelayedDirtyPageStoreWrite.java} | 14 +-
.../pagemem/DelayedPageReplacementTracker.java | 19 +-
.../cache/persistence/pagemem/PageMemoryEx.java | 20 +-
.../cache/persistence/pagemem/PageMemoryImpl.java | 160 +++++++++----
.../persistence/pagemem/ReplacedPageWriter.java | 35 ---
.../cache/persistence/tree/io/PageIO.java | 39 ++--
modules/core/src/test/config/tests.properties | 4 +-
.../internal/GridTaskFailoverAffinityRunTest.java | 3 +-
.../ignite/internal/TaskNodeRestartTest.java | 3 +-
.../GridCacheAbstractNodeRestartSelfTest.java | 26 +--
.../dht/IgniteCachePutRetryAbstractSelfTest.java | 2 +-
.../IgnitePdsCacheRebalancingAbstractTest.java | 59 ++---
...itePdsCacheStartStopWithFreqCheckpointTest.java | 5 +-
.../IgnitePdsContinuousRestartTest.java | 27 +--
.../persistence/IgnitePdsCorruptedIndexTest.java | 16 +-
.../IgnitePdsExchangeDuringCheckpointTest.java | 7 +-
.../cache/persistence/IgnitePdsPageSizesTest.java | 2 +-
.../IgnitePdsRecoveryAfterFileCorruptionTest.java | 46 ++--
.../persistence/db/IgnitePdsPageEvictionTest.java | 16 +-
...gnitePdsRebalancingOnNotStableTopologyTest.java | 9 +-
.../db/IgnitePdsWholeClusterRestartTest.java | 11 +-
.../db/checkpoint/CheckpointFreeListTest.java | 11 +-
.../IgnitePdsCacheDestroyDuringCheckpointTest.java | 5 +-
.../db/file/IgnitePdsCacheIntegrationTest.java | 14 +-
...CheckpointSimulationWithRealCpDisabledTest.java | 15 +-
.../wal/IgniteNodeStoppedDuringDisableWALTest.java | 41 +++-
...eWalFlushMultiNodeFailoverAbstractSelfTest.java | 59 ++---
.../IgnitePageMemReplaceDelayedWriteUnitTest.java | 7 +-
.../persistence/pagemem/PageMemoryImplTest.java | 254 ++++++++++++++++++++-
.../database/IgniteDbPutGetAbstractTest.java | 31 +--
...SessionSetJobAttributeWaitListenerSelfTest.java | 3 +-
.../tcp/TcpDiscoveryMultiThreadedTest.java | 9 +-
.../testframework/junits/GridAbstractTest.java | 7 +-
...acheLockPartitionOnAffinityRunAbstractTest.java | 7 +-
.../persistence/db/IgniteTcBotInitNewPageTest.java | 18 +-
.../persistence/db/wal/IgniteWalRecoveryTest.java | 13 +-
...ZookeeperDiscoveryCommunicationFailureTest.java | 2 +-
...perDiscoveryTopologyChangeAndReconnectTest.java | 4 +-
42 files changed, 789 insertions(+), 398 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
index 6e3a9b0..1fb4ae6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.pagemem.wal.record;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.util.GridUnsafe;
@@ -148,10 +147,6 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware
+ "],\nsuper = ["
+ super.toString() + "]]";
}
- catch (IgniteCheckedException ignored) {
- return "Error during call 'toString' of PageSnapshot [fullPageId=" + fullPageId() +
- ", pageData = " + Arrays.toString(pageData()) + ", super=" + super.toString() + "]";
- }
finally {
GridUnsafe.cleanDirectBuffer(buf);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index b18ce66..3c9fa6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -2243,6 +2243,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
ctx.database().checkpointReadLock();
try {
+ Map<UUID, Set<Integer>> addToWaitGroups = new HashMap<>();
+
lock.writeLock().lock();
try {
@@ -2274,7 +2276,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
GridDhtPartitionState state = partMap.get(part);
- if (state == null || state != OWNING)
+ if (state != OWNING)
continue;
if (!newOwners.contains(remoteNodeId)) {
@@ -2294,9 +2296,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
UUID nodeId = entry.getKey();
Set<Integer> rebalancedParts = entry.getValue();
- // Add to wait groups to ensure late assignment switch after all partitions are rebalanced.
- for (Integer part : rebalancedParts)
- ctx.cache().context().affinity().addToWaitGroup(groupId(), part, nodeId, topologyVersionFuture().initialVersion());
+ addToWaitGroups.put(nodeId, new HashSet<>(rebalancedParts));
if (!rebalancedParts.isEmpty()) {
Set<Integer> historical = rebalancedParts.stream()
@@ -2315,9 +2315,22 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
node2part = new GridDhtPartitionFullMap(node2part, updateSeq.incrementAndGet());
- } finally {
+ }
+ finally {
lock.writeLock().unlock();
}
+
+ for (Map.Entry<UUID, Set<Integer>> entry : addToWaitGroups.entrySet()) {
+ // Add to wait groups to ensure late assignment switch after all partitions are rebalanced.
+ for (Integer part : entry.getValue()) {
+ ctx.cache().context().affinity().addToWaitGroup(
+ groupId(),
+ part,
+ entry.getKey(),
+ topologyVersionFuture().initialVersion()
+ );
+ }
+ }
}
finally {
ctx.database().checkpointReadUnlock();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index c11b67c..e18b4fb 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -205,7 +205,10 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_FILE_MATCHER;
+import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType;
+import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getVersion;
import static org.apache.ignite.internal.util.IgniteUtils.checkpointBufferSize;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
/**
*
@@ -941,7 +944,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
long time = System.currentTimeMillis();
- checkpointReadLock();
+ CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() + 1);
try {
for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
@@ -973,7 +976,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
throw e;
}
finally {
- checkpointReadUnlock();
+ CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() - 1);
}
}
@@ -3059,6 +3062,19 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
int innerIdx = i;
exec.execute(stripeIdx, () -> {
+ PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> {
+ assert tag != PageMemoryImpl.TRY_AGAIN_TAG : "Lock is held by other thread for page " + fullPageId;
+
+ int groupId = fullPageId.groupId();
+ long pageId = fullPageId.pageId();
+
+ // Write buf to page store.
+ PageStore store = storeMgr.writeInternal(groupId, pageId, buf, tag, true);
+
+ // Save store for future fsync.
+ updStores.add(store);
+ };
+
// Local buffer for write pages.
ByteBuffer writePageBuf = ByteBuffer.allocateDirect(pageSize());
@@ -3066,7 +3082,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
Collection<FullPageId> pages0 = pages.innerCollection(innerIdx);
- FullPageId pageId = null;
+ FullPageId fullPageId = null;
try {
for (FullPageId fullId : pages0) {
@@ -3074,38 +3090,21 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (writePagesError.get() != null)
break;
- writePageBuf.rewind();
+ // Save pageId to local variable for future using if exception occurred.
+ fullPageId = fullId;
PageMemoryEx pageMem = getPageMemoryForCacheGroup(fullId.groupId());
- // Write page content to writePageBuf.
- Integer tag = pageMem.getForCheckpoint(fullId, writePageBuf, null);
-
- assert tag == null || tag != PageMemoryImpl.TRY_AGAIN_TAG :
- "Lock is held by other thread for page " + fullId;
-
- if (tag != null) {
- writePageBuf.rewind();
-
- // Save pageId to local variable for future using if exception occurred.
- pageId = fullId;
-
- // Write writePageBuf to page store.
- PageStore store = storeMgr.writeInternal(
- fullId.groupId(), fullId.pageId(), writePageBuf, tag, true);
-
- writePageBuf.rewind();
-
- // Save store for future fsync.
- updStores.add(store);
- }
+ // Write page content to page store via pageStoreWriter.
+ // Tracker is null, because no need to track checkpoint metrics on recovery.
+ pageMem.checkpointWritePage(fullId, writePageBuf, pageStoreWriter, null);
}
// Add number of handled pages.
cpPagesCnt.addAndGet(pages0.size());
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to write page to pageStore, pageId=" + pageId);
+ U.error(log, "Failed to write page to pageStore, pageId=" + fullPageId);
writePagesError.compareAndSet(null, e);
}
@@ -4765,10 +4764,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @return pagesToRetry Pages which should be retried.
*/
private List<FullPageId> writePages(Collection<FullPageId> writePageIds) throws IgniteCheckedException {
- ByteBuffer tmpWriteBuf = threadBuf.get();
-
List<FullPageId> pagesToRetry = new ArrayList<>();
+ CheckpointMetricsTracker tracker = persStoreMetrics.metricsEnabled() ? this.tracker : null;
+
+ PageStoreWriter pageStoreWriter = createPageStoreWriter(pagesToRetry);
+
+ ByteBuffer tmpWriteBuf = threadBuf.get();
+
for (FullPageId fullId : writePageIds) {
if (checkpointer.shutdownNow)
break;
@@ -4799,23 +4802,36 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
pageMem = (PageMemoryEx)region.pageMemory();
}
- Integer tag = pageMem.getForCheckpoint(
- fullId, tmpWriteBuf, persStoreMetrics.metricsEnabled() ? tracker : null);
+ pageMem.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, tracker);
+ }
- if (tag != null) {
+ return pagesToRetry;
+ }
+
+ /**
+ * Factory method for create {@link PageStoreWriter}.
+ *
+ * @param pagesToRetry List pages for retry.
+ * @return Checkpoint page write context.
+ */
+ private PageStoreWriter createPageStoreWriter(List<FullPageId> pagesToRetry) {
+ return new PageStoreWriter() {
+ /** {@inheritDoc} */
+ @Override public void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteCheckedException {
if (tag == PageMemoryImpl.TRY_AGAIN_TAG) {
- pagesToRetry.add(fullId);
+ pagesToRetry.add(fullPageId);
- continue;
+ return;
}
- assert PageIO.getType(tmpWriteBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId());
- assert PageIO.getVersion(tmpWriteBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId());
+ int groupId = fullPageId.groupId();
+ long pageId = fullPageId.pageId();
- tmpWriteBuf.rewind();
+ assert getType(buf) != 0 : "Invalid state. Type is 0! pageId = " + hexLong(pageId);
+ assert getVersion(buf) != 0 : "Invalid state. Version is 0! pageId = " + hexLong(pageId);
if (persStoreMetrics.metricsEnabled()) {
- int pageType = PageIO.getType(tmpWriteBuf);
+ int pageType = getType(buf);
if (PageIO.isDataPageType(pageType))
tracker.onDataPageWritten();
@@ -4823,13 +4839,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
writtenPagesCntr.incrementAndGet();
- PageStore store = storeMgr.writeInternal(grpId, fullId.pageId(), tmpWriteBuf, tag, true);
+ PageStore store = storeMgr.writeInternal(groupId, pageId, buf, tag, true);
updStores.computeIfAbsent(store, k -> new LongAdder()).increment();
}
- }
-
- return pagesToRetry;
+ };
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java
new file mode 100644
index 0000000..18f6d04
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+
+/**
+ * Interface for write page to {@link PageStore}.
+ */
+public interface PageStoreWriter {
+ /**
+ * Callback for write page. {@link PageMemoryEx} will copy page content to buffer before call.
+ *
+ * @param fullPageId Page ID to get byte buffer for. The page ID must be present in the collection returned by
+ * the {@link PageMemoryEx#beginCheckpoint()} method call.
+ * @param buf Temporary buffer to write changes into.
+ * @param tag {@code Partition generation} if data was read, {@code PageMemoryImpl.TRY_AGAIN_TAG} if the page is locked by another thread and the write must be retried.
+ * @throws IgniteCheckedException If write page failed.
+ */
+ void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteCheckedException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageStoreWrite.java
similarity index 90%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java
rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageStoreWrite.java
index b08ddc2..2061b4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageStoreWrite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
import org.apache.ignite.internal.util.GridUnsafe;
import org.jetbrains.annotations.Nullable;
@@ -28,9 +29,9 @@ import org.jetbrains.annotations.Nullable;
* content without holding segment lock. Page data is copied into temp buffer during {@link #writePage(FullPageId,
* ByteBuffer, int)} and then sent to real implementation by {@link #finishReplacement()}.
*/
-public class DelayedDirtyPageWrite implements ReplacedPageWriter {
+public class DelayedDirtyPageStoreWrite implements PageStoreWriter {
/** Real flush dirty page implementation. */
- private final ReplacedPageWriter flushDirtyPage;
+ private final PageStoreWriter flushDirtyPage;
/** Page size. */
private final int pageSize;
@@ -56,9 +57,12 @@ public class DelayedDirtyPageWrite implements ReplacedPageWriter {
* @param pageSize page size.
* @param tracker tracker to lock/unlock page reads.
*/
- public DelayedDirtyPageWrite(ReplacedPageWriter flushDirtyPage,
- ThreadLocal<ByteBuffer> byteBufThreadLoc, int pageSize,
- DelayedPageReplacementTracker tracker) {
+ public DelayedDirtyPageStoreWrite(
+ PageStoreWriter flushDirtyPage,
+ ThreadLocal<ByteBuffer> byteBufThreadLoc,
+ int pageSize,
+ DelayedPageReplacementTracker tracker
+ ) {
this.flushDirtyPage = flushDirtyPage;
this.pageSize = pageSize;
this.byteBufThreadLoc = byteBufThreadLoc;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
index 60f9246..574b5fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
/**
* Delayed page writes tracker. Provides delayed write implementations and allows to check if page is actually being
@@ -35,7 +36,7 @@ public class DelayedPageReplacementTracker {
private final int pageSize;
/** Flush dirty page real implementation. */
- private final ReplacedPageWriter flushDirtyPage;
+ private final PageStoreWriter flushDirtyPage;
/** Logger. */
private final IgniteLogger log;
@@ -56,11 +57,11 @@ public class DelayedPageReplacementTracker {
};
/**
- * Dirty page write for replacement operations thread local. Because page write {@link DelayedDirtyPageWrite} is
+ * Dirty page write for replacement operations thread local. Because page write {@link DelayedDirtyPageStoreWrite} is
* stateful and not thread safe, this thread local protects from GC pressure on pages replacement. <br> Map is used
* instead of build-in thread local to allow GC to remove delayed writers for alive threads after node stop.
*/
- private final Map<Long, DelayedDirtyPageWrite> delayedPageWriteThreadLocMap = new ConcurrentHashMap<>();
+ private final Map<Long, DelayedDirtyPageStoreWrite> delayedPageWriteThreadLocMap = new ConcurrentHashMap<>();
/**
* @param pageSize Page size.
@@ -68,8 +69,12 @@ public class DelayedPageReplacementTracker {
* @param log Logger.
* @param segmentCnt Segments count.
*/
- public DelayedPageReplacementTracker(int pageSize, ReplacedPageWriter flushDirtyPage,
- IgniteLogger log, int segmentCnt) {
+ public DelayedPageReplacementTracker(
+ int pageSize,
+ PageStoreWriter flushDirtyPage,
+ IgniteLogger log,
+ int segmentCnt
+ ) {
this.pageSize = pageSize;
this.flushDirtyPage = flushDirtyPage;
this.log = log;
@@ -82,9 +87,9 @@ public class DelayedPageReplacementTracker {
/**
* @return delayed page write implementation, finish method to be called to actually write page.
*/
- public DelayedDirtyPageWrite delayedPageWrite() {
+ public DelayedDirtyPageStoreWrite delayedPageWrite() {
return delayedPageWriteThreadLocMap.computeIfAbsent(Thread.currentThread().getId(),
- id -> new DelayedDirtyPageWrite(flushDirtyPage, byteBufThreadLoc, pageSize, this));
+ id -> new DelayedDirtyPageStoreWrite(flushDirtyPage, byteBufThreadLoc, pageSize, this));
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
index f839bed..2c17431 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
@@ -21,12 +21,12 @@ import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
-import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
-import org.jetbrains.annotations.Nullable;
/**
* Page memory with some persistence related additions.
@@ -123,16 +123,22 @@ public interface PageMemoryEx extends PageMemory {
public void finishCheckpoint();
/**
- * Gets page byte buffer for the checkpoint procedure.
+ * Prepare page for write during checkpoint.
+ *{@link PageStoreWriter} will be called when the page will be ready to write.
*
* @param pageId Page ID to get byte buffer for. The page ID must be present in the collection returned by
* the {@link #beginCheckpoint()} method call.
- * @param outBuf Temporary buffer to write changes into.
+ * @param buf Temporary buffer to write changes into.
+ * @param pageWriter Checkpoint page write context.
* @param tracker Checkpoint metrics tracker.
- * @return {@code Partition generation} if data was read, {@code null} otherwise (data already saved to storage).
- * @throws IgniteException If failed to obtain page data.
+ * @throws IgniteCheckedException If failed to obtain page data.
*/
- @Nullable public Integer getForCheckpoint(FullPageId pageId, ByteBuffer outBuf, CheckpointMetricsTracker tracker);
+ public void checkpointWritePage(
+ FullPageId pageId,
+ ByteBuffer buf,
+ PageStoreWriter pageWriter,
+ CheckpointMetricsTracker tracker
+ ) throws IgniteCheckedException;
/**
* Marks partition as invalid / outdated.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index 8a543c6..e2bb433 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.freelist.io.PagesListMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
@@ -251,7 +252,7 @@ public class PageMemoryImpl implements PageMemoryEx {
private OffheapReadWriteLock rwLock;
/** Flush dirty page closure. When possible, will be called by evictPage(). */
- private final ReplacedPageWriter flushDirtyPage;
+ private final PageStoreWriter flushDirtyPage;
/**
* Delayed page replacement (rotation with disk) tracker. Because other thread may require exactly the same page to be loaded from store,
@@ -306,7 +307,7 @@ public class PageMemoryImpl implements PageMemoryEx {
long[] sizes,
GridCacheSharedContext<?, ?> ctx,
int pageSize,
- ReplacedPageWriter flushDirtyPage,
+ PageStoreWriter flushDirtyPage,
@Nullable GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker,
CheckpointLockStateChecker stateChecker,
DataRegionMetricsImpl memMetrics,
@@ -540,7 +541,7 @@ public class PageMemoryImpl implements PageMemoryEx {
// because there is no crc inside them.
Segment seg = segment(grpId, pageId);
- DelayedDirtyPageWrite delayedWriter = delayedPageReplacementTracker != null
+ DelayedDirtyPageStoreWrite delayedWriter = delayedPageReplacementTracker != null
? delayedPageReplacementTracker.delayedPageWrite() : null;
FullPageId fullId = new FullPageId(pageId, grpId);
@@ -743,7 +744,7 @@ public class PageMemoryImpl implements PageMemoryEx {
seg.readLock().unlock();
}
- DelayedDirtyPageWrite delayedWriter = delayedPageReplacementTracker != null
+ DelayedDirtyPageStoreWrite delayedWriter = delayedPageReplacementTracker != null
? delayedPageReplacementTracker.delayedPageWrite() : null;
seg.writeLock().lock();
@@ -1169,8 +1170,13 @@ public class PageMemoryImpl implements PageMemoryEx {
}
/** {@inheritDoc} */
- @Override public Integer getForCheckpoint(FullPageId fullId, ByteBuffer outBuf, CheckpointMetricsTracker tracker) {
- assert outBuf.remaining() == pageSize();
+ @Override public void checkpointWritePage(
+ FullPageId fullId,
+ ByteBuffer buf,
+ PageStoreWriter pageStoreWriter,
+ CheckpointMetricsTracker metricsTracker
+ ) throws IgniteCheckedException {
+ assert buf.remaining() == pageSize();
Segment seg = segment(fullId.groupId(), fullId.pageId());
@@ -1186,21 +1192,13 @@ public class PageMemoryImpl implements PageMemoryEx {
try {
if (!isInCheckpoint(fullId))
- return null;
+ return;
- tag = seg.partGeneration(fullId.groupId(), PageIdUtils.partId(fullId.pageId()));
-
- relPtr = seg.loadedPages.get(
- fullId.groupId(),
- PageIdUtils.effectivePageId(fullId.pageId()),
- tag,
- INVALID_REL_PTR,
- OUTDATED_REL_PTR
- );
+ relPtr = resolveRelativePointer(seg, fullId, tag = generationTag(seg, fullId));
// Page may have been cleared during eviction. We have nothing to do in this case.
if (relPtr == INVALID_REL_PTR)
- return null;
+ return;
if (relPtr != OUTDATED_REL_PTR) {
absPtr = seg.absolute(relPtr);
@@ -1221,19 +1219,10 @@ public class PageMemoryImpl implements PageMemoryEx {
try {
// Double-check.
- relPtr = seg.loadedPages.get(
- fullId.groupId(),
- PageIdUtils.effectivePageId(fullId.pageId()),
- seg.partGeneration(
- fullId.groupId(),
- PageIdUtils.partId(fullId.pageId())
- ),
- INVALID_REL_PTR,
- OUTDATED_REL_PTR
- );
+ relPtr = resolveRelativePointer(seg, fullId, generationTag(seg, fullId));
if (relPtr == INVALID_REL_PTR)
- return null;
+ return;
if (relPtr == OUTDATED_REL_PTR) {
relPtr = refreshOutdatedPage(
@@ -1246,36 +1235,40 @@ public class PageMemoryImpl implements PageMemoryEx {
seg.pool.releaseFreePage(relPtr);
}
- return null;
+ return;
}
finally {
seg.writeLock().unlock();
}
}
- else
- return copyPageForCheckpoint(absPtr, fullId, outBuf, pageSingleAcquire, tracker) ? tag : TRY_AGAIN_TAG;
+
+ copyPageForCheckpoint(absPtr, fullId, buf, tag, pageSingleAcquire, pageStoreWriter, metricsTracker);
}
/**
* @param absPtr Absolute ptr.
* @param fullId Full id.
- * @param outBuf Output buffer to write page content into.
+ * @param buf Buffer for copy page content for future write via {@link PageStoreWriter}.
* @param pageSingleAcquire Page is acquired only once. We don't pin the page second time (until page will not be
* copied) in case checkpoint temporary buffer is used.
- * @param tracker Checkpoint statistics tracker.
- *
- * @return False if someone else holds lock on page.
+ * @param pageStoreWriter Checkpoint page write context.
*/
- private boolean copyPageForCheckpoint(
+ private void copyPageForCheckpoint(
long absPtr,
FullPageId fullId,
- ByteBuffer outBuf,
+ ByteBuffer buf,
+ Integer tag,
boolean pageSingleAcquire,
+ PageStoreWriter pageStoreWriter,
CheckpointMetricsTracker tracker
- ) {
+ ) throws IgniteCheckedException {
assert absPtr != 0;
assert PageHeader.isAcquired(absPtr);
+ // Exception protection flag.
+ // No need to write if exception occurred.
+ boolean canWrite = false;
+
boolean locked = rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS);
if (!locked) {
@@ -1284,7 +1277,11 @@ public class PageMemoryImpl implements PageMemoryEx {
if (!pageSingleAcquire)
PageHeader.releasePage(absPtr);
- return false;
+ buf.clear();
+
+ pageStoreWriter.writePage(fullId, buf, TRY_AGAIN_TAG);
+
+ return;
}
try {
@@ -1299,7 +1296,7 @@ public class PageMemoryImpl implements PageMemoryEx {
long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr);
- copyInBuffer(tmpAbsPtr, outBuf);
+ copyInBuffer(tmpAbsPtr, buf);
GridUnsafe.setMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize(), (byte)0);
@@ -1312,24 +1309,31 @@ public class PageMemoryImpl implements PageMemoryEx {
// and page did not have tmp buffer page.
if (!pageSingleAcquire)
PageHeader.releasePage(absPtr);
-
}
else {
- copyInBuffer(absPtr, outBuf);
+ copyInBuffer(absPtr, buf);
PageHeader.dirty(absPtr, false);
}
- assert PageIO.getType(outBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId());
- assert PageIO.getVersion(outBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId());
-
- memMetrics.onPageWritten();
+ assert PageIO.getType(buf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId());
+ assert PageIO.getVersion(buf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId());
- return true;
+ canWrite = true;
}
finally {
rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS);
+ if (canWrite){
+ buf.rewind();
+
+ pageStoreWriter.writePage(fullId, buf, tag);
+
+ memMetrics.onPageWritten();
+
+ buf.rewind();
+ }
+
// We pinned the page either when allocated the temp buffer, or when resolved abs pointer.
// Must release the page only after write unlock.
PageHeader.releasePage(absPtr);
@@ -1359,6 +1363,38 @@ public class PageMemoryImpl implements PageMemoryEx {
}
}
+ /**
+ * Get current partition generation tag.
+ *
+ * @param seg Segment.
+ * @param fullId Full page id.
+ * @return Current partition generation tag.
+ */
+ private int generationTag(Segment seg, FullPageId fullId) {
+ return seg.partGeneration(
+ fullId.groupId(),
+ PageIdUtils.partId(fullId.pageId())
+ );
+ }
+
+ /**
+ * Resolves relative pointer via {@link LoadedPagesMap}.
+ *
+ * @param seg Segment.
+ * @param fullId Full page id.
+ * @param reqVer Required version.
+ * @return Relative pointer.
+ */
+ private long resolveRelativePointer(Segment seg, FullPageId fullId, int reqVer) {
+ return seg.loadedPages.get(
+ fullId.groupId(),
+ PageIdUtils.effectivePageId(fullId.pageId()),
+ reqVer,
+ INVALID_REL_PTR,
+ OUTDATED_REL_PTR
+ );
+ }
+
/** {@inheritDoc} */
@Override public int invalidate(int grpId, int partId) {
synchronized (segmentsLock) {
@@ -1477,6 +1513,30 @@ public class PageMemoryImpl implements PageMemoryEx {
}
/**
+ * @param fullPageId Full page ID to check.
+ * @return {@code true} if the page is contained in the loaded pages table, {@code false} otherwise.
+ */
+ public boolean hasLoadedPage(FullPageId fullPageId) {
+ int grpId = fullPageId.groupId();
+ long pageId = PageIdUtils.effectivePageId(fullPageId.pageId());
+ int partId = PageIdUtils.partId(pageId);
+
+ Segment seg = segment(grpId, pageId);
+
+ seg.readLock().lock();
+
+ try {
+ long res =
+ seg.loadedPages.get(grpId, pageId, seg.partGeneration(grpId, partId), INVALID_REL_PTR, INVALID_REL_PTR);
+
+ return res != INVALID_REL_PTR;
+ }
+ finally {
+ seg.readLock().unlock();
+ }
+ }
+
+ /**
* @param absPtr Absolute pointer to read lock.
* @param pageId Page ID.
* @param force Force flag.
@@ -2186,7 +2246,7 @@ public class PageMemoryImpl implements PageMemoryEx {
* @return {@code True} if it is ok to replace this page, {@code false} if another page should be selected.
* @throws IgniteCheckedException If failed to write page to the underlying store during eviction.
*/
- private boolean preparePageRemoval(FullPageId fullPageId, long absPtr, ReplacedPageWriter saveDirtyPage) throws IgniteCheckedException {
+ private boolean preparePageRemoval(FullPageId fullPageId, long absPtr, PageStoreWriter saveDirtyPage) throws IgniteCheckedException {
assert writeLock().isHeldByCurrentThread();
// Do not evict cache meta pages.
@@ -2279,7 +2339,7 @@ public class PageMemoryImpl implements PageMemoryEx {
* @throws IgniteCheckedException If failed to evict page.
* @param saveDirtyPage Replaced page writer, implementation to save dirty page to persistent storage.
*/
- private long removePageForReplacement(ReplacedPageWriter saveDirtyPage) throws IgniteCheckedException {
+ private long removePageForReplacement(PageStoreWriter saveDirtyPage) throws IgniteCheckedException {
assert getWriteHoldCount() > 0;
if (!pageReplacementWarned) {
@@ -2451,7 +2511,7 @@ public class PageMemoryImpl implements PageMemoryEx {
* @param cap Capacity.
* @param saveDirtyPage Evicted page writer.
*/
- private long tryToFindSequentially(int cap, ReplacedPageWriter saveDirtyPage) throws IgniteCheckedException {
+ private long tryToFindSequentially(int cap, PageStoreWriter saveDirtyPage) throws IgniteCheckedException {
assert getWriteHoldCount() > 0;
long prevAddr = INVALID_REL_PTR;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java
deleted file mode 100644
index 30f9633..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.internal.processors.cache.persistence.pagemem;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.FullPageId;
-
-/**
- * Flush (write) dirty page implementation for freed page during page replacement. When possible, will be called by
- * removePageForReplacement().
- */
-public interface ReplacedPageWriter {
- /**
- * @param fullPageId Full page ID being evicted.
- * @param byteBuf Buffer with page data.
- * @param tag partition update tag, increasing counter.
- * @throws IgniteCheckedException if page write failed.
- */
- void writePage(FullPageId fullPageId, ByteBuffer byteBuf, int tag) throws IgniteCheckedException;
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
index 93ee3b3..f03303c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.metric.IndexPageType;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
@@ -46,8 +48,6 @@ import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccCacheIdAwa
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccCacheIdAwareDataLeafIO;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataInnerIO;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataLeafIO;
-import org.apache.ignite.internal.metric.IndexPageType;
-import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.util.GridStringBuilder;
import org.apache.ignite.spi.encryption.EncryptionSpi;
@@ -878,24 +878,29 @@ public abstract class PageIO {
/**
* @param addr Address.
*/
- public static String printPage(long addr, int pageSize) throws IgniteCheckedException {
- PageIO io = getPageIO(addr);
-
+ public static String printPage(long addr, int pageSize) {
GridStringBuilder sb = new GridStringBuilder("Header [\n\ttype=");
- sb.a(getType(addr)).a(" (").a(io.getClass().getSimpleName())
- .a("),\n\tver=").a(getVersion(addr)).a(",\n\tcrc=").a(getCrc(addr))
- .a(",\n\t").a(PageIdUtils.toDetailString(getPageId(addr)))
- .a("\n],\n");
-
- if (getCompressionType(addr) != 0) {
- sb.a("CompressedPage[\n\tcompressionType=").a(getCompressionType(addr))
- .a(",\n\tcompressedSize=").a(getCompressedSize(addr))
- .a(",\n\tcompactedSize=").a(getCompactedSize(addr))
- .a("\n]");
+ try {
+ PageIO io = getPageIO(addr);
+
+ sb.a(getType(addr)).a(" (").a(io.getClass().getSimpleName())
+ .a("),\n\tver=").a(getVersion(addr)).a(",\n\tcrc=").a(getCrc(addr))
+ .a(",\n\t").a(PageIdUtils.toDetailString(getPageId(addr)))
+ .a("\n],\n");
+
+ if (getCompressionType(addr) != 0) {
+ sb.a("CompressedPage[\n\tcompressionType=").a(getCompressionType(addr))
+ .a(",\n\tcompressedSize=").a(getCompressedSize(addr))
+ .a(",\n\tcompactedSize=").a(getCompactedSize(addr))
+ .a("\n]");
+ }
+ else
+ io.printPage(addr, pageSize, sb);
+ }
+ catch (IgniteCheckedException e) {
+ sb.a("Failed to print page: ").a(e.getMessage());
}
- else
- io.printPage(addr, pageSize, sb);
return sb.toString();
}
diff --git a/modules/core/src/test/config/tests.properties b/modules/core/src/test/config/tests.properties
index 9d72eb7..18b3606 100644
--- a/modules/core/src/test/config/tests.properties
+++ b/modules/core/src/test/config/tests.properties
@@ -51,9 +51,9 @@ deploy.uri.ftp=ftp://ftptest:iddqd@94.72.60.102:21/test/deployment
# Classes scanner URI for classes deployment. Must be overridden for every user.
deploy.uri.cls=${CLASSES_URI}
# Http scanner URI for HTTP deployment.
-deploy.uri.http=http://216.93.179.140/ignite/test/deployment/
+deploy.uri.http=http://fake.uri
# Http scanner URI for secure SSL HTTPs deployment.
-deploy.uri.https=https://216.93.179.140:8445/ignite/test/deployment/
+deploy.uri.https=https://fake.uri
# Directory with descriptors to construct GAR files.
deploy.gar.descriptor.dir=modules/urideploy/src/test/java/org/apache/ignite/spi/deployment/uri/META-INF
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java
index b7ef90c..64e340c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -109,7 +110,7 @@ public class GridTaskFailoverAffinityRunTest extends GridCommonAbstractTest {
final AtomicInteger gridIdx = new AtomicInteger(1);
- final long stopTime = System.currentTimeMillis() + 60_000;
+ final long stopTime = System.currentTimeMillis() + SF.applyLB(30_000, 10_000);
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java
index 14dd933..585adfd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -100,7 +101,7 @@ public class TaskNodeRestartTest extends GridCommonAbstractTest {
IgniteInternalFuture<?> fut = null;
try {
- final long stopTime = System.currentTimeMillis() + 60_000;
+ final long stopTime = System.currentTimeMillis() + SF.applyLB(30_000, 10_000);
final AtomicInteger idx = new AtomicInteger();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
index 3dde336..9a62cfc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
@@ -351,7 +351,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
rebalancMode = ASYNC;
evict = false;
- long duration = SF.applyLB(60_000, 5_000);
+ long duration = SF.applyLB(30_000, 5_000);
checkRestartWithPut(duration, 2, 2);
}
@@ -368,7 +368,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
rebalancMode = ASYNC;
evict = false;
- long duration = SF.applyLB(60_000, 5_000);
+ long duration = SF.applyLB(30_000, 5_000);
checkRestartWithTx(duration, 2, 2, 3);
}
@@ -385,7 +385,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
rebalancMode = ASYNC;
evict = false;
- long duration = SF.applyLB(60_000, 5_000);
+ long duration = SF.applyLB(30_000, 5_000);
checkRestartWithPut(duration, 2, 2);
}
@@ -402,7 +402,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
rebalancMode = ASYNC;
evict = true;
- long duration = SF.applyLB(60_000, 5_000);
+ long duration = SF.applyLB(30_000, 5_000);
checkRestartWithPut(duration, 2, 2);
}
@@ -419,7 +419,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
rebalancMode = ASYNC;
evict = false;
- long duration = SF.applyLB(60_000, 5_000);
+ long duration = SF.applyLB(30_000, 5_000);
checkRestartWithTx(duration, 2, 2, 3);
}
@@ -453,7 +453,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
rebalancMode = ASYNC;
evict = false;
- long duration = SF.applyLB(90_000, 6_000);
+ long duration = SF.applyLB(30_000, 6_000);
checkRestartWithPut(duration, 3, 3);
}
@@ -470,7 +470,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
rebalancMode = ASYNC;
evict = false;
- long duration = SF.applyLB(90_000, 6_000);
+ long duration = SF.applyLB(30_000, 6_000);
checkRestartWithTx(duration, 3, 3, 3);
}
@@ -487,7 +487,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
rebalancMode = ASYNC;
evict = false;
- long duration = SF.applyLB(90_000, 6_000);
+ long duration = SF.applyLB(30_000, 6_000);
checkRestartWithPut(duration, 4, 4);
}
@@ -504,7 +504,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
rebalancMode = ASYNC;
evict = false;
- long duration = SF.applyLB(90_000, 6_000);
+ long duration = SF.applyLB(30_000, 6_000);
checkRestartWithTx(duration, 4, 4, 3);
}
@@ -521,7 +521,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
rebalancMode = ASYNC;
evict = false;
- long duration = SF.applyLB(90_000, 6_000);
+ long duration = SF.applyLB(30_000, 6_000);
checkRestartWithPut(duration, 5, 5);
}
@@ -538,7 +538,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
rebalancMode = ASYNC;
evict = false;
- long duration = SF.applyLB(90_000, 6_000);
+ long duration = SF.applyLB(30_000, 6_000);
checkRestartWithTx(duration, 5, 5, 3);
}
@@ -555,7 +555,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
rebalancMode = ASYNC;
evict = false;
- long duration = SF.applyLB(90_000, 6_000);
+ long duration = SF.applyLB(30_000, 6_000);
checkRestartWithTxPutAll(duration, 5, 5);
}
@@ -572,7 +572,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
rebalancMode = ASYNC;
evict = false;
- long duration = SF.applyLB(90_000, 6_000);
+ long duration = SF.applyLB(30_000, 6_000);
checkRestartWithTxPutAll(duration, 2, 2);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index d035d14..7a321f1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -62,7 +62,7 @@ import static org.apache.ignite.testframework.GridTestUtils.runAsync;
*/
public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbstractTest {
/** */
- protected static final long DURATION = 60_000;
+ protected static final long DURATION = GridTestUtils.SF.applyLB(30_000, 7_000);
/** */
protected static final int GRID_CNT = 4;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
index db3cb71..0f1d016 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
@@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.persistence;
+import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
@@ -33,7 +34,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.collect.Lists;
+import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
@@ -43,6 +44,7 @@ import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -51,10 +53,12 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
@@ -323,10 +327,9 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
final int entriesCnt = 10_000;
final int maxNodesCnt = 4;
- final int topChanges = 25;
+ final int topChanges = SF.applyLB(15, 5);
final boolean allowRemoves = true;
- final AtomicLong orderCounter = new AtomicLong();
final AtomicBoolean stop = new AtomicBoolean();
final AtomicBoolean suspend = new AtomicBoolean();
final AtomicBoolean suspended = new AtomicBoolean();
@@ -337,16 +340,16 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
ignite.cluster().active(true);
- IgniteCache<Integer, TestValue> cache = ignite.cache(INDEXED_CACHE);
-
- for (int i = 0; i < entriesCnt; i++) {
- long order = orderCounter.get();
+ try (IgniteDataStreamer<Integer, TestValue> ds = ignite.dataStreamer(INDEXED_CACHE)) {
+ for (int i = 0; i < entriesCnt; i++) {
+ ds.addData(i, new TestValue(i, i, i));
+ map.put(i, new TestValue(i, i, i));
+ }
+ }
- cache.put(i, new TestValue(order, i, i));
- map.put(i, new TestValue(order, i, i));
+ IgniteCache<Integer, TestValue> cache = ignite.cache(INDEXED_CACHE);
- orderCounter.incrementAndGet();
- }
+ final AtomicLong orderCounter = new AtomicLong(entriesCnt);
final AtomicInteger nodesCnt = new AtomicInteger(4);
@@ -445,7 +448,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
if (U.currentTimeMillis() > timeOut)
break;
- U.sleep(3_000);
+ U.sleep(SF.applyLB(3_000, 500));
boolean addNode;
@@ -581,7 +584,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
*/
@Test
public void testPartitionCounterConsistencyOnUnstableTopology() throws Exception {
- final Ignite ig = startGrids(4);
+ Ignite ig = startGridsMultiThreaded(4);
ig.cluster().active(true);
@@ -596,23 +599,25 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
assertPartitionsSame(idleVerify(grid(0), CACHE));
- for (int it = 0; it < 10; it++) {
+ for (int it = 0; it < SF.applyLB(10, 3); it++) {
final int it0 = it;
IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
try {
+ int dataLoadTimeout = SF.applyLB(500, 250);
+
stopGrid(3);
- U.sleep(500); // Wait for data load.
+ U.sleep(dataLoadTimeout); // Wait for data load.
startGrid(3);
- U.sleep(500); // Wait for data load.
+ U.sleep(dataLoadTimeout); // Wait for data load.
if (it0 % 2 != 0) {
stopGrid(2);
- U.sleep(500); // Wait for data load.
+ U.sleep(dataLoadTimeout); // Wait for data load.
startGrid(2);
}
@@ -651,8 +656,10 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
cntrs.put(part.id(), part.updateCounter());
}
- for (int k0 = 0; k0 < keys; k0++)
- assertEquals(String.valueOf(k0) + " " + g, k0, ig0.cache(CACHE).get(k0));
+ IgniteCache<Integer, String> ig0cache = ig0.cache(CACHE);
+
+ for (Cache.Entry<Integer, String> entry : ig0cache.query(new ScanQuery<Integer, String>()))
+ assertEquals(entry.getKey() + " " + g, entry.getKey(), entry.getValue());
}
assertEquals(ig.affinity(CACHE).partitions(), cntrs.size());
@@ -720,12 +727,8 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
private static class CoordinatorNodeFilter implements IgnitePredicate<ClusterNode> {
/** {@inheritDoc} */
@Override public boolean apply(ClusterNode node) {
- try {
- return node.order() > 1;
- }
- catch (UnsupportedOperationException e) {
- return false;
- }
+ // Do not start cache on coordinator.
+ return !node.<String>attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME).endsWith("0");
}
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheStartStopWithFreqCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheStartStopWithFreqCheckpointTest.java
index 0843ce3..e3599f7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheStartStopWithFreqCheckpointTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheStartStopWithFreqCheckpointTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
@@ -47,7 +48,7 @@ import org.junit.Test;
*/
public class IgnitePdsCacheStartStopWithFreqCheckpointTest extends GridCommonAbstractTest {
/** Caches. */
- private static final int CACHES = 10;
+ private static final int CACHES = SF.applyLB(10, 3);
/** Cache name. */
private static final String CACHE_NAME = "test";
@@ -152,7 +153,7 @@ public class IgnitePdsCacheStartStopWithFreqCheckpointTest extends GridCommonAbs
}
});
- U.sleep(60_000);
+ U.sleep(SF.applyLB(60_000, 10_000));
log.info("Stopping caches start/stop process.");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
index b51354d..ed3658a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.TransactionRollbackException;
@@ -138,7 +139,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
*/
@Test
public void testRebalancingDuringLoad_1000_500_1_1() throws Exception {
- checkRebalancingDuringLoad(1000, 500, 1, 1);
+ checkRebalancingDuringLoad(SF.apply(1000), SF.apply(500), 1, 1);
}
/**
@@ -146,7 +147,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
*/
@Test
public void testRebalancingDuringLoad_8000_500_1_1() throws Exception {
- checkRebalancingDuringLoad(8000, 500, 1, 1);
+ checkRebalancingDuringLoad(SF.apply(8000), SF.apply(500), 1, 1);
}
/**
@@ -154,7 +155,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
*/
@Test
public void testRebalancingDuringLoad_1000_20000_1_1() throws Exception {
- checkRebalancingDuringLoad(1000, 20000, 1, 1);
+ checkRebalancingDuringLoad(SF.apply(1000), SF.apply(20000), 1, 1);
}
/**
@@ -162,7 +163,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
*/
@Test
public void testRebalancingDuringLoad_8000_8000_1_1() throws Exception {
- checkRebalancingDuringLoad(8000, 8000, 1, 1);
+ checkRebalancingDuringLoad(SF.apply(8000), SF.apply(8000), 1, 1);
}
/**
@@ -170,7 +171,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
*/
@Test
public void testRebalancingDuringLoad_1000_500_8_1() throws Exception {
- checkRebalancingDuringLoad(1000, 500, 8, 1);
+ checkRebalancingDuringLoad(SF.apply(1000), SF.apply(500), 8, 1);
}
/**
@@ -178,7 +179,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
*/
@Test
public void testRebalancingDuringLoad_8000_500_8_1() throws Exception {
- checkRebalancingDuringLoad(8000, 500, 8, 1);
+ checkRebalancingDuringLoad(SF.apply(8000), SF.apply(500), 8, 1);
}
/**
@@ -186,7 +187,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
*/
@Test
public void testRebalancingDuringLoad_1000_20000_8_1() throws Exception {
- checkRebalancingDuringLoad(1000, 20000, 8, 1);
+ checkRebalancingDuringLoad(SF.apply(1000), SF.apply(20000), 8, 1);
}
/**
@@ -194,7 +195,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
*/
@Test
public void testRebalancingDuringLoad_8000_8000_8_1() throws Exception {
- checkRebalancingDuringLoad(8000, 8000, 8, 1);
+ checkRebalancingDuringLoad(SF.apply(8000), SF.apply(8000), 8, 1);
}
/**
@@ -202,7 +203,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
*/
@Test
public void testRebalancingDuringLoad_1000_500_8_16() throws Exception {
- checkRebalancingDuringLoad(1000, 500, 8, 16);
+ checkRebalancingDuringLoad(SF.apply(1000), SF.apply(500), 8, 16);
}
/**
@@ -210,7 +211,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
*/
@Test
public void testRebalancingDuringLoad_8000_500_8_16() throws Exception {
- checkRebalancingDuringLoad(8000, 500, 8, 16);
+ checkRebalancingDuringLoad(SF.apply(8000), SF.apply(500), 8, 16);
}
/**
@@ -218,7 +219,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
*/
@Test
public void testRebalancingDuringLoad_1000_20000_8_16() throws Exception {
- checkRebalancingDuringLoad(1000, 20000, 8, 16);
+ checkRebalancingDuringLoad(SF.apply(1000), SF.apply(20000), 8, 16);
}
/**
@@ -226,7 +227,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
*/
@Test
public void testRebalancingDuringLoad_8000_8000_8_16() throws Exception {
- checkRebalancingDuringLoad(8000, 8000, 8, 16);
+ checkRebalancingDuringLoad(SF.apply(8000), SF.apply(8000), 8, 16);
}
/**
@@ -308,7 +309,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
}
}, threads, "updater");
- long end = System.currentTimeMillis() + 90_000;
+ long end = System.currentTimeMillis() + SF.apply(90000);
Random rnd = ThreadLocalRandom.current();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedIndexTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedIndexTest.java
index 085b9b0..28eec74 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedIndexTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedIndexTest.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactor
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy;
import org.junit.Test;
@@ -55,6 +56,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJU
/**
* Test to reproduce corrupted indexes problem after partition file eviction and truncation.
*/
+@WithSystemProperty(key = IGNITE_BASELINE_AUTO_ADJUST_ENABLED, value = "false")
public class IgnitePdsCorruptedIndexTest extends GridCommonAbstractTest {
/** Cache name. */
private static final String CACHE = "cache";
@@ -100,20 +102,6 @@ public class IgnitePdsCorruptedIndexTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- System.setProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED, "false");
-
- super.beforeTestsStarted();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- super.afterTestsStopped();
-
- System.clearProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED);
- }
-
- /** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
stopAllGrids();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java
index de5cd96..08505de 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -39,7 +40,7 @@ public class IgnitePdsExchangeDuringCheckpointTest extends GridCommonAbstractTes
*/
@Test
public void testExchangeOnNodeLeft() throws Exception {
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < SF.applyLB(5, 2); i++) {
startGrids(3);
IgniteEx ignite = grid(1);
ignite.active(true);
@@ -61,7 +62,7 @@ public class IgnitePdsExchangeDuringCheckpointTest extends GridCommonAbstractTes
*/
@Test
public void testExchangeOnNodeJoin() throws Exception {
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < SF.applyLB(5, 2); i++) {
startGrids(2);
IgniteEx ignite = grid(1);
ignite.active(true);
@@ -78,8 +79,6 @@ public class IgnitePdsExchangeDuringCheckpointTest extends GridCommonAbstractTes
}
}
- /**
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java
index 5a953a7..d8750cf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java
@@ -134,7 +134,7 @@ public class IgnitePdsPageSizesTest extends GridCommonAbstractTest {
try {
final IgniteCache<Object, Object> cache = ignite.cache(cacheName);
- final long endTime = System.currentTimeMillis() + 60_000;
+ final long endTime = System.currentTimeMillis() + GridTestUtils.SF.applyLB(60_000, 10_000);
GridTestUtils.runMultiThreaded(new Callable<Object>() {
@Override public Object call() throws Exception {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
index 6928cc2..0cd040e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -320,44 +321,43 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract
info("Acquired pages for checkpoint: " + pageIds.size());
try {
- ByteBuffer tmpBuf = ByteBuffer.allocate(mem.pageSize());
-
- tmpBuf.order(ByteOrder.nativeOrder());
-
long begin = System.currentTimeMillis();
long cp = 0;
- long write = 0;
+ AtomicLong write = new AtomicLong();
- for (FullPageId fullId : pages) {
- if (pageIds.contains(fullId)) {
- long cpStart = System.nanoTime();
+ PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> {
+ int groupId = fullPageId.groupId();
+ long pageId = fullPageId.pageId();
- Integer tag = mem.getForCheckpoint(fullId, tmpBuf, null);
+ for (int j = PageIO.COMMON_HEADER_END; j < mem.realPageSize(groupId); j += 4)
+ assertEquals(j + (int)pageId, buf.getInt(j));
- if (tag == null)
- continue;
+ buf.rewind();
- long cpEnd = System.nanoTime();
+ long writeStart = System.nanoTime();
- cp += cpEnd - cpStart;
- tmpBuf.rewind();
+ storeMgr.write(cacheId, pageId, buf, tag);
- for (int j = PageIO.COMMON_HEADER_END; j < mem.realPageSize(fullId.groupId()); j += 4)
- assertEquals(j + (int)fullId.pageId(), tmpBuf.getInt(j));
+ long writeEnd = System.nanoTime();
- tmpBuf.rewind();
+ write.getAndAdd(writeEnd - writeStart);
+ };
- long writeStart = System.nanoTime();
+ ByteBuffer tmpBuf = ByteBuffer.allocate(mem.pageSize());
- storeMgr.write(cacheId, fullId.pageId(), tmpBuf, tag);
+ tmpBuf.order(ByteOrder.nativeOrder());
- long writeEnd = System.nanoTime();
+ for (FullPageId fullId : pages) {
+ if (pageIds.contains(fullId)) {
+ long cpStart = System.nanoTime();
+
+ mem.checkpointWritePage(fullId, tmpBuf, pageStoreWriter, null);
- write += writeEnd - writeStart;
+ long cpEnd = System.nanoTime();
- tmpBuf.rewind();
+ cp += cpEnd - cpStart;
}
}
@@ -368,7 +368,7 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract
long end = System.currentTimeMillis();
info("Written pages in " + (end - begin) + "ms, copy took " + (cp / 1_000_000) + "ms, " +
- "write took " + (write / 1_000_000) + "ms, sync took " + (end - syncStart) + "ms");
+ "write took " + (write.get() / 1_000_000) + "ms, sync took " + (end - syncStart) + "ms");
}
finally {
info("Finishing checkpoint...");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsPageEvictionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsPageEvictionTest.java
index 1878d63..33b3492 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsPageEvictionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsPageEvictionTest.java
@@ -19,20 +19,23 @@ package org.apache.ignite.internal.processors.cache.persistence.db;
import java.io.Serializable;
import java.util.List;
+import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -41,7 +44,7 @@ import org.junit.Test;
*/
public class IgnitePdsPageEvictionTest extends GridCommonAbstractTest {
/** Test entry count. */
- public static final int ENTRY_CNT = 1_000_000;
+ public static final int ENTRY_CNT = SF.applyLB(300_000, 100_000);
/** Cache name. */
private static final String CACHE_NAME = "cache";
@@ -109,14 +112,17 @@ public class IgnitePdsPageEvictionTest extends GridCommonAbstractTest {
IgniteCache<DbKey, DbValue> cache = ignite(0).cache(CACHE_NAME);
- for (int i = 0; i < ENTRY_CNT; i++) {
- assertEquals(Long.MAX_VALUE - i, cache.get(new DbKey(i)).lVal);
+ int i = 0;
+ for (Cache.Entry<DbKey, DbValue> entry : cache.query(new ScanQuery<DbKey, DbValue>())) {
+ assertEquals(Long.MAX_VALUE - entry.getKey().val, entry.getValue().lVal);
if (i > 0 && i % 10_000 == 0)
info("Done get: " + i);
+
+ i++;
}
- for (int i = 0; i < ENTRY_CNT; i++) {
+ for (i = 0; i < ENTRY_CNT; i++) {
List<List<?>> rows = cache.query(
new SqlFieldsQuery("select lVal from DbValue where iVal=?").setArgs(i)
).getAll();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsRebalancingOnNotStableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsRebalancingOnNotStableTopologyTest.java
index 231bf61..a736305 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsRebalancingOnNotStableTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsRebalancingOnNotStableTopologyTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy;
import org.junit.Test;
@@ -48,7 +49,7 @@ public class IgnitePdsRebalancingOnNotStableTopologyTest extends GridCommonAbstr
private static final long CHECKPOINT_FREQUENCY = 2_000_000;
/** Cluster size. */
- private static final int CLUSTER_SIZE = 5;
+ private static final int CLUSTER_SIZE = SF.applyLB(5,3 );
/**
* @throws Exception When fails.
@@ -110,14 +111,14 @@ public class IgnitePdsRebalancingOnNotStableTopologyTest extends GridCommonAbstr
for (int i = 2; i < CLUSTER_SIZE; i++) {
startGrid(i);
- U.sleep(5000);
+ U.sleep(SF.apply(3000));
}
- U.sleep(10000);
+ U.sleep(SF.apply(5000));
IgniteProcessProxy.kill("db.RebalancingOnNotStableTopologyTest2");
- Thread.sleep(5000);
+ Thread.sleep(SF.apply(3000));
IgniteProcessProxy.kill("db.RebalancingOnNotStableTopologyTest1");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWholeClusterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWholeClusterRestartTest.java
index b5f214b..c22cecf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWholeClusterRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWholeClusterRestartTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.spi.checkpoint.noop.NoopCheckpointSpi;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -40,10 +41,10 @@ import org.junit.Test;
*/
public class IgnitePdsWholeClusterRestartTest extends GridCommonAbstractTest {
/** */
- private static final int GRID_CNT = 5;
+ private static final int GRID_CNT = SF.applyLB(5, 3);
/** */
- private static final int ENTRIES_COUNT = 1_000;
+ private static final int ENTRIES_COUNT = SF.apply(1_000);
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -90,7 +91,7 @@ public class IgnitePdsWholeClusterRestartTest extends GridCommonAbstractTest {
cleanPersistenceDir();
}
- /**
+ /**
* @throws Exception if failed.
*/
@Test
@@ -113,7 +114,7 @@ public class IgnitePdsWholeClusterRestartTest extends GridCommonAbstractTest {
for (int i = 0; i < GRID_CNT; i++)
idxs.add(i);
- for (int r = 0; r < 10; r++) {
+ for (int r = 0; r < SF.applyLB(10, 3); r++) {
Collections.shuffle(idxs);
info("Will start in the following order: " + idxs);
@@ -129,7 +130,7 @@ public class IgnitePdsWholeClusterRestartTest extends GridCommonAbstractTest {
for (int k = 0; k < ENTRIES_COUNT; k++)
assertEquals("Failed to read [g=" + g + ", part=" + ig.affinity(DEFAULT_CACHE_NAME).partition(k) +
- ", nodes=" + ig.affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(k) + ']',
+ ", nodes=" + ig.affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(k) + ']',
k, ig.cache(DEFAULT_CACHE_NAME).get(k));
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFreeListTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFreeListTest.java
index 5074329..e83148b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFreeListTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFreeListTest.java
@@ -55,6 +55,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -75,7 +76,7 @@ public class CheckpointFreeListTest extends GridCommonAbstractTest {
private static final String CACHE_NAME = "cacheOne";
/** Cache size */
- public static final int CACHE_SIZE = 30000;
+ public static final int CACHE_SIZE = SF.apply(30000);
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
@@ -126,7 +127,7 @@ public class CheckpointFreeListTest extends GridCommonAbstractTest {
.setAtomicityMode(mode)
.setBackups(1)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
- .setAffinity(new RendezvousAffinityFunction(false, 1024))
+ .setAffinity(new RendezvousAffinityFunction(false, SF.apply(1024)))
.setIndexedTypes(String.class, String.class);
}
@@ -157,7 +158,7 @@ public class CheckpointFreeListTest extends GridCommonAbstractTest {
IgniteCache<Integer, Object> cache = igniteClient.cache(CACHE_NAME);
for (int j = 0; j < CACHE_SIZE; j++) {
- cache.put(j, new byte[random.nextInt(3072)]);
+ cache.put(j, new byte[random.nextInt(SF.apply(3072))]);
if (random.nextBoolean())
cache.remove(j);
@@ -224,7 +225,7 @@ public class CheckpointFreeListTest extends GridCommonAbstractTest {
IgniteCache<Integer, Object> cache = ignite0.cache(CACHE_NAME);
for (int j = 0; j < CACHE_SIZE; j++) {
- byte[] val = new byte[random.nextInt(3072)];
+ byte[] val = new byte[random.nextInt(SF.apply(3072))];
cache.put(j, val);
@@ -257,7 +258,7 @@ public class CheckpointFreeListTest extends GridCommonAbstractTest {
CyclicBarrier nodeStartBarrier = new CyclicBarrier(2);
- int approximateIterationCount = 10;
+ int approximateIterationCount = SF.applyLB(10, 6);
//Approximate count of entries to put per one iteration.
int iterationDataCount = entriesToRemove.size() / approximateIterationCount;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCacheDestroyDuringCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCacheDestroyDuringCheckpointTest.java
index 8b41007..27f4171 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCacheDestroyDuringCheckpointTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCacheDestroyDuringCheckpointTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -38,10 +39,10 @@ public class IgnitePdsCacheDestroyDuringCheckpointTest extends GridCommonAbstrac
private static final String NAME_PREFIX = "CACHE-";
/** */
- private static final int NUM_ITERATIONS = 10;
+ private static final int NUM_ITERATIONS = SF.applyLB(5, 3);
/** */
- private static final int NUM_CACHES = 10;
+ private static final int NUM_CACHES = SF.applyLB(10, 3);
/** */
private static final int NUM_ENTRIES_PER_CACHE = 200;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCacheIntegrationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCacheIntegrationTest.java
index 4b80170..c3ddba5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCacheIntegrationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCacheIntegrationTest.java
@@ -170,7 +170,7 @@ public class IgnitePdsCacheIntegrationTest extends GridCommonAbstractTest {
private void checkPutGetSql(Ignite ig, boolean write) {
IgniteCache<Integer, DbValue> cache = ig.cache(CACHE_NAME);
- int entryCnt = 50_000;
+ int entryCnt = GridTestUtils.SF.apply(50_000);
if (write) {
try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(CACHE_NAME)) {
@@ -196,20 +196,20 @@ public class IgnitePdsCacheIntegrationTest extends GridCommonAbstractTest {
assertEquals("i = " + i, new DbValue(i, "value-" + i, i), cache.get(i));
List<List<?>> res = cache.query(new SqlFieldsQuery("select ival from dbvalue where ival < ? order by ival asc")
- .setArgs(10_000)).getAll();
+ .setArgs(2_000)).getAll();
- assertEquals(10_000, res.size());
+ assertEquals(2_000, res.size());
- for (int i = 0; i < 10_000; i++) {
+ for (int i = 0; i < 2_000; i++) {
assertEquals(1, res.get(i).size());
assertEquals(i, res.get(i).get(0));
}
- assertEquals(1, cache.query(new SqlFieldsQuery("select lval from dbvalue where ival = 7899")).getAll().size());
- assertEquals(5000, cache.query(new SqlFieldsQuery("select lval from dbvalue where ival >= 5000 and ival < 10000"))
+ assertEquals(1, cache.query(new SqlFieldsQuery("select lval from dbvalue where ival = 799")).getAll().size());
+ assertEquals(500, cache.query(new SqlFieldsQuery("select lval from dbvalue where ival >= 500 and ival < 1000"))
.getAll().size());
- for (int i = 0; i < 10_000; i++)
+ for (int i = 0; i < 1_000; i++)
assertEquals(new DbValue(i, "value-" + i, i), cache.get(i));
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
index fcc2363..30de694 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
@@ -70,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionImpl;
import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
@@ -643,7 +645,8 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
buf.rewind();
- mem.getForCheckpoint(fullId, buf, null);
+ mem.checkpointWritePage(fullId, buf, (fullPageId, buffer, tag) -> {
+ }, null);
buf.position(PageIO.COMMON_HEADER_END);
@@ -973,8 +976,16 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
Integer tag;
+ AtomicReference<Integer> tag0 = new AtomicReference<>();
+
+ PageStoreWriter pageStoreWriter = (fullPageId, buf, tagx) -> {
+ tag0.set(tagx);
+ };
+
while (true) {
- tag = mem.getForCheckpoint(fullId, tmpBuf, null);
+ mem.checkpointWritePage(fullId, tmpBuf, pageStoreWriter, null);
+
+ tag = tag0.get();
if (tag != null && tag == PageMemoryImpl.TRY_AGAIN_TAG)
continue;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
index 6293b52..bf3be7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
@@ -23,6 +23,8 @@ import java.nio.file.FileVisitResult;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
@@ -38,9 +40,12 @@ import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import static java.nio.file.FileVisitResult.CONTINUE;
import static java.nio.file.Files.walkFileTree;
@@ -55,7 +60,25 @@ import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
/***
*
*/
+@RunWith(Parameterized.class)
public class IgniteNodeStoppedDuringDisableWALTest extends GridCommonAbstractTest {
+
+ /** Crash point. */
+ private NodeStopPoint nodeStopPoint;
+
+ /**
+ * Default constructor to avoid BeforeFirstAndAfterLastTestRule.
+ */
+ private IgniteNodeStoppedDuringDisableWALTest() {
+ }
+
+ /**
+ * @param nodeStopPoint Crash point.
+ */
+ public IgniteNodeStoppedDuringDisableWALTest(NodeStopPoint nodeStopPoint) {
+ this.nodeStopPoint = nodeStopPoint;
+ }
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(name);
@@ -89,13 +112,11 @@ public class IgniteNodeStoppedDuringDisableWALTest extends GridCommonAbstractTes
*/
@Test
public void test() throws Exception {
- for (NodeStopPoint nodeStopPoint : NodeStopPoint.values()) {
- testStopNodeWithDisableWAL(nodeStopPoint);
+ testStopNodeWithDisableWAL(nodeStopPoint);
- stopAllGrids();
+ stopAllGrids();
- cleanPersistenceDir();
- }
+ cleanPersistenceDir();
}
/**
@@ -164,7 +185,7 @@ public class IgniteNodeStoppedDuringDisableWALTest extends GridCommonAbstractTes
try (IgniteDataStreamer<Integer, Integer> st = ig0.dataStreamer(DEFAULT_CACHE_NAME)) {
st.allowOverwrite(true);
- for (int i = 0; i < 10_000; i++)
+ for (int i = 0; i < GridTestUtils.SF.apply(10_000); i++)
st.addData(i, -i);
}
@@ -239,6 +260,14 @@ public class IgniteNodeStoppedDuringDisableWALTest extends GridCommonAbstractTes
}
/**
+ * @return Node stop point.
+ */
+ @Parameterized.Parameters(name = "{0}")
+ public static Iterable<Object[]> providedTestData() {
+ return Arrays.stream(NodeStopPoint.values()).map(it -> new Object[] {it}).collect(Collectors.toList());
+ }
+
+ /**
* Crash point.
*/
private enum NodeStopPoint {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
index 5d25da2..3c76b47 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
@@ -26,8 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -44,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAhea
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.MvccFeatureChecker;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
@@ -52,15 +51,14 @@ import org.junit.Assume;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
/**
* Tests error recovery while node flushing
*/
public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends GridCommonAbstractTest {
/** */
- private static final String TEST_CACHE = "testCache";
-
- /** */
private static final int ITRS = 2000;
/** */
@@ -132,22 +130,23 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
cfg.setConsistentId(gridName);
- CacheConfiguration cacheCfg = new CacheConfiguration(TEST_CACHE)
- .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ cfg.setCacheConfiguration(
+ new CacheConfiguration(DEFAULT_CACHE_NAME)
+ .setAtomicityMode(TRANSACTIONAL)
.setBackups(1)
- .setRebalanceMode(CacheRebalanceMode.SYNC)
- .setAffinity(new RendezvousAffinityFunction(false, 32));
+ .setRebalanceMode(SYNC)
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
+ );
- cfg.setCacheConfiguration(cacheCfg);
-
- DataStorageConfiguration memCfg = new DataStorageConfiguration()
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
- new DataRegionConfiguration().setMaxSize(2048L * 1024 * 1024).setPersistenceEnabled(true))
- .setWalMode(this.walMode())
+ new DataRegionConfiguration()
+ .setMaxSize(2048L * 1024 * 1024)
+ .setPersistenceEnabled(true))
+ .setWalMode(walMode())
.setWalSegmentSize(512 * 1024)
- .setWalBufferSize(512 * 1024);
-
- cfg.setDataStorageConfiguration(memCfg);
+ .setWalBufferSize(512 * 1024));
cfg.setFailureHandler(new StopNodeFailureHandler());
@@ -160,6 +159,7 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
* @throws Exception In case of fail
*/
@Test
+ @WithSystemProperty(key = "IGNITE_DISABLE_WAL_DURING_REBALANCING", value = "false")
public void testFailWhileStart() throws Exception {
failWhilePut(true);
}
@@ -170,6 +170,7 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
* @throws Exception In case of fail
*/
@Test
+ @WithSystemProperty(key = "IGNITE_DISABLE_WAL_DURING_REBALANCING", value = "false")
public void testFailAfterStart() throws Exception {
failWhilePut(false);
}
@@ -178,15 +179,15 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
* @throws Exception if failed.
*/
private void failWhilePut(boolean failWhileStart) throws Exception {
- final Ignite grid = startGridsMultiThreaded(gridCount());
+ Ignite ig = startGrids(gridCount());
- grid.cluster().active(true);
+ ig.cluster().active(true);
- IgniteCache<Object, Object> cache = grid.cache(TEST_CACHE);
+ IgniteCache<Object, Object> cache = ig.cache(DEFAULT_CACHE_NAME);
for (int i = 0; i < ITRS; i++) {
while (!Thread.currentThread().isInterrupted()) {
- try (Transaction tx = grid.transactions().txStart(
+ try (Transaction tx = ig.transactions().txStart(
TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
cache.put(i, "testValue" + i);
@@ -208,7 +209,7 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
setFileIOFactory(grid(gridCount()).context().cache().context().wal());
- grid.cluster().setBaselineTopology(grid.cluster().topologyVersion());
+ ig.cluster().setBaselineTopology(ig.cluster().topologyVersion());
awaitPartitionMapExchange();
}
@@ -224,7 +225,7 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
// We should await successful stop of node.
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- return grid.cluster().nodes().size() == gridCount();
+ return ig.cluster().nodes().size() == gridCount();
}
}, getTestTimeout());
@@ -234,11 +235,9 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
Ignite grid0 = startGrids(gridCount() + 1);
- setFileIOFactory(grid(gridCount()).context().cache().context().wal());
-
grid0.cluster().active(true);
- cache = grid0.cache(TEST_CACHE);
+ cache = grid0.cache(DEFAULT_CACHE_NAME);
for (int i = 0; i < ITRS; i++)
assertEquals(cache.get(i), "testValue" + i);
@@ -277,7 +276,9 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
return new FileIODecorator(delegate) {
/** {@inheritDoc} */
@Override public int write(ByteBuffer srcBuf) throws IOException {
- if (fail != null && fail.get())
+ System.out.println(">>>!!!! W "+file.getName());
+
+ if (fail != null && file.getName().endsWith(".wal") && fail.get())
throw new IOException("No space left on device");
return super.write(srcBuf);
@@ -285,7 +286,9 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
/** {@inheritDoc} */
@Override public MappedByteBuffer map(int sizeBytes) throws IOException {
- if (fail != null && fail.get())
+ System.out.println(">>>!!!! M "+file.getName());
+
+ if (fail != null && file.getName().endsWith(".wal") && fail.get())
throw new IOException("No space left on deive");
return delegate.map(sizeBytes);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
index 0c71174..c019ecc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.persistence.CheckpointWritePr
import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
@@ -92,7 +93,7 @@ public class IgnitePageMemReplaceDelayedWriteUnitTest {
AtomicInteger totalEvicted = new AtomicInteger();
- ReplacedPageWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> {
+ PageStoreWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> {
log.info("Evicting " + fullPageId);
assert getLockedPages(fullPageId).contains(fullPageId);
@@ -149,7 +150,7 @@ public class IgnitePageMemReplaceDelayedWriteUnitTest {
AtomicInteger totalEvicted = new AtomicInteger();
- ReplacedPageWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> {
+ PageStoreWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> {
log.info("Evicting " + fullPageId);
assert getSegment(fullPageId).writeLock().isHeldByCurrentThread();
@@ -214,7 +215,7 @@ public class IgnitePageMemReplaceDelayedWriteUnitTest {
* @return implementation for test
*/
@NotNull
- private PageMemoryImpl createPageMemory(IgniteConfiguration cfg, ReplacedPageWriter pageWriter, int pageSize) {
+ private PageMemoryImpl createPageMemory(IgniteConfiguration cfg, PageStoreWriter pageWriter, int pageSize) {
IgniteCacheDatabaseSharedManager db = mock(GridCacheDatabaseSharedManager.class);
when(db.checkpointLockIsHeldByThread()).thenReturn(true);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index 9976b61..9f38348 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -35,18 +37,21 @@ import org.apache.ignite.internal.mem.DirectMemoryProvider;
import org.apache.ignite.internal.mem.IgniteOutOfMemoryException;
import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagemem.FullPageId;
-import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
+import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
import org.apache.ignite.internal.util.lang.GridInClosure3X;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
@@ -58,6 +63,8 @@ import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
import org.junit.Test;
import org.mockito.Mockito;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
import static org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl.CHECKPOINT_POOL_OVERFLOW_ERROR_MSG;
import static org.apache.ignite.internal.processors.database.DataRegionMetricsSelfTest.NO_OP_METRICS;
@@ -83,7 +90,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
try {
while (!Thread.currentThread().isInterrupted())
- memory.allocatePage(1, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX);
+ memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
}
catch (IgniteOutOfMemoryException ignore) {
//Success
@@ -103,7 +110,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
try {
while (!Thread.currentThread().isInterrupted()) {
- long pageId = memory.allocatePage(1, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX);
+ long pageId = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
FullPageId fullPageId = new FullPageId(pageId, 1);
@@ -173,6 +180,170 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testCheckpointProtocolWriteDirtyPageAfterWriteUnlock() throws Exception {
+ TestPageStoreManager pageStoreMgr = new TestPageStoreManager();
+
+ // Create a 1 mb page memory.
+ PageMemoryImpl memory = createPageMemory(
+ 1,
+ PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED,
+ pageStoreMgr,
+ pageStoreMgr
+ );
+
+ int initPageCnt = 10;
+
+ List<FullPageId> allocated = new ArrayList<>(initPageCnt);
+
+ for (int i = 0; i < initPageCnt; i++) {
+ long id = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
+
+ FullPageId fullId = new FullPageId(id, 1);
+
+ allocated.add(fullId);
+
+ writePage(memory, fullId, (byte)1);
+ }
+
+ doCheckpoint(memory.beginCheckpoint(), memory, pageStoreMgr);
+
+ FullPageId cowPageId = allocated.get(0);
+
+ // Mark a single page as dirty.
+ writePage(memory, cowPageId, (byte)2);
+
+ GridMultiCollectionWrapper<FullPageId> cpPages = memory.beginCheckpoint();
+
+ assertEquals(1, cpPages.size());
+
+ // At this point the COW mechanics kick in.
+ writePage(memory, cowPageId, (byte)3);
+
+ doCheckpoint(cpPages, memory, pageStoreMgr);
+
+ byte[] data = pageStoreMgr.storedPages.get(cowPageId);
+
+ for (int i = PageIO.COMMON_HEADER_END; i < PAGE_SIZE; i++)
+ assertEquals(2, data[i]);
+ }
+
+ /**
+ * @param cpPages Checkpoint pages acquired by {@code beginCheckpoint()}.
+ * @param memory Page memory.
+ * @param pageStoreMgr Test page store manager.
+ * @throws Exception If failed.
+ */
+ private void doCheckpoint(
+ GridMultiCollectionWrapper<FullPageId> cpPages,
+ PageMemoryImpl memory,
+ TestPageStoreManager pageStoreMgr
+ ) throws Exception {
+ PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> {
+ assertNotNull(tag);
+
+ pageStoreMgr.write(fullPageId.groupId(), fullPageId.pageId(), buf, 1);
+ };
+
+ for (FullPageId cpPage : cpPages) {
+ byte[] data = new byte[PAGE_SIZE];
+
+ ByteBuffer buf = ByteBuffer.wrap(data);
+
+ memory.checkpointWritePage(cpPage, buf, pageStoreWriter, null);
+ }
+
+ memory.finishCheckpoint();
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testCheckpointProtocolCannotReplaceUnwrittenPage() throws Exception {
+ TestPageStoreManager pageStoreMgr = new TestPageStoreManager();
+
+ // Create a 1 mb page memory.
+ PageMemoryImpl memory = createPageMemory(
+ 1,
+ PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED,
+ pageStoreMgr,
+ pageStoreMgr);
+
+ int initPageCnt = 500;
+
+ List<FullPageId> allocated = new ArrayList<>(initPageCnt);
+
+ for (int i = 0; i < initPageCnt; i++) {
+ long id = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
+
+ FullPageId fullId = new FullPageId(id, 1);
+ allocated.add(fullId);
+
+ writePage(memory, fullId, (byte)1);
+ }
+
+ // CP Write lock.
+ memory.beginCheckpoint();
+ // CP Write unlock.
+
+ byte[] buf = new byte[PAGE_SIZE];
+
+ memory.checkpointWritePage(allocated.get(0), ByteBuffer.wrap(buf),
+ (fullPageId, buf0, tag) -> {
+ assertNotNull(tag);
+
+ boolean oom = false;
+
+ try {
+ // Try force page replacement.
+ while (true) {
+ memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
+ }
+ }
+ catch (IgniteOutOfMemoryException ex) {
+ oom = true;
+ }
+
+ assertTrue("Should oom before check replaced page.", oom);
+
+ assertTrue("Missing page: " + fullPageId, memory.hasLoadedPage(fullPageId));
+ }
+ , null);
+ }
+
+ /**
+ * @param mem Page memory.
+ * @param fullPageId Full page ID to write.
+ * @param val Value to write.
+ * @throws Exception If failed.
+ */
+ private void writePage(PageMemoryImpl mem, FullPageId fullPageId, byte val) throws Exception {
+ int grpId = fullPageId.groupId();
+ long pageId = fullPageId.pageId();
+ long page = mem.acquirePage(grpId, pageId);
+
+ try {
+ long ptr = mem.writeLock(grpId, pageId, page);
+
+ try {
+ DummyPageIO.VERSIONS.latest().initNewPage(ptr, pageId, PAGE_SIZE);
+
+ for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++)
+ PageUtils.putByte(ptr, i, val);
+ }
+ finally {
+ mem.writeUnlock(grpId, pageId, page, Boolean.FALSE, true);
+ }
+ }
+ finally {
+ mem.releasePage(grpId, pageId, page);
+ }
+ }
+
+ /**
* @throws Exception If failed.
*/
private void testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy plc) throws Exception {
@@ -181,7 +352,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
List<FullPageId> pages = new ArrayList<>();
for (int i = 0; i < (MAX_SIZE - 10) * MB / PAGE_SIZE / 2; i++) {
- long pageId = memory.allocatePage(1, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX);
+ long pageId = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
FullPageId fullPageId = new FullPageId(pageId, 1);
@@ -195,12 +366,15 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
CheckpointMetricsTracker mockTracker = Mockito.mock(CheckpointMetricsTracker.class);
for (FullPageId checkpointPage : pages)
- memory.getForCheckpoint(checkpointPage, ByteBuffer.allocate(PAGE_SIZE), mockTracker);
+ memory.checkpointWritePage(checkpointPage, ByteBuffer.allocate(PAGE_SIZE),
+ (fullPageId, buffer, tag) -> {
+ // No-op.
+ }, mockTracker);
memory.finishCheckpoint();
for (int i = (int)((MAX_SIZE - 10) * MB / PAGE_SIZE / 2); i < (MAX_SIZE - 20) * MB / PAGE_SIZE; i++) {
- long pageId = memory.allocatePage(1, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX);
+ long pageId = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
FullPageId fullPageId = new FullPageId(pageId, 1);
@@ -270,11 +444,31 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
* @param throttlingPlc Throttling Policy.
* @throws Exception If creating mock failed.
*/
- private PageMemoryImpl createPageMemory(PageMemoryImpl.ThrottlingPolicy throttlingPlc) throws Exception {
+ private PageMemoryImpl createPageMemory(
+ PageMemoryImpl.ThrottlingPolicy throttlingPlc) throws Exception {
+ return createPageMemory(
+ MAX_SIZE,
+ throttlingPlc,
+ new NoOpPageStoreManager(),
+ (fullPageId, byteBuf, tag) -> {
+ assert false : "No page replacement (rotation with disk) should happen during the test";
+ });
+ }
+
+ /**
+ * @param throttlingPlc Throttling Policy.
+ * @throws Exception If creating mock failed.
+ */
+ private PageMemoryImpl createPageMemory(
+ int maxSize,
+ PageMemoryImpl.ThrottlingPolicy throttlingPlc,
+ IgnitePageStoreManager mgr,
+ PageStoreWriter replaceWriter
+ ) throws Exception {
long[] sizes = new long[5];
for (int i = 0; i < sizes.length; i++)
- sizes[i] = MAX_SIZE * MB / 4;
+ sizes[i] = maxSize * MB / 4;
sizes[4] = 5 * MB;
@@ -304,7 +498,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
null,
null,
null,
- new NoOpPageStoreManager(),
+ mgr,
new NoOpWALManager(),
null,
new IgniteCacheDatabaseSharedManager(),
@@ -334,9 +528,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
sizes,
sharedCtx,
PAGE_SIZE,
- (fullPageId, byteBuf, tag) -> {
- assert false : "No page replacement (rotation with disk) should happen during the test";
- },
+ replaceWriter,
new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
@Override public void applyx(Long page, FullPageId fullId, PageMemoryEx pageMem) {
}
@@ -356,4 +548,42 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
return mem;
}
+
+ /**
+ *
+ */
+ private static class TestPageStoreManager extends NoOpPageStoreManager implements PageStoreWriter {
+ /** */
+ private Map<FullPageId, byte[]> storedPages = new HashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public void read(int grpId, long pageId, ByteBuffer pageBuf) throws IgniteCheckedException {
+ FullPageId fullPageId = new FullPageId(pageId, grpId);
+
+ byte[] bytes = storedPages.get(fullPageId);
+
+ if (bytes != null)
+ pageBuf.put(bytes);
+ else
+ pageBuf.put(new byte[PAGE_SIZE]);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(int grpId, long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException {
+ byte[] data = new byte[PAGE_SIZE];
+
+ pageBuf.get(data);
+
+ storedPages.put(new FullPageId(pageId, grpId), data);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writePage(FullPageId fullPageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException {
+ byte[] data = new byte[PAGE_SIZE];
+
+ pageBuf.get(data);
+
+ storedPages.put(fullPageId, data);
+ }
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java
index 49234e9..59029d2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.database;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -55,7 +57,7 @@ import org.junit.Test;
*/
public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
/** */
- private static final int KEYS_COUNT = 20_000;
+ private static final int KEYS_COUNT = SF.applyLB(10_000, 2_000);
/**
* @return Ignite instance for testing.
@@ -91,7 +93,7 @@ public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
Map<Integer, DbValue> map = new HashMap<>();
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < SF.applyLB(5, 3); i++) {
info("Iteration: " + i);
info("Grow...");
@@ -146,7 +148,7 @@ public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
public void testRandomRemove() throws Exception {
IgniteCache<Integer, DbValue> cache = cache(DEFAULT_CACHE_NAME);
- final int cnt = 50_000;
+ final int cnt = SF.apply(30_000);
long seed = System.nanoTime();
@@ -161,9 +163,6 @@ public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
for (int i : keys) {
DbValue v0 = new DbValue(i, "test-value", i);
-// if (i % 1000 == 0)
-// X.println(" --> " + i);
-
cache.put(i, v0);
assertEquals(v0, cache.get(i));
@@ -173,11 +172,8 @@ public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
X.println("Rmv start");
- for (int i : keys) {
-// X.println(" --> " + i);
-
+ for (int i : keys)
assertTrue(cache.remove(i));
- }
}
/**
@@ -684,14 +680,7 @@ public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
for (int i = 0; i < cnt; i++)
keys[i] = i;
- for (int i = 0; i < cnt; i++) {
- int a = rnd.nextInt(cnt);
- int b = rnd.nextInt(cnt);
-
- int k = keys[a];
- keys[a] = keys[b];
- keys[b] = k;
- }
+ Collections.shuffle(Arrays.asList(keys));
return keys;
}
@@ -846,7 +835,7 @@ public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
long seed = System.currentTimeMillis();
- int iterations = SF.apply(MvccFeatureChecker.forcedMvcc() ? 100_000 : 300_000);
+ int iterations = SF.apply(MvccFeatureChecker.forcedMvcc() ? 30_000 : 90_000);
X.println("Seed: " + seed);
@@ -880,8 +869,8 @@ public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
assertEquals(map.size(), cache.size());
- for (Integer key : map.keySet())
- assertEquals(map.get(key), cache.get(key));
+ for (Cache.Entry<Integer, DbValue> entry : cache.query(new ScanQuery<Integer, DbValue>()))
+ assertEquals(map.get(entry.getKey()), entry.getValue());
}
@Test
diff --git a/modules/core/src/test/java/org/apache/ignite/session/GridSessionSetJobAttributeWaitListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/session/GridSessionSetJobAttributeWaitListenerSelfTest.java
index 8bfbd30..8b477af 100644
--- a/modules/core/src/test/java/org/apache/ignite/session/GridSessionSetJobAttributeWaitListenerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/session/GridSessionSetJobAttributeWaitListenerSelfTest.java
@@ -43,6 +43,7 @@ import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.resources.TaskSessionResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
@@ -56,7 +57,7 @@ public class GridSessionSetJobAttributeWaitListenerSelfTest extends GridCommonAb
public static final int SPLIT_COUNT = 5;
/** */
- private static final long WAIT_TIME = 20000;
+ private static final long WAIT_TIME = SF.applyLB(10_000, 5_000);
/** */
private static volatile CountDownLatch startSignal;
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 7ed3981..94c0812 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -55,6 +55,7 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Ignore;
import org.junit.Test;
@@ -202,7 +203,7 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
CLIENT_GRID_CNT,
"client-restart");
- Thread.sleep(getTestTimeout() - 60 * 1000);
+ Thread.sleep(SF.applyLB(10_000, 30_000));
done.set(true);
@@ -382,12 +383,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
srvs - 1,
"server-restart");
- final long timeToExec = getTestTimeout() - 60_000;
-
- final long endTime = System.currentTimeMillis() + timeToExec;
+ final long endTime = System.currentTimeMillis() + SF.applyLB(10_000, 30_000);
while (System.currentTimeMillis() < endTime) {
- Thread.sleep(3000);
+ Thread.sleep(1000);
if (error.get() != null) {
Throwable err = error.get();
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index d93e427..83afae3 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testframework.junits;
import java.io.ObjectStreamException;
import java.io.Serializable;
import java.lang.annotation.Annotation;
+import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
@@ -2578,7 +2579,11 @@ public abstract class GridAbstractTest extends JUnitAssertAware {
@Override public Statement apply(Statement base, Description desc) {
return new Statement() {
@Override public void evaluate() throws Throwable {
- GridAbstractTest fixtureInstance = (GridAbstractTest)desc.getTestClass().newInstance();
+ Constructor<?> testConstructor = desc.getTestClass().getDeclaredConstructor();
+
+ testConstructor.setAccessible(true);
+
+ GridAbstractTest fixtureInstance = (GridAbstractTest)testConstructor.newInstance();
fixtureInstance.evaluateInsideFixture(base);
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java
index 525f16a..265d1ed 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.failover.always.AlwaysFailoverSpi;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -72,10 +73,10 @@ public class IgniteCacheLockPartitionOnAffinityRunAbstractTest extends GridCache
protected static final int ORGS_COUNT_PER_NODE = 2;
/** Test duration. */
- protected static final long TEST_DURATION = 2 * 60_000;
+ protected static final long TEST_DURATION = 40_000;
/** Test timeout. */
- protected static final long TEST_TIMEOUT = TEST_DURATION + 2 * 60_000;
+ protected static final long TEST_TIMEOUT = TEST_DURATION + 60_000;
/** Timeout between restart of a node. */
protected static final long RESTART_TIMEOUT = 3_000;
@@ -182,7 +183,7 @@ public class IgniteCacheLockPartitionOnAffinityRunAbstractTest extends GridCache
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
- endTime = System.currentTimeMillis() + TEST_DURATION;
+ endTime = System.currentTimeMillis() + SF.applyLB((int)TEST_DURATION, 20_000);
super.beforeTest();
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteTcBotInitNewPageTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteTcBotInitNewPageTest.java
index 13b6186..429be3d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteTcBotInitNewPageTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteTcBotInitNewPageTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence.db;
import com.google.common.base.Strings;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -27,6 +28,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -56,13 +58,19 @@ public class IgniteTcBotInitNewPageTest extends GridCommonAbstractTest {
IgniteCache<Object, Object> cache = ignite.cache(CACHE);
- for (int i = 0; i < 1_000_000; i++)
- cache.put(i, i);
+ try (IgniteDataStreamer<Object, Object> ds = ignite.dataStreamer(CACHE)) {
+ for (int i = 0; i < SF.apply(1_000_000); i++)
+ ds.addData(i, i);
+ }
cache.clear();
- for (int i = 0; i < 1_000; i++)
- cache.put(i, Strings.repeat("Apache Ignite", 1000));
+ String longStr = Strings.repeat("Apache Ignite", SF.apply(1000));
+
+ try (IgniteDataStreamer<Object, Object> ds = ignite.dataStreamer(CACHE)) {
+ for (int i = 0; i < 1_000; i++)
+ ds.addData(i, longStr);
+ }
}
/** {@inheritDoc} */
@@ -76,7 +84,7 @@ public class IgniteTcBotInitNewPageTest extends GridCommonAbstractTest {
cfg.setCacheConfiguration(ccfg);
DataRegionConfiguration regCfg = new DataRegionConfiguration()
- .setMaxSize(2L * 1024 * 1024 * 1024)
+ .setMaxSize(SF.apply(128) * 1024 * 1024)
.setPersistenceEnabled(true);
DataStorageConfiguration dsCfg = new DataStorageConfiguration()
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index 540ad50..65c1bed 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
+import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
@@ -89,7 +90,6 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.Compactab
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.typedef.F;
@@ -104,6 +104,7 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy;
@@ -636,14 +637,14 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>("cache-" + i);
// We can get 'too many open files' with default number of partitions.
- ccfg.setAffinity(new RendezvousAffinityFunction(false, 128));
+ ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
IgniteCache<Object, Object> cache = ignite.getOrCreateCache(ccfg);
cache.put(i, i);
}
- final long endTime = System.currentTimeMillis() + 30_000;
+ final long endTime = System.currentTimeMillis() + SF.applyLB(30_000, 5_000);
IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() {
@@ -666,6 +667,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
}
fut.get();
+
+ ignite.context().cache().context().database().wakeupForCheckpoint("final-test-checkpoint").get();
}
finally {
customFailureDetectionTimeout = prevFDTimeout;
@@ -678,7 +681,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
private void checkWalRolloverMultithreaded() throws Exception {
walSegmentSize = 2 * 1024 * 1024;
- final long endTime = System.currentTimeMillis() + 60 * 1000;
+ final long endTime = System.currentTimeMillis() + SF.apply(50 * 1000);
IgniteEx ignite = startGrid(1);
@@ -690,7 +693,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
@Override public Void call() {
Random rnd = ThreadLocalRandom.current();
- while (U.currentTimeMillis() < endTime)
+ while (System.currentTimeMillis() < endTime)
cache.put(rnd.nextInt(50_000), rnd.nextInt());
return null;
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryCommunicationFailureTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryCommunicationFailureTest.java
index 36f9f63..f1a9c0e 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryCommunicationFailureTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryCommunicationFailureTest.java
@@ -399,7 +399,7 @@ public class ZookeeperDiscoveryCommunicationFailureTest extends ZookeeperDiscove
int nodeIdx = 10;
- for (int i = 0; i < GridTestUtils.SF.applyLB(10, 2); i++) {
+ for (int i = 0; i < GridTestUtils.SF.applyLB(4, 2); i++) {
info("Iteration: " + i);
for (Ignite node : G.allGrids())
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryTopologyChangeAndReconnectTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryTopologyChangeAndReconnectTest.java
index f7de039..eb7cfaf 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryTopologyChangeAndReconnectTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryTopologyChangeAndReconnectTest.java
@@ -111,7 +111,7 @@ public class ZookeeperDiscoveryTopologyChangeAndReconnectTest extends ZookeeperD
if (closeClientSock)
testSockNio = true;
- long stopTime = System.currentTimeMillis() + GridTestUtils.SF.applyLB(60_000, 5_000);
+ long stopTime = System.currentTimeMillis() + GridTestUtils.SF.applyLB(30_000, 5_000);
AtomicBoolean stop = new AtomicBoolean();
@@ -847,7 +847,7 @@ public class ZookeeperDiscoveryTopologyChangeAndReconnectTest extends ZookeeperD
int nextNodeIdx = 0;
int nextCacheIdx = 0;
- long stopTime = System.currentTimeMillis() + GridTestUtils.SF.applyLB(60_000, 5_000);
+ long stopTime = System.currentTimeMillis() + GridTestUtils.SF.applyLB(30_000, 5_000);
int MAX_NODES = 20;
int MAX_CACHES = 10;