You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2017/08/29 10:24:38 UTC
ignite git commit: IGNITE-6204 Backport optimizations of
checkpointing algorithm into 2.2
Repository: ignite
Updated Branches:
refs/heads/ignite-2.2 bdaeecca9 -> 737874f65
IGNITE-6204 Backport optimizations of checkpointing algorithm into 2.2
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/737874f6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/737874f6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/737874f6
Branch: refs/heads/ignite-2.2
Commit: 737874f65d2aa143a79dd5f6b0d223e07a4eceee
Parents: bdaeecc
Author: Ivan Rakov <iv...@gmail.com>
Authored: Tue Aug 29 13:24:20 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Aug 29 13:24:20 2017 +0300
----------------------------------------------------------------------
.../configuration/CheckpointWriteOrder.java | 33 ++++++
.../PersistentStoreConfiguration.java | 31 ++++-
.../internal/pagemem/store/PageStore.java | 5 +
.../GridCacheDatabaseSharedManager.java | 106 +++++++++++++----
.../cache/persistence/file/FilePageStore.java | 56 +++++----
.../persistence/file/FilePageStoreFactory.java | 35 ++++++
.../persistence/file/FilePageStoreManager.java | 17 +--
.../cache/persistence/file/FilePageStoreV2.java | 53 +++++++++
.../file/FileVersionCheckingFactory.java | 116 +++++++++++++++++++
...gnitePdsRecoveryAfterFileCorruptionTest.java | 2 +-
...nitePersistenceSequentialCheckpointTest.java | 44 +++++++
.../IgnitePersistentStoreCacheGroupsTest.java | 31 ++---
12 files changed, 461 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/737874f6/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java
new file mode 100644
index 0000000..31feaf6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.configuration;
+
+/**
+ * This enum defines the order in which dirty pages are written to disk storage during a checkpoint.
+ */
+public enum CheckpointWriteOrder {
+ /**
+ * Pages are written in the order provided by the iterator of the checkpoint pages collection
+ * (which is basically a hashtable), i.e. in no particular on-disk order.
+ */
+ RANDOM,
+
+ /**
+ * All checkpoint pages are collected into a single list and sorted by cache group ID and then by page ID.
+ * Provides almost sequential disk writes, which can be much faster on some SSD models.
+ */
+ SEQUENTIAL
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/737874f6/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
index e8a0ff4..888bf42 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
@@ -16,12 +16,11 @@
*/
package org.apache.ignite.configuration;
+import java.io.Serializable;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.util.typedef.internal.S;
-import java.io.Serializable;
-
/**
* Configures Apache Ignite Persistent store.
*/
@@ -45,7 +44,10 @@ public class PersistentStoreConfiguration implements Serializable {
public static final int DFLT_RATE_TIME_INTERVAL_MILLIS = 60_000;
/** Default number of checkpointing threads. */
- public static final int DFLT_CHECKPOINTING_THREADS = 1;
+ public static final int DFLT_CHECKPOINTING_THREADS = 4;
+
+ /** Default checkpoint write order. */
+ public static final CheckpointWriteOrder DFLT_CHECKPOINT_WRITE_ORDER = CheckpointWriteOrder.SEQUENTIAL;
/** Default number of checkpoints to be kept in WAL after checkpoint is finished */
public static final int DFLT_WAL_HISTORY_SIZE = 20;
@@ -95,6 +97,9 @@ public class PersistentStoreConfiguration implements Serializable {
/** */
private int checkpointingThreads = DFLT_CHECKPOINTING_THREADS;
+ /** Checkpoint write order. */
+ private CheckpointWriteOrder checkpointWriteOrder = DFLT_CHECKPOINT_WRITE_ORDER;
+
/** Number of checkpoints to keep */
private int walHistSize = DFLT_WAL_HISTORY_SIZE;
@@ -587,6 +592,26 @@ public class PersistentStoreConfiguration implements Serializable {
return walAutoArchiveAfterInactivity;
}
+ /**
+ * This property defines order of writing pages to disk storage during checkpoint.
+ * Unless changed via the setter, the value is {@link #DFLT_CHECKPOINT_WRITE_ORDER}.
+ *
+ * @return Checkpoint write order.
+ */
+ public CheckpointWriteOrder getCheckpointWriteOrder() {
+ return checkpointWriteOrder;
+ }
+
+ /**
+ * This property defines order of writing pages to disk storage during checkpoint.
+ *
+ * @param checkpointWriteOrder Checkpoint write order.
+ * @return {@code this} for chaining.
+ */
+ public PersistentStoreConfiguration setCheckpointWriteOrder(CheckpointWriteOrder checkpointWriteOrder) {
+ this.checkpointWriteOrder = checkpointWriteOrder;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(PersistentStoreConfiguration.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/737874f6/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
index 4698a6b..f6e577c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
@@ -95,4 +95,9 @@ public interface PageStore {
* @throws IgniteCheckedException If sync failed (IO error occurred).
*/
public void ensure() throws IgniteCheckedException;
+
+ /**
+ * Gets the version of this page store. File-based stores write this value into the store file header
+ * and compare it with the header of an existing file.
+ *
+ * @return Page store version.
+ */
+ public int version();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/737874f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index d147f36..351ec71 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -49,7 +49,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -65,6 +64,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.PersistenceMetrics;
+import org.apache.ignite.configuration.CheckpointWriteOrder;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.MemoryConfiguration;
@@ -137,6 +137,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.mxbean.PersistenceMetricsMXBean;
import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -384,11 +385,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
long cpBufSize = persistenceCfg.getCheckpointingPageBufferSize();
if (persistenceCfg.getCheckpointingThreads() > 1)
- asyncRunner = new ThreadPoolExecutor(
+ asyncRunner = new IgniteThreadPoolExecutor(
+ "checkpoint-runner",
+ cctx.igniteInstanceName(),
persistenceCfg.getCheckpointingThreads(),
persistenceCfg.getCheckpointingThreads(),
- 30L,
- TimeUnit.SECONDS,
+ 30_000,
new LinkedBlockingQueue<Runnable>()
);
@@ -1301,8 +1303,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
boolean apply = status.needRestoreMemory();
if (apply) {
- U.quietAndWarn(log, "Ignite node crashed in the middle of checkpoint. Will restore memory state and " +
- "enforce checkpoint on node start.");
+ U.quietAndWarn(log, "Ignite node stopped in the middle of checkpoint. Will restore memory state and " +
+ "finish checkpoint on node start.");
cctx.pageStore().beginRecover();
}
@@ -2082,12 +2084,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
WALPointer cpPtr = null;
- GridMultiCollectionWrapper<FullPageId> cpPages;
-
final CheckpointProgress curr;
+ IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple;
+
tracker.onLockWaitStart();
+ boolean hasPages;
+
checkpointLock.writeLock().lock();
try {
@@ -2150,19 +2154,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (curr.nextSnapshot)
snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map);
- IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> tup = beginAllCheckpoints();
-
- // Todo it maybe more optimally
- Collection<FullPageId> cpPagesList = new ArrayList<>(tup.get2());
-
- for (GridMultiCollectionWrapper<FullPageId> col : tup.get1()) {
- for (int i = 0; i < col.collectionsSize(); i++)
- cpPagesList.addAll(col.innerCollection(i));
- }
+ cpPagesTuple = beginAllCheckpoints();
- cpPages = new GridMultiCollectionWrapper<>(cpPagesList);
+ hasPages = hasPageForWrite(cpPagesTuple.get1());
- if (!F.isEmpty(cpPages)) {
+ if (hasPages) {
// No page updates for this checkpoint are allowed from now on.
cpPtr = cctx.wal().log(cpRec);
@@ -2178,7 +2174,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
curr.cpBeginFut.onDone();
- if (!F.isEmpty(cpPages)) {
+ if (hasPages) {
assert cpPtr != null;
// Sync log outside the checkpoint write lock.
@@ -2196,6 +2192,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
checkpointHist.addCheckpointEntry(cpEntry);
+ GridMultiCollectionWrapper<FullPageId> cpPages = splitAndSortCpPagesIfNeeded(cpPagesTuple);
+
if (printCheckpointStats)
if (log.isInfoEnabled())
log.info(String.format("Checkpoint started [checkpointId=%s, startPtr=%s, checkpointLockWait=%dms, " +
@@ -2227,6 +2225,24 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
+ * Check that at least one collection is not empty.
+ *
+ * @param cpPagesCollWrapper Collection of {@link GridMultiCollectionWrapper} checkpoint pages.
+ */
+ private boolean hasPageForWrite(Collection<GridMultiCollectionWrapper<FullPageId>> cpPagesCollWrapper) {
+ boolean hasPages = false;
+
+ for (Collection c : cpPagesCollWrapper)
+ if (!c.isEmpty()) {
+ hasPages = true;
+
+ break;
+ }
+
+ return hasPages;
+ }
+
+ /**
* @return tuple with collections of FullPageIds obtained from each PageMemory and overall number of dirty pages.
*/
private IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> beginAllCheckpoints() {
@@ -2293,6 +2309,56 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
}
+    /**
+     * Reorders list of checkpoint pages and splits them into needed number of sublists according to
+     * {@link PersistentStoreConfiguration#getCheckpointingThreads()} and
+     * {@link PersistentStoreConfiguration#getCheckpointWriteOrder()}.
+     * <p>
+     * All pages are first flattened into a single list. For {@link CheckpointWriteOrder#SEQUENTIAL} the list
+     * is sorted by group ID and then by effective page ID; otherwise the collection order is kept. The list
+     * is then cut into contiguous, near-equal sublists.
+     *
+     * @param cpPagesTuple Checkpoint pages tuple: collections of checkpoint pages and overall number of pages.
+     * @return Wrapper over the sublists (views of the flattened list) to be written by checkpoint threads.
+     */
+    private GridMultiCollectionWrapper<FullPageId> splitAndSortCpPagesIfNeeded(
+        IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple
+    ) {
+        List<FullPageId> cpPagesList = new ArrayList<>(cpPagesTuple.get2());
+
+        for (GridMultiCollectionWrapper<FullPageId> col : cpPagesTuple.get1()) {
+            for (int i = 0; i < col.collectionsSize(); i++)
+                cpPagesList.addAll(col.innerCollection(i));
+        }
+
+        if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) {
+            Collections.sort(cpPagesList, new Comparator<FullPageId>() {
+                @Override public int compare(FullPageId o1, FullPageId o2) {
+                    int cmp = Long.compare(o1.groupId(), o2.groupId());
+
+                    if (cmp != 0)
+                        return cmp;
+
+                    return Long.compare(PageIdUtils.effectivePageId(o1.pageId()),
+                        PageIdUtils.effectivePageId(o2.pageId()));
+                }
+            });
+        }
+
+        int cpThreads = persistenceCfg.getCheckpointingThreads();
+
+        // Splitting pages to (threads * 4) subtasks. If any thread will be faster, it will help slower threads.
+        // A thread count of 1 or less always yields a single sublist, so that pages are never dropped
+        // (zero sublists) and the array size is never negative.
+        int pagesSubLists = cpThreads <= 1 ? 1 : cpThreads * 4;
+
+        // Raw type is unavoidable here: Java does not allow creation of generic arrays.
+        Collection[] pagesSubListArr = new Collection[pagesSubLists];
+
+        int totalSize = cpPagesList.size();
+
+        for (int i = 0; i < pagesSubLists; i++) {
+            // Boundaries are computed in long arithmetic: totalSize * (i + 1) may not fit into int.
+            int from = (int)((long)totalSize * i / pagesSubLists);
+
+            int to = (int)((long)totalSize * (i + 1) / pagesSubLists);
+
+            pagesSubListArr[i] = cpPagesList.subList(from, to);
+        }
+
+        return new GridMultiCollectionWrapper<FullPageId>(pagesSubListArr);
+    }
+
/** Pages write task */
private class WriteCheckpointPages implements Runnable {
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/737874f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index a7ca13c..e6c5379 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -45,10 +45,10 @@ public class FilePageStore implements PageStore {
private static final long SIGNATURE = 0xF19AC4FE60C530B8L;
/** File version. */
- private static final int VERSION = 1;
+ public static final int VERSION = 1;
/** Allocated field offset. */
- public static final int HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page size*/;
+ static final int HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page size*/;
/** */
private final File cfgFile;
@@ -57,7 +57,7 @@ public class FilePageStore implements PageStore {
private final byte type;
/** Database configuration. */
- private final MemoryConfiguration dbCfg;
+ protected final MemoryConfiguration dbCfg;
/** Factory to provide I/O interfaces for read/write operations with files */
private final FileIOFactory ioFactory;
@@ -103,20 +103,36 @@ public class FilePageStore implements PageStore {
/** {@inheritDoc} */
@Override public boolean exists() {
- return cfgFile.exists() && cfgFile.length() > HEADER_SIZE;
+ return cfgFile.exists() && cfgFile.length() > headerSize();
}
 /**
+ * Size of page store header. Pages are stored in the file right after the header, so this value is
+ * added to every page offset. Subclasses may override it to use a header of a different size.
+ *
+ * @return Header size in bytes.
+ */
+ public int headerSize() {
+ return HEADER_SIZE;
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The returned value is written into the file header and is compared with the version stored in the
+     * header of an existing file. Subclasses override this method to declare their own file version.
+     */
+    @Override public int version() {
+        return VERSION;
+    }
+
+ /**
+ * Creates header for current version file store. Doesn't init the store.
+ *
* @param type Type.
* @param pageSize Page size.
* @return Byte buffer instance.
*/
- public static ByteBuffer header(byte type, int pageSize) {
- ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN);
+ public ByteBuffer header(byte type, int pageSize) {
+ ByteBuffer hdr = ByteBuffer.allocate(headerSize()).order(ByteOrder.LITTLE_ENDIAN);
hdr.putLong(SIGNATURE);
- hdr.putInt(VERSION);
+ hdr.putInt(version());
hdr.put(type);
@@ -142,7 +158,7 @@ public class FilePageStore implements PageStore {
}
//there is 'super' page in every file
- return HEADER_SIZE + dbCfg.getPageSize();
+ return headerSize() + dbCfg.getPageSize();
}
/**
@@ -150,7 +166,7 @@ public class FilePageStore implements PageStore {
*/
private long checkFile() throws IgniteCheckedException {
try {
- ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN);
+ ByteBuffer hdr = ByteBuffer.allocate(headerSize()).order(ByteOrder.LITTLE_ENDIAN);
while (hdr.remaining() > 0)
fileIO.read(hdr);
@@ -166,9 +182,9 @@ public class FilePageStore implements PageStore {
int ver = hdr.getInt();
- if (VERSION != ver)
+ if (version() != ver)
throw new IgniteCheckedException("Failed to verify store file (invalid file version)" +
- " [expectedVersion=" + VERSION +
+ " [expectedVersion=" + version() +
", fileVersion=" + ver + "]");
byte type = hdr.get();
@@ -187,10 +203,10 @@ public class FilePageStore implements PageStore {
long fileSize = cfgFile.length();
- if (fileSize == HEADER_SIZE) // Every file has a special meta page.
- fileSize = pageSize + HEADER_SIZE;
+ if (fileSize == headerSize()) // Every file has a special meta page.
+ fileSize = pageSize + headerSize();
- if ((fileSize - HEADER_SIZE) % pageSize != 0)
+ if ((fileSize - headerSize()) % pageSize != 0)
throw new IgniteCheckedException("Failed to verify store file (invalid file size)" +
" [fileSize=" + U.hexLong(fileSize) +
", pageSize=" + U.hexLong(pageSize) + ']');
@@ -346,9 +362,9 @@ public class FilePageStore implements PageStore {
init();
try {
- assert buf.remaining() == HEADER_SIZE;
+ assert buf.remaining() == headerSize();
- int len = HEADER_SIZE;
+ int len = headerSize();
long off = 0;
@@ -425,7 +441,7 @@ public class FilePageStore implements PageStore {
long off = pageOffset(pageId);
- assert (off >= 0 && off + pageSize <= allocated.get() + HEADER_SIZE) || recover :
+ assert (off >= 0 && off + pageSize <= allocated.get() + headerSize()) || recover :
"off=" + U.hexLong(off) + ", allocated=" + U.hexLong(allocated.get()) + ", pageId=" + U.hexLong(pageId);
assert pageBuf.capacity() == pageSize;
@@ -463,7 +479,7 @@ public class FilePageStore implements PageStore {
/** {@inheritDoc} */
@Override public long pageOffset(long pageId) {
- return (long) PageIdUtils.pageIndex(pageId) * pageSize + HEADER_SIZE;
+ return (long) PageIdUtils.pageIndex(pageId) * pageSize + headerSize();
}
/** {@inheritDoc} */
@@ -494,7 +510,7 @@ public class FilePageStore implements PageStore {
long off = allocPage();
- return off / pageSize;
+ return (off - headerSize()) / pageSize;
}
/**
@@ -519,6 +535,6 @@ public class FilePageStore implements PageStore {
if (!inited)
return 0;
- return (int)(allocated.get() / pageSize);
+        // Subtract and divide in long arithmetic and narrow only the resulting page count:
+        // casting the byte size to int first would truncate it for stores larger than 2 GB.
+        return (int)((allocated.get() - headerSize()) / pageSize);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/737874f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java
new file mode 100644
index 0000000..d97ab26
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java
@@ -0,0 +1,35 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+
+/**
+ * Factory of {@link FilePageStore} instances for page store files.
+ */
+public interface FilePageStoreFactory {
+ /**
+ * Creates instance of FilePageStore based on given file.
+ *
+ * @param type Data type, can be {@link PageIdAllocator#FLAG_IDX} or {@link PageIdAllocator#FLAG_DATA}.
+ * @param file Page store file.
+ * @return Page store for the given file.
+ * @throws IgniteCheckedException If page store could not be created.
+ */
+ public FilePageStore createPageStore(byte type, File file) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/737874f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index e2ad070..0041ea6 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -365,21 +365,16 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
if (dirExisted && !idxFile.exists())
grpsWithoutIdx.add(grpDesc.groupId());
- FilePageStore idxStore = new FilePageStore(
- PageMemory.FLAG_IDX,
- idxFile,
- pstCfg.getFileIOFactory(),
- cctx.kernalContext().config().getMemoryConfiguration());
+ FileVersionCheckingFactory pageStoreFactory = new FileVersionCheckingFactory(
+ pstCfg.getFileIOFactory(), igniteCfg.getMemoryConfiguration());
+
+ FilePageStore idxStore = pageStoreFactory.createPageStore(PageMemory.FLAG_IDX, idxFile);
FilePageStore[] partStores = new FilePageStore[grpDesc.config().getAffinity().partitions()];
for (int partId = 0; partId < partStores.length; partId++) {
- FilePageStore partStore = new FilePageStore(
- PageMemory.FLAG_DATA,
- new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE, partId)),
- pstCfg.getFileIOFactory(),
- cctx.kernalContext().config().getMemoryConfiguration()
- );
+ FilePageStore partStore = pageStoreFactory.createPageStore(
+ PageMemory.FLAG_DATA, new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE, partId)));
partStores[partId] = partStore;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/737874f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java
new file mode 100644
index 0000000..5d044ec
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java
@@ -0,0 +1,53 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import org.apache.ignite.configuration.MemoryConfiguration;
+
+/**
+ * Version 2 of {@link FilePageStore}. It reports {@link #VERSION} as the file version and uses a header
+ * whose size equals the configured page size instead of the fixed-size header of the base class.
+ */
+public class FilePageStoreV2 extends FilePageStore {
+ /** File version. */
+ public static final int VERSION = 2;
+
+ /** Header size in bytes; equals the page size read from the memory configuration in the constructor. */
+ private final int hdrSize;
+
+ /**
+ * @param type Type.
+ * @param file File.
+ * @param factory Factory.
+ * @param cfg Config.
+ */
+ public FilePageStoreV2(byte type, File file, FileIOFactory factory, MemoryConfiguration cfg) {
+ super(type, file, factory, cfg);
+
+ // NOTE(review): the field is assigned only after the super constructor returns, so headerSize()
+ // would return 0 if the base constructor ever called it - confirm that it does not.
+ hdrSize = cfg.getPageSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int headerSize() {
+ return hdrSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int version() {
+ return VERSION;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/737874f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
new file mode 100644
index 0000000..53bd802
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
@@ -0,0 +1,116 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.MemoryConfiguration;
+
+/**
+ * Checks version in files if it's present on the disk, creates store with latest version otherwise.
+ */
+public class FileVersionCheckingFactory implements FilePageStoreFactory {
+    /** Property to override latest version. Should be used only in tests. */
+    public static final String LATEST_VERSION_OVERRIDE_PROPERTY = "file.page.store.latest.version.override";
+
+    /** Latest page store version. */
+    public static final int LATEST_VERSION = 2;
+
+    /** Factory to provide I/O interfaces for read/write operations with files. */
+    private final FileIOFactory fileIOFactory;
+
+    /** Memory configuration. */
+    private final MemoryConfiguration memCfg;
+
+    /**
+     * @param fileIOFactory File io factory.
+     * @param memCfg Memory configuration.
+     */
+    public FileVersionCheckingFactory(
+        FileIOFactory fileIOFactory, MemoryConfiguration memCfg) {
+        this.fileIOFactory = fileIOFactory;
+        this.memCfg = memCfg;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * If the file does not exist or is shorter than the version 1 header, a store of the
+     * {@link #latestVersion() latest version} is created. Otherwise the version is read from the file header
+     * and a store of exactly that version is created.
+     *
+     * @throws IgniteCheckedException If the file header could not be read.
+     */
+    @Override public FilePageStore createPageStore(byte type, File file) throws IgniteCheckedException {
+        if (!file.exists())
+            return createPageStore(type, file, latestVersion());
+
+        try (FileIO fileIO = fileIOFactory.create(file, "r")) {
+            // Only the leading part of the header (version 1 header size) is read to get the version.
+            int minHdr = FilePageStore.HEADER_SIZE;
+
+            if (fileIO.size() < minHdr)
+                return createPageStore(type, file, latestVersion());
+
+            ByteBuffer hdr = ByteBuffer.allocate(minHdr).order(ByteOrder.LITTLE_ENDIAN);
+
+            // File size was checked above, so the header is expected to be fully readable.
+            while (hdr.remaining() > 0)
+                fileIO.read(hdr);
+
+            hdr.rewind();
+
+            // Signature is only skipped here, it is not validated by this factory.
+            hdr.getLong();
+
+            int ver = hdr.getInt();
+
+            return createPageStore(type, file, ver);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Error while creating file page store [file=" + file + "]:", e);
+        }
+    }
+
+    /**
+     * Resolves latest page store version.
+     *
+     * @return Value of the {@link #LATEST_VERSION_OVERRIDE_PROPERTY} system property if it is set to a valid
+     *      integer, {@link #LATEST_VERSION} otherwise.
+     */
+    public int latestVersion() {
+        String override = System.getProperty(LATEST_VERSION_OVERRIDE_PROPERTY);
+
+        // No override is the regular case: do not pay for an exception on every created store.
+        if (override == null)
+            return LATEST_VERSION;
+
+        try {
+            return Integer.parseInt(override);
+        }
+        catch (NumberFormatException ignored) {
+            // Malformed override is deliberately ignored: fall back to the built-in latest version.
+            return LATEST_VERSION;
+        }
+    }
+
+    /**
+     * Instantiates specific version of FilePageStore.
+     *
+     * @param type Type.
+     * @param file File.
+     * @param ver Version.
+     * @return Page store of the requested version.
+     * @throws IllegalArgumentException If the version is unknown.
+     */
+    public FilePageStore createPageStore(byte type, File file, int ver) throws IgniteCheckedException {
+        switch (ver) {
+            case FilePageStore.VERSION:
+                return new FilePageStore(type, file, fileIOFactory, memCfg);
+
+            case FilePageStoreV2.VERSION:
+                return new FilePageStoreV2(type, file, fileIOFactory, memCfg);
+
+            default:
+                throw new IllegalArgumentException("Unknown version of file page store: " + ver);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/737874f6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
index c248c35..11d5eef 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
@@ -194,7 +194,7 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract
long size = fileIO.size();
- fileIO.write(ByteBuffer.allocate((int)size - FilePageStore.HEADER_SIZE), FilePageStore.HEADER_SIZE);
+ fileIO.write(ByteBuffer.allocate((int)size - filePageStore.headerSize()), filePageStore.headerSize());
fileIO.force();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/737874f6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
new file mode 100644
index 0000000..9295000
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.configuration.CheckpointWriteOrder;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+
+/** Runs the inherited cache groups persistence tests with sequential checkpoint write order. */
+public class IgnitePersistenceSequentialCheckpointTest extends IgnitePersistentStoreCacheGroupsTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        // LOG_ONLY WAL, 4 checkpointing threads, pages written in SEQUENTIAL order.
+        PersistentStoreConfiguration pstCfg = new PersistentStoreConfiguration()
+            .setWalMode(WALMode.LOG_ONLY)
+            .setCheckpointingThreads(4)
+            .setCheckpointWriteOrder(CheckpointWriteOrder.SEQUENTIAL);
+
+        igniteCfg.setPersistentStoreConfiguration(pstCfg);
+
+        return igniteCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int entriesCount() {
+        return 1000;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/737874f6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
index a945c73..b39b8cb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
@@ -87,7 +87,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
MemoryConfiguration memCfg = new MemoryConfiguration();
memCfg.setPageSize(1024);
- memCfg.setDefaultMemoryPolicySize(10 * 1024 * 1024);
+ memCfg.setDefaultMemoryPolicySize(100 * 1024 * 1024);
cfg.setMemoryConfiguration(memCfg);
@@ -115,6 +115,11 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
super.afterTest();
}
+ /** @return Number of entries the tests put into and read back from each cache (10 here). */
+ protected int entriesCount() {
+ return 10;
+ }
+
/**
* @throws Exception If failed.
*/
@@ -236,7 +241,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
for (String cacheName : caches) {
IgniteCache<Object, Object> cache = node.cache(cacheName).withExpiryPolicy(plc);
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < entriesCount(); i++)
cache.put(i, cacheName + i);
}
@@ -253,10 +258,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
for (String cacheName : caches) {
IgniteCache<Object, Object> cache = node.cache(cacheName);
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < entriesCount(); i++)
assertEquals(cacheName + i, cache.get(i));
- assertEquals(10, cache.size());
+ assertEquals(entriesCount(), cache.size());
}
// Wait for expiration.
@@ -340,7 +345,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
for (String cacheName : caches) {
IgniteCache<Object, Object> cache = node.cache(cacheName);
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < entriesCount(); i++)
cache.put(i, new Person("" + i, cacheName));
}
}
@@ -353,10 +358,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
for (String cacheName : caches) {
IgniteCache<Object, Object> cache = node.cache(cacheName);
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < entriesCount(); i++)
assertEquals(new Person("" + i, cacheName), cache.get(i));
- assertEquals(10, cache.size());
+ assertEquals(entriesCount(), cache.size());
}
}
@@ -373,10 +378,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
List<Cache.Entry<Integer, Person>> persons = cache.query(qry.setArgs(cacheName)).getAll();
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < entriesCount(); i++)
assertEquals(new Person("" + i, cacheName), persons.get(i).getValue());
- assertEquals(10, persons.size());
+ assertEquals(entriesCount(), persons.size());
}
}
@@ -413,13 +418,13 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
for (String cacheName : caches) {
IgniteCache<Object, Object> cache = node.cache(cacheName);
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < entriesCount(); i++) {
cache.put(i, cacheName + i);
assertEquals(cacheName + i, cache.get(i));
}
- assertEquals(10, cache.size());
+ assertEquals(entriesCount(), cache.size());
}
stopAllGrids();
@@ -433,10 +438,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
for (String cacheName : caches) {
IgniteCache<Object, Object> cache = node.cache(cacheName);
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < entriesCount(); i++)
assertEquals(cacheName + i, cache.get(i));
- assertEquals(10, cache.size());
+ assertEquals(entriesCount(), cache.size());
}
}