You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/17 16:51:56 UTC
[09/19] ignite git commit: IGNITE-6033 Added sorted and multithreaded
modes in checkpointing algorithm - Fixes #2441.
IGNITE-6033 Added sorted and multithreaded modes in checkpointing algorithm - Fixes #2441.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/69e6f8b2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/69e6f8b2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/69e6f8b2
Branch: refs/heads/ignite-5901
Commit: 69e6f8b201a13f1c780948237960267c42ac5d2d
Parents: b417a36
Author: Ivan Rakov <iv...@gmail.com>
Authored: Thu Aug 17 15:54:21 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Thu Aug 17 15:54:21 2017 +0300
----------------------------------------------------------------------
.../configuration/CheckpointWriteOrder.java | 33 ++++++++
.../PersistentStoreConfiguration.java | 26 +++++++
.../GridCacheDatabaseSharedManager.java | 82 +++++++++++++++-----
...nitePersistenceSequentialCheckpointTest.java | 44 +++++++++++
.../IgnitePersistentStoreCacheGroupsTest.java | 31 ++++----
5 files changed, 183 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/69e6f8b2/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java
new file mode 100644
index 0000000..31feaf6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.configuration;
+
+/**
+ * Defines the order in which pages are written to disk storage during a checkpoint.
+ */
+public enum CheckpointWriteOrder {
+ /**
+ * Pages are written in the order returned by the checkpoint pages collection iterator (basically hashtable order).
+ */
+ RANDOM,
+
+ /**
+ * All checkpoint pages are collected into a single list and sorted by group ID, then by effective page ID.
+ * Provides almost sequential disk writes, which can be much faster on some SSD models.
+ */
+ SEQUENTIAL
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/69e6f8b2/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
index e8a0ff4..5b902ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
@@ -47,6 +47,9 @@ public class PersistentStoreConfiguration implements Serializable {
/** Default number of checkpointing threads. */
public static final int DFLT_CHECKPOINTING_THREADS = 1;
+ /** Default checkpoint write order. */
+ public static final CheckpointWriteOrder DFLT_CHECKPOINT_WRITE_ORDER = CheckpointWriteOrder.RANDOM;
+
/** Default number of checkpoints to be kept in WAL after checkpoint is finished */
public static final int DFLT_WAL_HISTORY_SIZE = 20;
@@ -95,6 +98,9 @@ public class PersistentStoreConfiguration implements Serializable {
/** */
private int checkpointingThreads = DFLT_CHECKPOINTING_THREADS;
+ /** Checkpoint write order. */
+ private CheckpointWriteOrder checkpointWriteOrder = DFLT_CHECKPOINT_WRITE_ORDER;
+
/** Number of checkpoints to keep */
private int walHistSize = DFLT_WAL_HISTORY_SIZE;
@@ -587,6 +593,26 @@ public class PersistentStoreConfiguration implements Serializable {
return walAutoArchiveAfterInactivity;
}
+ /**
+ * Gets the order of writing pages to disk storage during checkpoint.
+ *
+ * @return Checkpoint write order; {@link #DFLT_CHECKPOINT_WRITE_ORDER} unless another value has been set.
+ */
+ public CheckpointWriteOrder getCheckpointWriteOrder() {
+ return checkpointWriteOrder;
+ }
+
+ /**
+ * Sets the order of writing pages to disk storage during checkpoint.
+ * @param checkpointWriteOrder Checkpoint write order.
+ * @return {@code this} for chaining.
+ */
+ public PersistentStoreConfiguration setCheckpointWriteOrder(CheckpointWriteOrder checkpointWriteOrder) {
+ this.checkpointWriteOrder = checkpointWriteOrder;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(PersistentStoreConfiguration.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/69e6f8b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 99e05dd..3c7ba28 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -49,7 +49,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -62,6 +61,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.PersistenceMetrics;
+import org.apache.ignite.configuration.CheckpointWriteOrder;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.MemoryConfiguration;
@@ -93,7 +93,6 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
@@ -135,6 +134,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.mxbean.PersistenceMetricsMXBean;
import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -382,11 +382,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
long cpBufSize = persistenceCfg.getCheckpointingPageBufferSize();
if (persistenceCfg.getCheckpointingThreads() > 1)
- asyncRunner = new ThreadPoolExecutor(
+ asyncRunner = new IgniteThreadPoolExecutor(
+ "checkpoint-runner",
+ cctx.igniteInstanceName(),
persistenceCfg.getCheckpointingThreads(),
persistenceCfg.getCheckpointingThreads(),
- 30L,
- TimeUnit.SECONDS,
+ 30_000,
new LinkedBlockingQueue<Runnable>()
);
@@ -2084,10 +2085,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
WALPointer cpPtr = null;
- GridMultiCollectionWrapper<FullPageId> cpPages;
-
final CheckpointProgress curr;
+ IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple;
+
tracker.onLockWaitStart();
checkpointLock.writeLock().lock();
@@ -2152,19 +2153,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (curr.nextSnapshot)
snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map);
- IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> tup = beginAllCheckpoints();
+ cpPagesTuple = beginAllCheckpoints();
- // Todo it maybe more optimally
- Collection<FullPageId> cpPagesList = new ArrayList<>(tup.get2());
-
- for (GridMultiCollectionWrapper<FullPageId> col : tup.get1()) {
- for (int i = 0; i < col.collectionsSize(); i++)
- cpPagesList.addAll(col.innerCollection(i));
- }
-
- cpPages = new GridMultiCollectionWrapper<>(cpPagesList);
-
- if (!F.isEmpty(cpPages)) {
+ if (!F.isEmpty(cpPagesTuple.get1())) {
// No page updates for this checkpoint are allowed from now on.
cpPtr = cctx.wal().log(cpRec);
@@ -2180,7 +2171,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
curr.cpBeginFut.onDone();
- if (!F.isEmpty(cpPages)) {
+ if (!F.isEmpty(cpPagesTuple.get1())) {
assert cpPtr != null;
// Sync log outside the checkpoint write lock.
@@ -2198,6 +2189,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
checkpointHist.addCheckpointEntry(cpEntry);
+ GridMultiCollectionWrapper<FullPageId> cpPages = splitAndSortCpPagesIfNeeded(cpPagesTuple);
+
if (printCheckpointStats)
if (log.isInfoEnabled())
log.info(String.format("Checkpoint started [checkpointId=%s, startPtr=%s, checkpointLockWait=%dms, " +
@@ -2295,6 +2288,55 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
}
+    /**
+     * Merges all checkpoint pages into a single list, sorts it (by group ID, then by effective page ID) if
+     * {@link PersistentStoreConfiguration#getCheckpointWriteOrder()} is {@link CheckpointWriteOrder#SEQUENTIAL}
+     * and splits it into sublists according to {@link PersistentStoreConfiguration#getCheckpointingThreads()}.
+     *
+     * @param cpPagesTuple Checkpoint pages collections (first) and initial capacity of the merged list (second).
+     * @return Wrapper over consecutive sublist views of the merged (and possibly sorted) pages list.
+     */
+    private GridMultiCollectionWrapper<FullPageId> splitAndSortCpPagesIfNeeded(
+        IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple) {
+        List<FullPageId> cpPagesList = new ArrayList<>(cpPagesTuple.get2());
+
+        for (GridMultiCollectionWrapper<FullPageId> col : cpPagesTuple.get1()) {
+            for (int i = 0; i < col.collectionsSize(); i++)
+                cpPagesList.addAll(col.innerCollection(i));
+        }
+
+        if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) {
+            Collections.sort(cpPagesList, new Comparator<FullPageId>() {
+                @Override public int compare(FullPageId o1, FullPageId o2) {
+                    int cmp = Long.compare(o1.groupId(), o2.groupId());
+                    if (cmp != 0)
+                        return cmp;
+
+                    return Long.compare(PageIdUtils.effectivePageId(o1.pageId()),
+                        PageIdUtils.effectivePageId(o2.pageId()));
+                }
+            });
+        }
+
+        int cpThreads = persistenceCfg.getCheckpointingThreads();
+
+        // Splitting pages to (threads * 4) subtasks. If any thread will be faster, it will help slower threads.
+        int pagesSubLists = cpThreads == 1 ? 1 : cpThreads * 4;
+        int totalSize = cpPagesList.size();
+
+        @SuppressWarnings("unchecked")
+        Collection<FullPageId>[] pagesSubListArr = new Collection[pagesSubLists];
+
+        for (int i = 0; i < pagesSubLists; i++) {
+            // Use long arithmetic: totalSize * (i + 1) can overflow int when there are many pages and sublists.
+            int from = (int)((long)totalSize * i / pagesSubLists);
+            int to = (int)((long)totalSize * (i + 1) / pagesSubLists);
+            pagesSubListArr[i] = cpPagesList.subList(from, to);
+        }
+
+        return new GridMultiCollectionWrapper<FullPageId>(pagesSubListArr);
+    }
+
/** Pages write task */
private class WriteCheckpointPages implements Runnable {
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/69e6f8b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
new file mode 100644
index 0000000..9295000
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.configuration.CheckpointWriteOrder;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+
+/** Cache groups persistence tests run with four checkpointing threads and SEQUENTIAL checkpoint write order. */
+public class IgnitePersistenceSequentialCheckpointTest extends IgnitePersistentStoreCacheGroupsTest {
+    /** {@inheritDoc} */
+    @Override protected int entriesCount() {
+        return 1000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        PersistentStoreConfiguration psCfg = new PersistentStoreConfiguration()
+            .setWalMode(WALMode.LOG_ONLY)
+            .setCheckpointingThreads(4)
+            .setCheckpointWriteOrder(CheckpointWriteOrder.SEQUENTIAL);
+
+        cfg.setPersistentStoreConfiguration(psCfg);
+
+        return cfg;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/69e6f8b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
index a945c73..b39b8cb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
@@ -87,7 +87,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
MemoryConfiguration memCfg = new MemoryConfiguration();
memCfg.setPageSize(1024);
- memCfg.setDefaultMemoryPolicySize(10 * 1024 * 1024);
+ memCfg.setDefaultMemoryPolicySize(100 * 1024 * 1024);
cfg.setMemoryConfiguration(memCfg);
@@ -115,6 +115,11 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
super.afterTest();
}
+ /** @return Number of entries the tests put into each cache; subclasses may override it. */
+ protected int entriesCount() {
+ return 10;
+ }
+
/**
* @throws Exception If failed.
*/
@@ -236,7 +241,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
for (String cacheName : caches) {
IgniteCache<Object, Object> cache = node.cache(cacheName).withExpiryPolicy(plc);
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < entriesCount(); i++)
cache.put(i, cacheName + i);
}
@@ -253,10 +258,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
for (String cacheName : caches) {
IgniteCache<Object, Object> cache = node.cache(cacheName);
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < entriesCount(); i++)
assertEquals(cacheName + i, cache.get(i));
- assertEquals(10, cache.size());
+ assertEquals(entriesCount(), cache.size());
}
// Wait for expiration.
@@ -340,7 +345,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
for (String cacheName : caches) {
IgniteCache<Object, Object> cache = node.cache(cacheName);
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < entriesCount(); i++)
cache.put(i, new Person("" + i, cacheName));
}
}
@@ -353,10 +358,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
for (String cacheName : caches) {
IgniteCache<Object, Object> cache = node.cache(cacheName);
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < entriesCount(); i++)
assertEquals(new Person("" + i, cacheName), cache.get(i));
- assertEquals(10, cache.size());
+ assertEquals(entriesCount(), cache.size());
}
}
@@ -373,10 +378,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
List<Cache.Entry<Integer, Person>> persons = cache.query(qry.setArgs(cacheName)).getAll();
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < entriesCount(); i++)
assertEquals(new Person("" + i, cacheName), persons.get(i).getValue());
- assertEquals(10, persons.size());
+ assertEquals(entriesCount(), persons.size());
}
}
@@ -413,13 +418,13 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
for (String cacheName : caches) {
IgniteCache<Object, Object> cache = node.cache(cacheName);
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < entriesCount(); i++) {
cache.put(i, cacheName + i);
assertEquals(cacheName + i, cache.get(i));
}
- assertEquals(10, cache.size());
+ assertEquals(entriesCount(), cache.size());
}
stopAllGrids();
@@ -433,10 +438,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
for (String cacheName : caches) {
IgniteCache<Object, Object> cache = node.cache(cacheName);
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < entriesCount(); i++)
assertEquals(cacheName + i, cache.get(i));
- assertEquals(10, cache.size());
+ assertEquals(entriesCount(), cache.size());
}
}