You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/17 16:51:56 UTC

[09/19] ignite git commit: IGNITE-6033 Added sorted and multithreaded modes in checkpointing algorithm - Fixes #2441.

IGNITE-6033 Added sorted and multithreaded modes in checkpointing algorithm - Fixes #2441.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/69e6f8b2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/69e6f8b2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/69e6f8b2

Branch: refs/heads/ignite-5901
Commit: 69e6f8b201a13f1c780948237960267c42ac5d2d
Parents: b417a36
Author: Ivan Rakov <iv...@gmail.com>
Authored: Thu Aug 17 15:54:21 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Thu Aug 17 15:54:21 2017 +0300

----------------------------------------------------------------------
 .../configuration/CheckpointWriteOrder.java     | 33 ++++++++
 .../PersistentStoreConfiguration.java           | 26 +++++++
 .../GridCacheDatabaseSharedManager.java         | 82 +++++++++++++++-----
 ...nitePersistenceSequentialCheckpointTest.java | 44 +++++++++++
 .../IgnitePersistentStoreCacheGroupsTest.java   | 31 ++++----
 5 files changed, 183 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/69e6f8b2/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java
new file mode 100644
index 0000000..31feaf6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.configuration;
+
+/**
+ * Defines the order in which pages are written to disk storage during a checkpoint.
+ */
+public enum CheckpointWriteOrder {
+    /**
+     * Pages are written in the iteration order of the checkpoint pages collection (which is basically a hashtable).
+     */
+    RANDOM,
+
+    /**
+     * All checkpoint pages are collected into a single list and sorted by cache group ID, then by effective page ID.
+     * Provides almost sequential disk writes, which can be much faster on some SSD models.
+     */
+    SEQUENTIAL
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/69e6f8b2/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
index e8a0ff4..5b902ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
@@ -47,6 +47,9 @@ public class PersistentStoreConfiguration implements Serializable {
     /** Default number of checkpointing threads. */
     public static final int DFLT_CHECKPOINTING_THREADS = 1;
 
+    /** Default checkpoint write order. */
+    public static final CheckpointWriteOrder DFLT_CHECKPOINT_WRITE_ORDER = CheckpointWriteOrder.RANDOM;
+
     /** Default number of checkpoints to be kept in WAL after checkpoint is finished */
     public static final int DFLT_WAL_HISTORY_SIZE = 20;
 
@@ -95,6 +98,9 @@ public class PersistentStoreConfiguration implements Serializable {
     /** */
     private int checkpointingThreads = DFLT_CHECKPOINTING_THREADS;
 
+    /** Checkpoint write order. */
+    private CheckpointWriteOrder checkpointWriteOrder = DFLT_CHECKPOINT_WRITE_ORDER;
+
     /** Number of checkpoints to keep */
     private int walHistSize = DFLT_WAL_HISTORY_SIZE;
 
@@ -587,6 +593,26 @@ public class PersistentStoreConfiguration implements Serializable {
         return walAutoArchiveAfterInactivity;
     }
 
+    /**
+     * Gets the order of writing pages to disk storage during checkpoint.
+     *
+     * @return Checkpoint write order; {@link #DFLT_CHECKPOINT_WRITE_ORDER} unless changed via the setter.
+     */
+    public CheckpointWriteOrder getCheckpointWriteOrder() {
+        return checkpointWriteOrder;
+    }
+
+    /**
+     * Sets the order of writing pages to disk storage during checkpoint; returns {@code this} for chaining.
+     *
+     * @param checkpointWriteOrder Checkpoint write order.
+     */
+    public PersistentStoreConfiguration setCheckpointWriteOrder(CheckpointWriteOrder checkpointWriteOrder) {
+        this.checkpointWriteOrder = checkpointWriteOrder;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(PersistentStoreConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/69e6f8b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 99e05dd..3c7ba28 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -49,7 +49,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -62,6 +61,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.PersistenceMetrics;
+import org.apache.ignite.configuration.CheckpointWriteOrder;
 import org.apache.ignite.configuration.DataPageEvictionMode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.MemoryConfiguration;
@@ -93,7 +93,6 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
@@ -135,6 +134,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.mxbean.PersistenceMetricsMXBean;
 import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -382,11 +382,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         long cpBufSize = persistenceCfg.getCheckpointingPageBufferSize();
 
         if (persistenceCfg.getCheckpointingThreads() > 1)
-            asyncRunner = new ThreadPoolExecutor(
+            asyncRunner = new IgniteThreadPoolExecutor(
+                "checkpoint-runner",
+                cctx.igniteInstanceName(),
                 persistenceCfg.getCheckpointingThreads(),
                 persistenceCfg.getCheckpointingThreads(),
-                30L,
-                TimeUnit.SECONDS,
+                30_000,
                 new LinkedBlockingQueue<Runnable>()
             );
 
@@ -2084,10 +2085,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             WALPointer cpPtr = null;
 
-            GridMultiCollectionWrapper<FullPageId> cpPages;
-
             final CheckpointProgress curr;
 
+            IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple;
+
             tracker.onLockWaitStart();
 
             checkpointLock.writeLock().lock();
@@ -2152,19 +2153,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 if (curr.nextSnapshot)
                     snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map);
 
-                IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> tup = beginAllCheckpoints();
+                cpPagesTuple = beginAllCheckpoints();
 
-                // Todo it maybe more optimally
-                Collection<FullPageId> cpPagesList = new ArrayList<>(tup.get2());
-
-                for (GridMultiCollectionWrapper<FullPageId> col : tup.get1()) {
-                    for (int i = 0; i < col.collectionsSize(); i++)
-                        cpPagesList.addAll(col.innerCollection(i));
-                }
-
-                cpPages = new GridMultiCollectionWrapper<>(cpPagesList);
-
-                if (!F.isEmpty(cpPages)) {
+                if (!F.isEmpty(cpPagesTuple.get1())) {
                     // No page updates for this checkpoint are allowed from now on.
                     cpPtr = cctx.wal().log(cpRec);
 
@@ -2180,7 +2171,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             curr.cpBeginFut.onDone();
 
-            if (!F.isEmpty(cpPages)) {
+            if (!F.isEmpty(cpPagesTuple.get1())) {
                 assert cpPtr != null;
 
                 // Sync log outside the checkpoint write lock.
@@ -2198,6 +2189,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                 checkpointHist.addCheckpointEntry(cpEntry);
 
+                GridMultiCollectionWrapper<FullPageId> cpPages = splitAndSortCpPagesIfNeeded(cpPagesTuple);
+
                 if (printCheckpointStats)
                     if (log.isInfoEnabled())
                         log.info(String.format("Checkpoint started [checkpointId=%s, startPtr=%s, checkpointLockWait=%dms, " +
@@ -2295,6 +2288,55 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         }
     }
 
+    /**
+     * Reorders list of checkpoint pages and splits them into needed number of sublists according to
+     * {@link PersistentStoreConfiguration#getCheckpointingThreads()} and
+     * {@link PersistentStoreConfiguration#getCheckpointWriteOrder()}.
+     *
+     * @param cpPagesTuple Checkpoint pages tuple: page collections and a page count used to presize the merged list.
+     * @return Wrapper over the sublists which the merged (and optionally sorted) page list was split into.
+     */
+    private GridMultiCollectionWrapper<FullPageId> splitAndSortCpPagesIfNeeded(
+        IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple) {
+        List<FullPageId> cpPagesList = new ArrayList<>(cpPagesTuple.get2());
+
+        for (GridMultiCollectionWrapper<FullPageId> col : cpPagesTuple.get1()) {
+            for (int i = 0; i < col.collectionsSize(); i++)
+                cpPagesList.addAll(col.innerCollection(i));
+        }
+
+        if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) {
+            Collections.sort(cpPagesList, new Comparator<FullPageId>() {
+                @Override public int compare(FullPageId o1, FullPageId o2) {
+                    int cmp = Long.compare(o1.groupId(), o2.groupId());
+                    if (cmp != 0)
+                        return cmp;
+
+                    return Long.compare(PageIdUtils.effectivePageId(o1.pageId()),
+                        PageIdUtils.effectivePageId(o2.pageId()));
+                }
+            });
+        }
+
+        int cpThreads = persistenceCfg.getCheckpointingThreads();
+
+        // Splitting pages to (threads * 4) subtasks. If any thread will be faster, it will help slower threads.
+        int pagesSubLists = cpThreads == 1 ? 1 : cpThreads * 4;
+
+        Collection[] pagesSubListArr = new Collection[pagesSubLists];
+
+        // Kept as long: (totalSize * i) computed in int overflows on large checkpoints and yields invalid bounds.
+        long totalSize = cpPagesList.size();
+
+        for (int i = 0; i < pagesSubLists; i++) {
+            int from = (int)(totalSize * i / pagesSubLists);
+            int to = (int)(totalSize * (i + 1) / pagesSubLists);
+            pagesSubListArr[i] = cpPagesList.subList(from, to);
+        }
+
+        return new GridMultiCollectionWrapper<FullPageId>(pagesSubListArr);
+    }
+
     /** Pages write task */
     private class WriteCheckpointPages implements Runnable {
         /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/69e6f8b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
new file mode 100644
index 0000000..9295000
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.configuration.CheckpointWriteOrder;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+
+/**
+ * Runs the inherited cache groups tests with 4 checkpointing threads and SEQUENTIAL checkpoint write order.
+ */
+public class IgnitePersistenceSequentialCheckpointTest extends IgnitePersistentStoreCacheGroupsTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration()
+            .setWalMode(WALMode.LOG_ONLY)
+            .setCheckpointingThreads(4)
+            .setCheckpointWriteOrder(CheckpointWriteOrder.SEQUENTIAL));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int entriesCount() {
+        return 1000;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/69e6f8b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
index a945c73..b39b8cb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
@@ -87,7 +87,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
 
         MemoryConfiguration memCfg = new MemoryConfiguration();
         memCfg.setPageSize(1024);
-        memCfg.setDefaultMemoryPolicySize(10 * 1024 * 1024);
+        memCfg.setDefaultMemoryPolicySize(100 * 1024 * 1024);
 
         cfg.setMemoryConfiguration(memCfg);
 
@@ -115,6 +115,11 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         super.afterTest();
     }
 
+    /** @return Number of entries the tests put into (and expect to read back from) each cache. */
+    protected int entriesCount() {
+        return 10;
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -236,7 +241,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         for (String cacheName : caches) {
             IgniteCache<Object, Object> cache = node.cache(cacheName).withExpiryPolicy(plc);
 
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < entriesCount(); i++)
                 cache.put(i, cacheName + i);
         }
 
@@ -253,10 +258,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         for (String cacheName : caches) {
             IgniteCache<Object, Object> cache = node.cache(cacheName);
 
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < entriesCount(); i++)
                 assertEquals(cacheName + i, cache.get(i));
 
-            assertEquals(10, cache.size());
+            assertEquals(entriesCount(), cache.size());
         }
 
         // Wait for expiration.
@@ -340,7 +345,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         for (String cacheName : caches) {
             IgniteCache<Object, Object> cache = node.cache(cacheName);
 
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < entriesCount(); i++)
                 cache.put(i, new Person("" + i, cacheName));
         }
     }
@@ -353,10 +358,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         for (String cacheName : caches) {
             IgniteCache<Object, Object> cache = node.cache(cacheName);
 
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < entriesCount(); i++)
                 assertEquals(new Person("" + i, cacheName), cache.get(i));
 
-            assertEquals(10, cache.size());
+            assertEquals(entriesCount(), cache.size());
         }
     }
 
@@ -373,10 +378,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
 
             List<Cache.Entry<Integer, Person>> persons = cache.query(qry.setArgs(cacheName)).getAll();
 
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < entriesCount(); i++)
                 assertEquals(new Person("" + i, cacheName), persons.get(i).getValue());
 
-            assertEquals(10, persons.size());
+            assertEquals(entriesCount(), persons.size());
         }
     }
 
@@ -413,13 +418,13 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         for (String cacheName : caches) {
             IgniteCache<Object, Object> cache = node.cache(cacheName);
 
-            for (int i = 0; i < 10; i++)  {
+            for (int i = 0; i < entriesCount(); i++)  {
                 cache.put(i, cacheName + i);
 
                 assertEquals(cacheName + i, cache.get(i));
             }
 
-            assertEquals(10, cache.size());
+            assertEquals(entriesCount(), cache.size());
         }
 
         stopAllGrids();
@@ -433,10 +438,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         for (String cacheName : caches) {
             IgniteCache<Object, Object> cache = node.cache(cacheName);
 
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < entriesCount(); i++)
                 assertEquals(cacheName + i, cache.get(i));
 
-            assertEquals(10, cache.size());
+            assertEquals(entriesCount(), cache.size());
         }
     }