You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/10/13 17:43:16 UTC
[31/50] [abbrv] ignite git commit: IGNITE-6334 Throttle writing
threads during ongoing checkpoint - Fixes #2710.
IGNITE-6334 Throttle writing threads during ongoing checkpoint - Fixes #2710.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/520c2e36
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/520c2e36
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/520c2e36
Branch: refs/heads/ignite-2.1.5-p1
Commit: 520c2e368baf4cb79912a14663ea3fd1c7da6487
Parents: 73e1578
Author: Ivan Rakov <iv...@gmail.com>
Authored: Fri Sep 22 12:40:22 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Sep 22 12:44:22 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 5 +
.../PersistentStoreConfiguration.java | 28 +-
.../GridCacheDatabaseSharedManager.java | 48 ++-
.../persistence/pagemem/PageMemoryImpl.java | 96 +++++-
.../persistence/pagemem/PagesWriteThrottle.java | 104 ++++++
.../pagemem/BPlusTreePageMemoryImplTest.java | 4 +-
.../BPlusTreeReuseListPageMemoryImplTest.java | 3 +-
.../MetadataStoragePageMemoryImplTest.java | 4 +-
.../pagemem/PageMemoryImplNoLoadTest.java | 4 +-
.../persistence/pagemem/PageMemoryImplTest.java | 4 +-
.../pagemem/PagesWriteThrottleSandboxTest.java | 264 +++++++++++++++
.../pagemem/PagesWriteThrottleSmokeTest.java | 322 +++++++++++++++++++
.../ignite/testsuites/IgnitePdsTestSuite.java | 4 +
13 files changed, 866 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/520c2e36/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 628b165..f627e24 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Properties;
import javax.net.ssl.HostnameVerifier;
import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
@@ -718,6 +719,10 @@ public final class IgniteSystemProperties {
*/
public static final String IGNITE_WAL_LOG_TX_RECORDS = "IGNITE_WAL_LOG_TX_RECORDS";
+ /** If this property is set to {@code true}, {@link PersistentStoreConfiguration#writeThrottlingEnabled} will be
+ * overridden to {@code true} regardless of the value specified in configuration. */
+ public static final String IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED = "IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED";
+
/**
* Enforces singleton.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/520c2e36/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
index abca5a5..c44e92d 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
@@ -84,6 +84,9 @@ public class PersistentStoreConfiguration implements Serializable {
/** Default wal archive directory. */
public static final String DFLT_WAL_ARCHIVE_PATH = "db/wal/archive";
+ /** Default write throttling enabled. */
+ public static final boolean DFLT_WRITE_THROTTLING_ENABLED = false;
+
/** */
private String persistenceStorePath;
@@ -162,6 +165,11 @@ public class PersistentStoreConfiguration implements Serializable {
private long walAutoArchiveAfterInactivity = -1;
/**
+ * If true, threads that generate dirty pages too fast during ongoing checkpoint will be throttled.
+ */
+ private boolean writeThrottlingEnabled = DFLT_WRITE_THROTTLING_ENABLED;
+
+ /**
* Returns a path the root directory where the Persistent Store will persist data and indexes.
*/
public String getPersistentStorePath() {
@@ -240,7 +248,7 @@ public class PersistentStoreConfiguration implements Serializable {
/**
* Sets a number of threads to use for the checkpointing purposes.
*
- * @param checkpointingThreads Number of checkpointing threads. One thread is used by default.
+ * @param checkpointingThreads Number of checkpointing threads. Four threads are used by default.
* @return {@code this} for chaining.
*/
public PersistentStoreConfiguration setCheckpointingThreads(int checkpointingThreads) {
@@ -402,6 +410,24 @@ public class PersistentStoreConfiguration implements Serializable {
}
/**
+ * Gets flag indicating whether write throttling is enabled.
+ */
+ public boolean isWriteThrottlingEnabled() {
+ return writeThrottlingEnabled;
+ }
+
+ /**
+ * Sets flag indicating whether write throttling is enabled.
+ *
+ * @param writeThrottlingEnabled Write throttling enabled flag.
+ */
+ public PersistentStoreConfiguration setWriteThrottlingEnabled(boolean writeThrottlingEnabled) {
+ this.writeThrottlingEnabled = writeThrottlingEnabled;
+
+ return this;
+ }
+
+ /**
* Gets the length of the time interval for rate-based metrics. This interval defines a window over which
* hits will be tracked. Default value is {@link #DFLT_RATE_TIME_INTERVAL_MILLIS}.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/520c2e36/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 277143c..1b5dae6 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -302,6 +302,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** */
private ObjectName persistenceMetricsMbeanName;
+ /** Counter for written checkpoint pages. Not null only if checkpoint is running. */
+ private volatile AtomicInteger writtenPagesCntr = null;
+
+ /** Number of pages in current checkpoint. */
+ private volatile int currCheckpointPagesCnt;
+
/**
* @param ctx Kernal context.
*/
@@ -666,6 +672,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
"Checkpoint page buffer size is too big, setting to an adjusted cache size [size="
+ U.readableSize(cacheSize, false) + ", memPlc=" + plcCfg.getName() + ']');
+ boolean writeThrottlingEnabled = persistenceCfg.isWriteThrottlingEnabled();
+
+ if (IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED, false))
+ writeThrottlingEnabled = true;
+
PageMemoryImpl pageMem = new PageMemoryImpl(
memProvider,
calculateFragmentSizes(
@@ -698,7 +709,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
},
this,
- memMetrics
+ memMetrics,
+ writeThrottlingEnabled
);
memMetrics.pageMemory(pageMem);
@@ -940,7 +952,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
- * Gets the checkpoint read lock. While this lock is held, checkpoint thread will not acquiSnapshotWorkerre memory state.
+ * Gets the checkpoint read lock. While this lock is held, checkpoint thread will not acquire memory state.
*/
@SuppressWarnings("LockAcquiredButNotSafelyReleased")
@Override public void checkpointReadLock() {
@@ -1902,6 +1914,20 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
+ * Counter for written checkpoint pages. Not null only if checkpoint is running.
+ */
+ public AtomicInteger writtenPagesCounter() {
+ return writtenPagesCntr;
+ }
+
+ /**
+ * @return Number of pages in current checkpoint. If checkpoint is not running, returns 0.
+ */
+ public int currentCheckpointPagesCount() {
+ return currCheckpointPagesCnt;
+ }
+
+ /**
* @param cpTs Checkpoint timestamp.
* @param cpId Checkpoint ID.
* @param type Checkpoint type.
@@ -2034,6 +2060,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
Checkpoint chp = markCheckpointBegin(tracker);
+ currCheckpointPagesCnt = chp.pagesSize;
+
+ writtenPagesCntr = new AtomicInteger();
+
boolean interrupted = true;
try {
@@ -2045,7 +2075,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
asyncRunner == null ? 1 : chp.cpPages.collectionsSize());
tracker.onPagesWriteStart();
- final AtomicInteger writtenPagesCtr = new AtomicInteger();
+
final int totalPagesToWriteCnt = chp.cpPages.size();
if (asyncRunner != null) {
@@ -2055,7 +2085,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
chp.cpPages.innerCollection(i),
updStores,
doneWriteFut,
- writtenPagesCtr,
totalPagesToWriteCnt
);
@@ -2074,7 +2103,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
chp.cpPages,
updStores,
doneWriteFut,
- writtenPagesCtr,
totalPagesToWriteCnt);
write.run();
@@ -2398,6 +2426,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
chp.cpEntry.checkpointMark(),
null,
CheckpointEntryType.END);
+
+ writtenPagesCntr = null;
+
+ currCheckpointPagesCnt = 0;
}
checkpointHist.onCheckpointFinished(chp);
@@ -2494,9 +2526,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** */
private CountDownFuture doneFut;
- /** Counter for all written pages. May be shared between several workers */
- private AtomicInteger writtenPagesCntr;
-
/** Total pages to write, counter may be greater than {@link #writePageIds} size*/
private final int totalPagesToWrite;
@@ -2506,7 +2535,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @param writePageIds Collection of page IDs to write.
* @param updStores
* @param doneFut
- * @param writtenPagesCntr all written pages counter, may be shared between several write tasks
* @param totalPagesToWrite total pages to be written under this checkpoint
*/
private WriteCheckpointPages(
@@ -2514,13 +2542,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
final Collection<FullPageId> writePageIds,
final GridConcurrentHashSet<PageStore> updStores,
final CountDownFuture doneFut,
- @NotNull final AtomicInteger writtenPagesCntr,
final int totalPagesToWrite) {
this.tracker = tracker;
this.writePageIds = writePageIds;
this.updStores = updStores;
this.doneFut = doneFut;
- this.writtenPagesCntr = writtenPagesCntr;
this.totalPagesToWrite = totalPagesToWrite;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/520c2e36/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index dbb64f8..1da17b5 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -56,6 +57,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.MemoryMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO;
@@ -179,6 +181,9 @@ public class PageMemoryImpl implements PageMemoryEx {
/** State checker. */
private final CheckpointLockStateChecker stateChecker;
+ /** Number of used pages in checkpoint buffer. */
+ private final AtomicInteger cpBufPagesCntr = new AtomicInteger(0);
+
/** */
private ExecutorService asyncRunner = new ThreadPoolExecutor(
0,
@@ -217,6 +222,12 @@ public class PageMemoryImpl implements PageMemoryEx {
/** Flush dirty page closure. When possible, will be called by evictPage(). */
private final GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker;
+ /** Pages write throttle. */
+ private PagesWriteThrottle writeThrottle;
+
+ /** Write throttle enabled flag. */
+ private boolean throttleEnabled;
+
/** */
private boolean pageEvictWarned;
@@ -232,6 +243,7 @@ public class PageMemoryImpl implements PageMemoryEx {
* @param pageSize Page size.
* @param flushDirtyPage Callback invoked when a dirty page is evicted.
* @param changeTracker Callback invoked to track changes in pages.
+ * @param throttleEnabled Write throttle enabled flag.
*/
public PageMemoryImpl(
DirectMemoryProvider directMemoryProvider,
@@ -241,7 +253,8 @@ public class PageMemoryImpl implements PageMemoryEx {
GridInClosure3X<FullPageId, ByteBuffer, Integer> flushDirtyPage,
GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker,
CheckpointLockStateChecker stateChecker,
- MemoryMetricsImpl memMetrics
+ MemoryMetricsImpl memMetrics,
+ boolean throttleEnabled
) {
assert sharedCtx != null;
@@ -253,6 +266,7 @@ public class PageMemoryImpl implements PageMemoryEx {
this.flushDirtyPage = flushDirtyPage;
this.changeTracker = changeTracker;
this.stateChecker = stateChecker;
+ this.throttleEnabled = throttleEnabled;
storeMgr = sharedCtx.pageStore();
walMgr = sharedCtx.wal();
@@ -290,7 +304,7 @@ public class PageMemoryImpl implements PageMemoryEx {
DirectMemoryRegion cpReg = regions.get(regs - 1);
- checkpointPool = new PagePool(regs - 1, cpReg);
+ checkpointPool = new PagePool(regs - 1, cpReg, cpBufPagesCntr);
long checkpointBuf = cpReg.size();
@@ -305,12 +319,14 @@ public class PageMemoryImpl implements PageMemoryEx {
totalAllocated += reg.size();
- segments[i] = new Segment(i, regions.get(i), checkpointPool.pages() / segments.length);
+ segments[i] = new Segment(i, regions.get(i), checkpointPool.pages() / segments.length, throttleEnabled);
pages += segments[i].pages();
totalTblSize += segments[i].tableSize();
}
+ initWriteThrottle();
+
if (log.isInfoEnabled())
log.info("Started page memory [memoryAllocated=" + U.readableSize(totalAllocated, false) +
", pages=" + pages +
@@ -319,6 +335,21 @@ public class PageMemoryImpl implements PageMemoryEx {
']');
}
+ /**
+ *
+ */
+ private void initWriteThrottle() {
+ if (!(sharedCtx.database() instanceof GridCacheDatabaseSharedManager)) {
+ log.error("Write throttle can't start. Unexpected class of database manager: " +
+ sharedCtx.database().getClass());
+
+ throttleEnabled = false;
+ }
+
+ if (throttleEnabled)
+ writeThrottle = new PagesWriteThrottle(this, (GridCacheDatabaseSharedManager)sharedCtx.database());
+ }
+
/** {@inheritDoc} */
@SuppressWarnings("OverlyStrongTypeCast")
@Override public void stop() throws IgniteException {
@@ -774,6 +805,18 @@ public class PageMemoryImpl implements PageMemoryEx {
return true;
}
+ /**
+ * @param dirtyRatioThreshold Throttle threshold.
+ */
+ boolean shouldThrottle(double dirtyRatioThreshold) {
+ for (Segment segment : segments) {
+ if (segment.shouldThrottle(dirtyRatioThreshold))
+ return true;
+ }
+
+ return false;
+ }
+
/** {@inheritDoc} */
@Override public GridMultiCollectionWrapper<FullPageId> beginCheckpoint() throws IgniteException {
Collection[] collections = new Collection[segments.length];
@@ -799,6 +842,9 @@ public class PageMemoryImpl implements PageMemoryEx {
@Override public void finishCheckpoint() {
for (Segment seg : segments)
seg.segCheckpointPages = null;
+
+ if (throttleEnabled)
+ writeThrottle.onFinishCheckpoint();
}
/** {@inheritDoc} */
@@ -1219,6 +1265,9 @@ public class PageMemoryImpl implements PageMemoryEx {
try {
rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, PageIdUtils.tag(pageId));
+
+ if (throttleEnabled && !restore && markDirty && !dirty)
+ writeThrottle.onMarkDirty(isInCheckpoint(fullId));
}
catch (AssertionError ex) {
StringBuilder sb = new StringBuilder(sysPageSize * 2);
@@ -1310,6 +1359,20 @@ public class PageMemoryImpl implements PageMemoryEx {
}
/**
+ * Number of used pages in checkpoint buffer.
+ */
+ public int checkpointBufferPagesCount() {
+ return cpBufPagesCntr.get();
+ }
+
+ /**
+ * Total number of pages in checkpoint buffer.
+ */
+ public int checkpointBufferPagesSize() {
+ return checkpointPool.pages();
+ }
+
+ /**
* This method must be called in synchronized context.
*
* @param absPtr Absolute pointer.
@@ -1385,6 +1448,9 @@ public class PageMemoryImpl implements PageMemoryEx {
/** Direct memory region. */
protected final DirectMemoryRegion region;
+ /** Pool pages counter. */
+ protected final AtomicInteger pagesCntr;
+
/** */
protected long lastAllocatedIdxPtr;
@@ -1397,10 +1463,12 @@ public class PageMemoryImpl implements PageMemoryEx {
/**
* @param idx Index.
* @param region Region
+ * @param pagesCntr Pages counter.
*/
- protected PagePool(int idx, DirectMemoryRegion region) {
+ protected PagePool(int idx, DirectMemoryRegion region, AtomicInteger pagesCntr) {
this.idx = idx;
this.region = region;
+ this.pagesCntr = pagesCntr;
long base = (region.address() + 7) & ~0x7;
@@ -1427,6 +1495,9 @@ public class PageMemoryImpl implements PageMemoryEx {
* @throws GridOffHeapOutOfMemoryException If failed to allocate new free page.
*/
private long borrowOrAllocateFreePage(long pageId) throws GridOffHeapOutOfMemoryException {
+ if (pagesCntr != null)
+ pagesCntr.getAndIncrement();
+
long relPtr = borrowFreePage();
return relPtr != INVALID_REL_PTR ? relPtr : allocateFreePage(pageId);
@@ -1500,6 +1571,9 @@ public class PageMemoryImpl implements PageMemoryEx {
assert !PageHeader.isAcquired(absPtr) : "Release pinned page: " + PageHeader.fullPageId(absPtr);
+ if (pagesCntr != null)
+ pagesCntr.getAndDecrement();
+
while (true) {
long freePageRelPtrMasked = GridUnsafe.getLong(freePageListPtr);
@@ -1580,8 +1654,9 @@ public class PageMemoryImpl implements PageMemoryEx {
/**
* @param region Memory region.
+ * @param throttlingEnabled Write throttling enabled flag.
*/
- private Segment(int idx, DirectMemoryRegion region, int cpPoolPages) {
+ private Segment(int idx, DirectMemoryRegion region, int cpPoolPages, boolean throttlingEnabled) {
long totalMemory = region.size();
int pages = (int)(totalMemory / sysPageSize);
@@ -1596,9 +1671,9 @@ public class PageMemoryImpl implements PageMemoryEx {
DirectMemoryRegion poolRegion = region.slice(memPerTbl + 8);
- pool = new PagePool(idx, poolRegion);
+ pool = new PagePool(idx, poolRegion, null);
- maxDirtyPages = Math.min(pool.pages() * 2 / 3, cpPoolPages);
+ maxDirtyPages = throttlingEnabled ? pool.pages() * 3 / 4 : Math.min(pool.pages() * 2 / 3, cpPoolPages);
}
/**
@@ -1609,6 +1684,13 @@ public class PageMemoryImpl implements PageMemoryEx {
}
/**
+ * @param dirtyRatioThreshold Throttle threshold.
+ */
+ private boolean shouldThrottle(double dirtyRatioThreshold) {
+ return ((double)dirtyPages.size()) / pages() > dirtyRatioThreshold;
+ }
+
+ /**
* @return Max number of pages this segment can allocate.
*/
private int pages() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/520c2e36/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
new file mode 100644
index 0000000..d0c67c7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
@@ -0,0 +1,104 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+
+/**
+ * Throttles threads that generate dirty pages during ongoing checkpoint.
+ * Designed to avoid throughput drops to zero that can happen if the checkpoint buffer overflows.
+ */
+public class PagesWriteThrottle {
+ /** Page memory. */
+ private final PageMemoryImpl pageMemory;
+
+ /** Database manager. */
+ private final GridCacheDatabaseSharedManager dbSharedMgr;
+
+ /** Starting throttle time. Limits write speed to 1000 MB/s. */
+ private static final long STARTING_THROTTLE_NANOS = 4000;
+
+ /** Backoff ratio. Each subsequent park will be this many times longer than the previous one. */
+ private static final double BACKOFF_RATIO = 1.05;
+
+ /** Exponential backoff counter. */
+ private final AtomicInteger exponentialBackoffCntr = new AtomicInteger(0);
+ /**
+ * @param pageMemory Page memory.
+ * @param dbSharedMgr Database manager.
+ */
+ public PagesWriteThrottle(PageMemoryImpl pageMemory, GridCacheDatabaseSharedManager dbSharedMgr) {
+ this.pageMemory = pageMemory;
+ this.dbSharedMgr = dbSharedMgr;
+ }
+
+ /**
+ *
+ */
+ public void onMarkDirty(boolean isInCheckpoint) {
+ assert dbSharedMgr.checkpointLockIsHeldByThread();
+
+ AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter();
+
+ if (writtenPagesCntr == null)
+ return; // Don't throttle if checkpoint is not running.
+
+ boolean shouldThrottle = false;
+
+ if (isInCheckpoint) {
+ int checkpointBufLimit = pageMemory.checkpointBufferPagesSize() * 2 / 3;
+
+ shouldThrottle = pageMemory.checkpointBufferPagesCount() > checkpointBufLimit;
+ }
+
+ if (!shouldThrottle) {
+ int cpWrittenPages = writtenPagesCntr.get();
+
+ int cpTotalPages = dbSharedMgr.currentCheckpointPagesCount();
+
+ if (cpWrittenPages == cpTotalPages) {
+ // Checkpoint is already in fsync stage, increasing maximum ratio of dirty pages to 3/4
+ shouldThrottle = pageMemory.shouldThrottle(3.0 / 4);
+ } else {
+ double dirtyRatioThreshold = ((double)cpWrittenPages) / cpTotalPages;
+
+ // Starting with 0.05 to avoid throttle right after checkpoint start
+ // 7/12 is maximum ratio of dirty pages
+ dirtyRatioThreshold = (dirtyRatioThreshold * 0.95 + 0.05) * 7 / 12;
+
+ shouldThrottle = pageMemory.shouldThrottle(dirtyRatioThreshold);
+ }
+ }
+
+ if (shouldThrottle) {
+ int throttleLevel = exponentialBackoffCntr.getAndIncrement();
+
+ LockSupport.parkNanos((long)(STARTING_THROTTLE_NANOS * Math.pow(BACKOFF_RATIO, throttleLevel)));
+ }
+ else
+ exponentialBackoffCntr.set(0);
+ }
+
+ /**
+ *
+ */
+ public void onFinishCheckpoint() {
+ exponentialBackoffCntr.set(0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/520c2e36/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
index 6f58782..56d09f8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
@@ -82,7 +82,9 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest {
return true;
}
},
- new MemoryMetricsImpl(new MemoryPolicyConfiguration()));
+ new MemoryMetricsImpl(new MemoryPolicyConfiguration()),
+ false
+ );
mem.start();
http://git-wip-us.apache.org/repos/asf/ignite/blob/520c2e36/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
index b263d4f..39183b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
@@ -82,7 +82,8 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest
return true;
}
},
- new MemoryMetricsImpl(new MemoryPolicyConfiguration())
+ new MemoryMetricsImpl(new MemoryPolicyConfiguration()),
+ false
);
mem.start();
http://git-wip-us.apache.org/repos/asf/ignite/blob/520c2e36/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
index d9257bd..a427c63 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
@@ -97,6 +97,8 @@ public class MetadataStoragePageMemoryImplTest extends MetadataStorageSelfTest{
return true;
}
},
- new MemoryMetricsImpl(new MemoryPolicyConfiguration()));
+ new MemoryMetricsImpl(new MemoryPolicyConfiguration()),
+ false
+ );
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/520c2e36/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
index 1fff1f0..467ede4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
@@ -88,7 +88,9 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
return true;
}
},
- new MemoryMetricsImpl(new MemoryPolicyConfiguration()));
+ new MemoryMetricsImpl(new MemoryPolicyConfiguration()),
+ false
+ );
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/520c2e36/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index 0366eca..c5997fa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -110,7 +110,9 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
return true;
}
},
- new MemoryMetricsImpl(new MemoryPolicyConfiguration()));
+ new MemoryMetricsImpl(new MemoryPolicyConfiguration()),
+ false
+ );
mem.start();
http://git-wip-us.apache.org/repos/asf/ignite/blob/520c2e36/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java
new file mode 100644
index 0000000..409ab84
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java
@@ -0,0 +1,264 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.MemoryMetrics;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test to visualize and debug {@link PagesWriteThrottle}.
+ * Prints puts/gets rate, number of dirty pages, pages written in current checkpoint and pages in checkpoint buffer.
+ * Not intended to be part of any test suite.
+ */
+public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest {
+ /** IP finder handed to the discovery SPI of every node configured by this test. */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Name of the single configured cache; the reader threads and the data streamer both target it. */
+ private static final String CACHE_NAME = "cache1";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)igniteCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        // One metrics-enabled memory policy that is also registered as the default.
+        MemoryPolicyConfiguration plcCfg = new MemoryPolicyConfiguration()
+            .setMaxSize(4000L * 1024 * 1024)
+            .setName("dfltMemPlc")
+            .setMetricsEnabled(true);
+
+        MemoryConfiguration memCfg = new MemoryConfiguration();
+
+        memCfg.setMemoryPolicies(plcCfg);
+        memCfg.setDefaultMemoryPolicyName("dfltMemPlc");
+
+        igniteCfg.setMemoryConfiguration(memCfg);
+
+        CacheConfiguration cacheCfg = new CacheConfiguration();
+
+        cacheCfg.setName(CACHE_NAME);
+        cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setAffinity(new RendezvousAffinityFunction(false, 64));
+
+        igniteCfg.setCacheConfiguration(cacheCfg);
+
+        // Persistence settings with write throttling switched on.
+        PersistentStoreConfiguration pstCfg = new PersistentStoreConfiguration()
+            .setWalMode(WALMode.BACKGROUND)
+            .setCheckpointingFrequency(20_000)
+            .setCheckpointingPageBufferSize(1000L * 1000 * 1000)
+            .setWriteThrottlingEnabled(true);
+
+        igniteCfg.setPersistentStoreConfiguration(pstCfg);
+        igniteCfg.setConsistentId(gridName);
+
+        return igniteCfg;
+    }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ // Wipe the "db" and "snapshot" work directories before the test body runs.
+ deleteWorkFiles();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ // Wipe the "db" and "snapshot" work directories again so nothing is left behind by this test.
+ deleteWorkFiles();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ // 100 * 60 * 1000 = 6,000,000; presumably milliseconds, i.e. 100 minutes - TODO confirm the unit.
+ return 100 * 60 * 1000;
+ }
+
+    /**
+     * Streams {@code keyCnt * 10} random updates into the cache while two threads perform random gets and
+     * a reporter thread prints put/get rates and page counters, sleeping one second between reports.
+     *
+     * @throws Exception if failed.
+     */
+    public void testThrottle() throws Exception {
+        startGrids(1).active(true);
+
+        // Declared outside of the try block so that the finally block can always stop the background loops.
+        final AtomicBoolean run = new AtomicBoolean(true);
+
+        try {
+            final Ignite ig = ignite(0);
+
+            final int keyCnt = 4_000_000;
+
+            final HitRateMetrics getRate = new HitRateMetrics(5000, 5);
+
+            GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    while (run.get()) {
+                        // Read key range is twice as wide as the range of keys being written.
+                        int key = ThreadLocalRandom.current().nextInt(keyCnt * 2);
+
+                        ignite(0).cache(CACHE_NAME).get(key);
+
+                        getRate.onHit();
+                    }
+
+                    return null;
+                }
+            }, 2, "read-loader");
+
+            final HitRateMetrics putRate = new HitRateMetrics(1000, 5);
+
+            GridTestUtils.runAsync(new Runnable() {
+                @Override public void run() {
+                    while (run.get()) {
+                        long dirtyPages = 0;
+
+                        for (MemoryMetrics m : ig.memoryMetrics()) {
+                            if (m.getName().equals("dfltMemPlc"))
+                                dirtyPages = m.getDirtyPages();
+                        }
+
+                        AtomicInteger cntr = ((GridCacheDatabaseSharedManager)(((IgniteEx)ignite(0))
+                            .context().cache().context().database())).writtenPagesCounter();
+
+                        // The counter may be null; report zero written pages in that case.
+                        long cpWrittenPages = cntr == null ? 0 : cntr.get();
+
+                        long cpBufPages = 0;
+
+                        try {
+                            cpBufPages = ((PageMemoryImpl)((IgniteEx)ignite(0)).context().cache().context().database()
+                                .memoryPolicy("dfltMemPlc").pageMemory()).checkpointBufferPagesCount();
+                        }
+                        catch (IgniteCheckedException e) {
+                            e.printStackTrace();
+                        }
+
+                        System.out.println("@@@ putsPerSec=," + (putRate.getRate()) + ", getsPerSec=," + (getRate.getRate()) + ", dirtyPages=," + dirtyPages + ", cpWrittenPages=," + cpWrittenPages +", cpBufPages=," + cpBufPages);
+
+                        try {
+                            Thread.sleep(1000);
+                        }
+                        catch (InterruptedException e) {
+                            // Stop reporting: looping on with the interrupt flag set would make every
+                            // following sleep() fail immediately and turn this loop into a busy spin.
+                            Thread.currentThread().interrupt();
+
+                            return;
+                        }
+                    }
+                }
+            }, "metrics-view");
+
+            try (IgniteDataStreamer<Object, Object> ds = ig.dataStreamer(CACHE_NAME)) {
+                ds.allowOverwrite(true);
+
+                for (int i = 0; i < keyCnt * 10; i++) {
+                    ds.addData(ThreadLocalRandom.current().nextInt(keyCnt), new TestValue(ThreadLocalRandom.current().nextInt(),
+                        ThreadLocalRandom.current().nextInt()));
+
+                    putRate.onHit();
+                }
+            }
+        }
+        finally {
+            // Also reached when streaming fails, so the reader and reporter loops are always told to stop.
+            run.set(false);
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Value stored in the cache: two ints that define equality plus a randomly sized byte array.
+     */
+    private static class TestValue implements Serializable {
+        /** First int taking part in equals/hashCode. */
+        private final int v1;
+
+        /** Second int taking part in equals/hashCode. */
+        private final int v2;
+
+        /** Array of 400 to 419 bytes; never read inside this class. */
+        private byte[] payload = new byte[400 + ThreadLocalRandom.current().nextInt(20)];
+
+        /**
+         * @param v1 Value 1.
+         * @param v2 Value 2.
+         */
+        private TestValue(int v1, int v2) {
+            this.v1 = v1;
+            this.v2 = v2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            if (o == null || o.getClass() != getClass())
+                return false;
+
+            TestValue that = (TestValue)o;
+
+            return that.v1 == v1 && that.v2 == v2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return 31 * v1 + v2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestValue.class, this);
+        }
+    }
+
+    /**
+     * Deletes the "db" and then the "snapshot" directory resolved under the default work directory.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    private void deleteWorkFiles() throws IgniteCheckedException {
+        for (String dirName : new String[] {"db", "snapshot"})
+            deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), dirName, false));
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/520c2e36/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
new file mode 100644
index 0000000..12a601d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
@@ -0,0 +1,322 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
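+/**
+ * Smoke test for write throttling: loads a two-node cluster with puts while checkpoint writes are artificially
+ * slowed down and fails if the put rate measured over a 10 second window drops to zero.
+ */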
+public class PagesWriteThrottleSmokeTest extends GridCommonAbstractTest {
+ /** IP finder handed to the discovery SPI of every node configured by this test. */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /**
+  * While {@code true}, writes issued from threads whose name contains "checkpoint" are delayed by
+  * {@link SlowCheckpointFileIOFactory}. Reset to {@code true} before each test; {@code testThrottle} clears it
+  * once a zero put rate has been detected.
+  */
+ private final AtomicBoolean slowCheckpointEnabled = new AtomicBoolean(true);
+
+ /** Name of the single configured cache that the loader thread puts into. */
+ private static final String CACHE_NAME = "cache1";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)igniteCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        // One metrics-enabled memory policy that is also registered as the default.
+        MemoryPolicyConfiguration plcCfg = new MemoryPolicyConfiguration()
+            .setMaxSize(400 * 1024 * 1024)
+            .setName("dfltMemPlc")
+            .setMetricsEnabled(true);
+
+        MemoryConfiguration memCfg = new MemoryConfiguration();
+
+        memCfg.setMemoryPolicies(plcCfg);
+        memCfg.setDefaultMemoryPolicyName("dfltMemPlc");
+
+        igniteCfg.setMemoryConfiguration(memCfg);
+
+        CacheConfiguration cacheCfg = new CacheConfiguration();
+
+        cacheCfg.setName(CACHE_NAME);
+        cacheCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+        cacheCfg.setAffinity(new RendezvousAffinityFunction(false, 64));
+
+        igniteCfg.setCacheConfiguration(cacheCfg);
+
+        // Persistence with throttling on, a single checkpoint thread and the slowed-down file I/O factory.
+        PersistentStoreConfiguration pstCfg = new PersistentStoreConfiguration()
+            .setWalMode(WALMode.BACKGROUND)
+            .setCheckpointingFrequency(20_000)
+            .setCheckpointingPageBufferSize(200 * 1000 * 1000)
+            .setWriteThrottlingEnabled(true)
+            .setCheckpointingThreads(1)
+            .setFileIOFactory(new SlowCheckpointFileIOFactory());
+
+        igniteCfg.setPersistentStoreConfiguration(pstCfg);
+        igniteCfg.setConsistentId(gridName);
+
+        return igniteCfg;
+    }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ // Wipe the "db" and "snapshot" work directories before the test body runs.
+ deleteWorkFiles();
+
+ // testThrottle may have cleared the flag; make sure checkpoint writes are slowed down again.
+ slowCheckpointEnabled.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ // Wipe the "db" and "snapshot" work directories again so nothing is left behind by this test.
+ deleteWorkFiles();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ // 6 * 60 * 1000 = 360,000; presumably milliseconds, i.e. 6 minutes - TODO confirm the unit.
+ return 6 * 60 * 1000;
+ }
+
+    /**
+     * Loads the cache with random puts for up to three minutes and fails if the put rate measured over
+     * a 10 second window drops to zero. A failure of the loader thread itself is rethrown as well.
+     *
+     * @throws Exception if failed.
+     */
+    public void testThrottle() throws Exception {
+        startGrids(2).active(true);
+
+        try {
+            Ignite ig = ignite(0);
+
+            final int keyCnt = 2_000_000;
+
+            final AtomicBoolean run = new AtomicBoolean(true);
+
+            final AtomicBoolean zeroDropdown = new AtomicBoolean(false);
+
+            final HitRateMetrics putRate10secs = new HitRateMetrics(10_000, 20);
+
+            final HitRateMetrics putRate1sec = new HitRateMetrics(1_000, 20);
+
+            GridTestUtils.runAsync(new Runnable() {
+                @Override public void run() {
+                    try {
+                        // Give the loader some time before the first rate check.
+                        Thread.sleep(5000);
+
+                        while (run.get()) {
+                            System.out.println(
+                                "Put rate over last 10 seconds: " + (putRate10secs.getRate() / 10) +
+                                    " puts/sec, over last 1 second: " + putRate1sec.getRate());
+
+                            if (putRate10secs.getRate() == 0) {
+                                zeroDropdown.set(true);
+
+                                run.set(false);
+                            }
+
+                            Thread.sleep(1000);
+                        }
+                    }
+                    catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                    finally {
+                        run.set(false);
+                    }
+                }
+            }, "rate-checker");
+
+            final IgniteCache<Integer, TestValue> cache = ig.getOrCreateCache(CACHE_NAME);
+
+            IgniteInternalFuture loadFut = GridTestUtils.runAsync(new Runnable() {
+                @Override public void run() {
+                    try {
+                        long startTs = System.currentTimeMillis();
+
+                        for (int i = 0; i < keyCnt * 10 && System.currentTimeMillis() - startTs < 3 * 60 * 1000; i++) {
+                            if (!run.get())
+                                break;
+
+                            cache.put(ThreadLocalRandom.current().nextInt(keyCnt), new TestValue(ThreadLocalRandom.current().nextInt(),
+                                ThreadLocalRandom.current().nextInt()));
+
+                            putRate10secs.onHit();
+
+                            putRate1sec.onHit();
+                        }
+                    }
+                    finally {
+                        // Release the waiting test thread even if a put throws, as the rate checker does.
+                        run.set(false);
+                    }
+                }
+            }, "loader");
+
+            while (run.get())
+                LockSupport.parkNanos(10_000);
+
+            if (zeroDropdown.get()) {
+                // Let checkpoints run at full speed and finish them before the test is failed.
+                slowCheckpointEnabled.set(false);
+
+                IgniteInternalFuture cpFut1 = ((IgniteEx)ignite(0)).context().cache().context().database()
+                    .wakeupForCheckpoint("test");
+
+                IgniteInternalFuture cpFut2 = ((IgniteEx)ignite(1)).context().cache().context().database()
+                    .wakeupForCheckpoint("test");
+
+                cpFut1.get();
+
+                cpFut2.get();
+
+                fail("Put rate degraded to zero for at least 10 seconds");
+            }
+
+            // Rethrows an exception that terminated the loader, so such a failure is reported with its real
+            // cause instead of being mistaken for a put rate drop.
+            loadFut.get();
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Value stored in the cache: two ints that define equality plus a randomly sized byte array.
+     */
+    private static class TestValue implements Serializable {
+        /** First int taking part in equals/hashCode. */
+        private final int v1;
+
+        /** Second int taking part in equals/hashCode. */
+        private final int v2;
+
+        /** Array of 400 to 419 bytes; never read inside this class. */
+        private byte[] payload = new byte[400 + ThreadLocalRandom.current().nextInt(20)];
+
+        /**
+         * @param v1 Value 1.
+         * @param v2 Value 2.
+         */
+        private TestValue(int v1, int v2) {
+            this.v1 = v1;
+            this.v2 = v2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            if (o == null || o.getClass() != getClass())
+                return false;
+
+            TestValue that = (TestValue)o;
+
+            return that.v1 == v1 && that.v2 == v2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return 31 * v1 + v2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestValue.class, this);
+        }
+    }
+
+    /**
+     * Deletes the "db" and then the "snapshot" directory resolved under the default work directory.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    private void deleteWorkFiles() throws IgniteCheckedException {
+        for (String dirName : new String[] {"db", "snapshot"})
+            deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), dirName, false));
+    }
+
+    /**
+     * File I/O factory that wraps {@link RandomAccessFileIOFactory} and delays the three overridden write
+     * methods when they are called from a thread whose name contains "checkpoint" while
+     * {@code slowCheckpointEnabled} is set.
+     */
+    private class SlowCheckpointFileIOFactory implements FileIOFactory {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Factory that creates the real file I/O being decorated. */
+        private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file) throws IOException {
+            return create(file, "rw");
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, String mode) throws IOException {
+            final FileIO io = delegateFactory.create(file, mode);
+
+            return new FileIODecorator(io) {
+                @Override public int write(ByteBuffer srcBuf) throws IOException {
+                    pauseCheckpointWrite();
+
+                    return io.write(srcBuf);
+                }
+
+                @Override public int write(ByteBuffer srcBuf, long position) throws IOException {
+                    pauseCheckpointWrite();
+
+                    return io.write(srcBuf, position);
+                }
+
+                @Override public void write(byte[] buf, int off, int len) throws IOException {
+                    pauseCheckpointWrite();
+
+                    io.write(buf, off, len);
+                }
+            };
+        }
+
+        /**
+         * Parks the calling thread for up to 5 ms when the slowdown flag is set and the thread name contains
+         * "checkpoint"; returns immediately otherwise.
+         */
+        private void pauseCheckpointWrite() {
+            if (!slowCheckpointEnabled.get())
+                return;
+
+            if (Thread.currentThread().getName().contains("checkpoint"))
+                LockSupport.parkNanos(5_000_000);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/520c2e36/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index b2a1f65..ef7682f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.BPlusTree
import org.apache.ignite.internal.processors.cache.persistence.pagemem.MetadataStoragePageMemoryImplTest;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplNoLoadTest;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplTest;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottleSmokeTest;
import org.apache.ignite.internal.processors.database.IgniteDbClientNearCachePutGetTest;
import org.apache.ignite.internal.processors.database.IgniteDbDynamicCacheSelfTest;
import org.apache.ignite.internal.processors.database.IgniteDbMultiNodePutGetTest;
@@ -80,6 +81,9 @@ public class IgnitePdsTestSuite extends TestSuite {
suite.addTestSuite(DefaultPageSizeBackwardsCompatibilityTest.class);
+ // Write throttling
+ suite.addTestSuite(PagesWriteThrottleSmokeTest.class);
+
return suite;
}
}