You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/15 14:09:59 UTC
[29/50] [abbrv] ignite git commit: IGNITE-6423: PDS could be
corrupted if partition have been evicted and owned again. This closes #3115.
IGNITE-6423: PDS could be corrupted if partition have been evicted and owned again. This closes #3115.
Fixed page memory update operations without checkpoint lock.
Fixed page CRC calculation.
Fixed outdated page handling.
Added checkpoint lock hold assertions for memory update operations.
Fixed incorrect tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e24d4d03
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e24d4d03
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e24d4d03
Branch: refs/heads/ignite-zk-ce
Commit: e24d4d03dd00cc464e6e793e2a3f23988664a0cd
Parents: 24412f5
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Wed Dec 13 14:16:50 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Wed Dec 13 14:16:50 2017 +0300
----------------------------------------------------------------------
.../internal/pagemem/store/PageStore.java | 5 +-
.../processors/cache/GridCacheTtlManager.java | 3 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 41 +++--
.../distributed/dht/GridDhtTxPrepareFuture.java | 5 +
.../local/atomic/GridLocalAtomicCache.java | 171 ++++++++++---------
.../GridCacheDatabaseSharedManager.java | 9 +-
.../persistence/GridCacheOffheapManager.java | 4 +
.../processors/cache/persistence/RowStore.java | 2 +
.../cache/persistence/file/FilePageStore.java | 32 +++-
.../persistence/file/FilePageStoreManager.java | 7 +-
.../persistence/pagemem/PageMemoryImpl.java | 56 ++++--
.../processors/cache/tree/CacheDataTree.java | 2 +
.../cache/tree/PendingEntriesTree.java | 2 +
.../pagemem/impl/PageMemoryNoLoadSelfTest.java | 10 +-
.../cache/persistence/DummyPageIO.java | 41 +++++
.../IgnitePdsContinuousRestartTest.java | 5 -
...gnitePdsRecoveryAfterFileCorruptionTest.java | 83 ++++++---
...ckpointSimulationWithRealCpDisabledTest.java | 53 +++++-
.../db/file/IgnitePdsEvictionTest.java | 30 ++++
.../query/h2/database/H2TreeIndex.java | 58 ++++---
20 files changed, 439 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
index f6e577c..42d584d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
@@ -71,10 +71,11 @@ public interface PageStore {
* @param pageId Page ID.
* @param pageBuf Page buffer to write.
* @param tag Partition file version, 1-based incrementing counter. For outdated pages {@code tag} has lower value,
- * and write does nothing
+ * and write does nothing.
+ * @param calculateCrc if {@code False} crc calculation will be forcibly skipped.
* @throws IgniteCheckedException If page writing failed (IO error occurred).
*/
- public void write(long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException;
+ public void write(long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) throws IgniteCheckedException;
/**
* Gets page offset within the store file.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index b006154..9c013fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -23,7 +23,6 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.typedef.X;
@@ -145,7 +144,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
try {
X.println(">>>");
X.println(">>> TTL processor memory stats [igniteInstanceName=" + cctx.igniteInstanceName() +
- ", cache=" + cctx.name() + ']');
+ ", cache=" + cctx.name() + ']');
X.println(">>> pendingEntriesSize: " + pendingSize());
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 370a92e..8ad6d4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -139,20 +139,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException{
+ public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException {
if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) {
String name = "PendingEntries";
- long rootPage = allocateForTree();
+ long rootPage = allocateForTree();
- pendingEntries = new PendingEntriesTree(
- grp,
- name,
- grp.dataRegion().pageMemory(),
- rootPage,
- grp.reuseList(),
- true);
- }
+ pendingEntries = new PendingEntriesTree(
+ grp,
+ name,
+ grp.dataRegion().pageMemory(),
+ rootPage,
+ grp.reuseList(),
+ true);
+ }
}
/**
@@ -196,6 +196,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
try {
if (grp.sharedGroup()) {
assert cacheId != CU.UNDEFINED_CACHE_ID;
+ assert ctx.database().checkpointLockIsHeldByThread();
for (CacheDataStore store : cacheDataStores())
store.clear(cacheId);
@@ -437,7 +438,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
while (it.hasNext()) {
cctx.shared().database().checkpointReadLock();
- try{
+ try {
KeyCacheObject key = it.next().key();
try {
@@ -1192,6 +1193,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
try {
int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+ assert cctx.shared().database().checkpointLockIsHeldByThread();
+
dataTree.invoke(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY, c);
switch (c.operationType()) {
@@ -1232,8 +1235,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
CacheObject val,
GridCacheVersion ver,
long expireTime,
- @Nullable CacheDataRow oldRow) throws IgniteCheckedException
- {
+ @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
DataRow dataRow = new DataRow(key, val, ver, partId, expireTime, cacheId);
@@ -1258,8 +1260,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public void update(GridCacheContext cctx,KeyCacheObject key,
-
+ @Override public void update(
+ GridCacheContext cctx,
+ KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
long expireTime,
@@ -1285,6 +1288,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
CacheDataRow old;
+ assert cctx.shared().database().checkpointLockIsHeldByThread();
+
if (canUpdateOldRow(cctx, oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) {
old = oldRow;
@@ -1363,6 +1368,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
try {
int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+ assert cctx.shared().database().checkpointLockIsHeldByThread();
+
CacheDataRow oldRow = dataTree.remove(new SearchRow(cacheId, key));
finishRemove(cctx, key, oldRow);
@@ -1425,8 +1432,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return dataTree.find(null, null);
}
- /** {@inheritDoc}
- * @param cacheId*/
+ /** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException {
return cursor(cacheId, null, null);
}
@@ -1486,6 +1492,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/** {@inheritDoc} */
@Override public void clear(int cacheId) throws IgniteCheckedException {
assert cacheId != CU.UNDEFINED_CACHE_ID;
+ assert ctx.database().checkpointLockIsHeldByThread();
if (cacheSize(cacheId) == 0)
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 6873890..0fb9ee4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1819,6 +1819,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
GridDrType drType = cacheCtx.isDrEnabled() ? GridDrType.DR_PRELOAD : GridDrType.DR_NONE;
+ cctx.database().checkpointReadLock();
+
try {
if (entry.initialValue(info.value(),
info.version(),
@@ -1850,6 +1852,9 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
log.debug("Failed to set entry initial value (entry is obsolete, " +
"will retry): " + entry);
}
+ finally {
+ cctx.database().checkpointReadUnlock();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 599a58c..1454e96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -830,113 +830,120 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
CacheEntryPredicate[] filters = CU.filterArray(filter);
- ctx.shared().database().ensureFreeSpace(ctx.dataRegion());
-
- if (writeThrough && keys.size() > 1) {
- return updateWithBatch(op,
- keys,
- vals,
- invokeArgs,
- expiryPlc,
- ver,
- filters,
- keepBinary,
- subjId,
- taskName);
- }
-
- Iterator<?> valsIter = vals != null ? vals.iterator() : null;
-
IgniteBiTuple<Boolean, ?> res = null;
CachePartialUpdateCheckedException err = null;
- boolean intercept = ctx.config().getInterceptor() != null;
+ ctx.shared().database().checkpointReadLock();
- for (K key : keys) {
- if (key == null)
- throw new NullPointerException("Null key.");
+ try {
+ ctx.shared().database().ensureFreeSpace(ctx.dataRegion());
+
+ if (writeThrough && keys.size() > 1) {
+ return updateWithBatch(op,
+ keys,
+ vals,
+ invokeArgs,
+ expiryPlc,
+ ver,
+ filters,
+ keepBinary,
+ subjId,
+ taskName);
+ }
- Object val = valsIter != null ? valsIter.next() : null;
+ Iterator<?> valsIter = vals != null ? vals.iterator() : null;
- if (val == null && op != DELETE)
- throw new NullPointerException("Null value.");
+ boolean intercept = ctx.config().getInterceptor() != null;
- KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
+ for (K key : keys) {
+ if (key == null)
+ throw new NullPointerException("Null key.");
- if (op == UPDATE) {
- val = ctx.toCacheObject(val);
+ Object val = valsIter != null ? valsIter.next() : null;
- ctx.validateKeyAndValue(cacheKey, (CacheObject)val);
- }
- else if (op == TRANSFORM)
- ctx.kernalContext().resource().inject(val, GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR, ctx.name());
+ if (val == null && op != DELETE)
+ throw new NullPointerException("Null value.");
- while (true) {
- GridCacheEntryEx entry = null;
+ KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
- try {
- entry = entryEx(cacheKey);
-
- GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
- ver,
- val == null ? DELETE : op,
- val,
- invokeArgs,
- writeThrough,
- readThrough,
- retval,
- keepBinary,
- expiryPlc,
- true,
- true,
- filters,
- intercept,
- subjId,
- taskName);
+ if (op == UPDATE) {
+ val = ctx.toCacheObject(val);
- if (op == TRANSFORM) {
- if (t.get3() != null) {
- Map<K, EntryProcessorResult> computedMap;
+ ctx.validateKeyAndValue(cacheKey, (CacheObject)val);
+ }
+ else if (op == TRANSFORM)
+ ctx.kernalContext().resource().inject(val, GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR, ctx.name());
- if (res == null) {
- computedMap = U.newHashMap(keys.size());
+ while (true) {
+ GridCacheEntryEx entry = null;
- res = new IgniteBiTuple<>(true, computedMap);
- }
- else
- computedMap = (Map<K, EntryProcessorResult>)res.get2();
+ try {
+ entry = entryEx(cacheKey);
- computedMap.put(key, t.get3());
+ GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
+ ver,
+ val == null ? DELETE : op,
+ val,
+ invokeArgs,
+ writeThrough,
+ readThrough,
+ retval,
+ keepBinary,
+ expiryPlc,
+ true,
+ true,
+ filters,
+ intercept,
+ subjId,
+ taskName);
+
+ if (op == TRANSFORM) {
+ if (t.get3() != null) {
+ Map<K, EntryProcessorResult> computedMap;
+
+ if (res == null) {
+ computedMap = U.newHashMap(keys.size());
+
+ res = new IgniteBiTuple<>(true, computedMap);
+ }
+ else
+ computedMap = (Map<K, EntryProcessorResult>)res.get2();
+
+ computedMap.put(key, t.get3());
+ }
}
- }
- else if (res == null)
- res = new T2(t.get1(), t.get2());
+ else if (res == null)
+ res = new T2(t.get1(), t.get2());
- break; // While.
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry while updating (will retry): " + key);
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry while updating (will retry): " + key);
- entry = null;
- }
- catch (IgniteCheckedException e) {
- if (err == null)
- err = partialUpdateException();
+ entry = null;
+ }
+ catch (IgniteCheckedException e) {
+ if (err == null)
+ err = partialUpdateException();
- err.add(F.asList(key), e);
+ err.add(F.asList(key), e);
- U.error(log, "Failed to update key : " + key, e);
+ U.error(log, "Failed to update key : " + key, e);
- break;
- }
- finally {
- if (entry != null)
- ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
+ break;
+ }
+ finally {
+ if (entry != null)
+ ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
+ }
}
}
}
+ finally {
+ ctx.shared().database().checkpointReadUnlock();
+ }
if (err != null)
throw err;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index c0e59bc..3c2842f 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1850,7 +1850,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (tag != null) {
tmpWriteBuf.rewind();
- PageStore store = storeMgr.writeInternal(fullId.groupId(), fullId.pageId(), tmpWriteBuf, tag);
+ PageStore store = storeMgr.writeInternal(fullId.groupId(), fullId.pageId(), tmpWriteBuf, tag, true);
tmpWriteBuf.rewind();
@@ -2640,6 +2640,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
fullId, tmpWriteBuf, persStoreMetrics.metricsEnabled() ? tracker : null);
if (tag != null) {
+ assert PageIO.getType(tmpWriteBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId());
+ assert PageIO.getVersion(tmpWriteBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId());
+
tmpWriteBuf.rewind();
if (persStoreMetrics.metricsEnabled()) {
@@ -2661,9 +2664,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
tmpWriteBuf.rewind();
- PageIO.setCrc(writeAddr, 0);
-
- PageStore store = storeMgr.writeInternal(grpId, fullId.pageId(), tmpWriteBuf, tag);
+ PageStore store = storeMgr.writeInternal(grpId, fullId.pageId(), tmpWriteBuf, tag, false);
updStores.add(store);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index cfa1829..e818b00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -922,6 +922,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
reuseRoot.pageId().pageId(),
reuseRoot.isAllocated()) {
@Override protected long allocatePageNoReuse() throws IgniteCheckedException {
+ assert grp.shared().database().checkpointLockIsHeldByThread();
+
return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA);
}
};
@@ -938,6 +940,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
treeRoot.pageId().pageId(),
treeRoot.isAllocated()) {
@Override protected long allocatePageNoReuse() throws IgniteCheckedException {
+ assert grp.shared().database().checkpointLockIsHeldByThread();
+
return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA);
}
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
index 2051021..ad2f731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
@@ -107,6 +107,8 @@ public class RowStore {
* @return {@code True} if was able to update row.
*/
public boolean updateRow(long link, CacheDataRow row) throws IgniteCheckedException {
+ assert !persistenceEnabled || ctx.database().checkpointLockIsHeldByThread();
+
return freeList.updateDataRow(link, row);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index 408240c..47f1d4d 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -433,7 +433,7 @@ public class FilePageStore implements PageStore {
}
/** {@inheritDoc} */
- @Override public void write(long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException {
+ @Override public void write(long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) throws IgniteCheckedException {
init();
lock.readLock().lock();
@@ -450,13 +450,20 @@ public class FilePageStore implements PageStore {
assert pageBuf.capacity() == pageSize;
assert pageBuf.position() == 0;
assert pageBuf.order() == ByteOrder.nativeOrder();
- assert PageIO.getCrc(pageBuf) == 0 : U.hexLong(pageId);
+ assert PageIO.getType(pageBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(pageId);
+ assert PageIO.getVersion(pageBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(pageId);
- int crc32 = skipCrc ? 0 : PureJavaCrc32.calcCrc32(pageBuf, pageSize);
+ if (calculateCrc && !skipCrc) {
+ assert PageIO.getCrc(pageBuf) == 0 : U.hexLong(pageId);
- PageIO.setCrc(pageBuf, crc32);
+ PageIO.setCrc(pageBuf, calcCrc32(pageBuf, pageSize));
+ }
- pageBuf.position(0);
+ // Check whether crc was calculated somewhere above the stack if it is forcibly skipped.
+ assert skipCrc || PageIO.getCrc(pageBuf) != 0 || calcCrc32(pageBuf, pageSize) == 0 :
+ "CRC hasn't been calculated, crc=0";
+
+ assert pageBuf.position() == 0 : pageBuf.position();
int len = pageSize;
@@ -480,6 +487,21 @@ public class FilePageStore implements PageStore {
}
}
+ /**
+ * @param pageBuf Page buffer.
+ * @param pageSize Page size.
+ */
+ private static int calcCrc32(ByteBuffer pageBuf, int pageSize) {
+ try {
+ pageBuf.position(0);
+
+ return PureJavaCrc32.calcCrc32(pageBuf, pageSize);
+ }
+ finally {
+ pageBuf.position(0);
+ }
+ }
+
/** {@inheritDoc} */
@Override public long pageOffset(long pageId) {
return (long) PageIdUtils.pageIndex(pageId) * pageSize + headerSize();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 1fe22ca..66af0dd 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -291,7 +291,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
/** {@inheritDoc} */
@Override public void write(int grpId, long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException {
- writeInternal(grpId, pageId, pageBuf, tag);
+ writeInternal(grpId, pageId, pageBuf, tag, true);
}
/** {@inheritDoc} */
@@ -306,15 +306,16 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
* @param pageId Page ID.
* @param pageBuf Page buffer.
* @param tag Partition tag (growing 1-based partition file version). Used to validate page is not outdated
+ * @param calculateCrc if {@code False} crc calculation will be forcibly skipped.
* @return PageStore to which the page has been written.
* @throws IgniteCheckedException If IO error occurred.
*/
- public PageStore writeInternal(int cacheId, long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException {
+ public PageStore writeInternal(int cacheId, long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) throws IgniteCheckedException {
int partId = PageIdUtils.partId(pageId);
PageStore store = getStore(cacheId, partId);
- store.write(pageId, pageBuf, tag);
+ store.write(pageId, pageBuf, tag, calculateCrc);
return store;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index 3014099..41de7f0 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -173,7 +173,7 @@ public class PageMemoryImpl implements PageMemoryEx {
private final int sysPageSize;
/** Shared context. */
- private final GridCacheSharedContext<?, ?> sharedCtx;
+ private final GridCacheSharedContext<?, ?> ctx;
/** State checker. */
private final CheckpointLockStateChecker stateChecker;
@@ -236,7 +236,7 @@ public class PageMemoryImpl implements PageMemoryEx {
/**
* @param directMemoryProvider Memory allocator to use.
- * @param sharedCtx Cache shared context.
+ * @param ctx Cache shared context.
* @param pageSize Page size.
* @param flushDirtyPage Callback invoked when a dirty page is evicted.
* @param changeTracker Callback invoked to track changes in pages.
@@ -245,7 +245,7 @@ public class PageMemoryImpl implements PageMemoryEx {
public PageMemoryImpl(
DirectMemoryProvider directMemoryProvider,
long[] sizes,
- GridCacheSharedContext<?, ?> sharedCtx,
+ GridCacheSharedContext<?, ?> ctx,
int pageSize,
GridInClosure3X<FullPageId, ByteBuffer, Integer> flushDirtyPage,
GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker,
@@ -253,11 +253,11 @@ public class PageMemoryImpl implements PageMemoryEx {
DataRegionMetricsImpl memMetrics,
boolean throttleEnabled
) {
- assert sharedCtx != null;
+ assert ctx != null;
- log = sharedCtx.logger(PageMemoryImpl.class);
+ log = ctx.logger(PageMemoryImpl.class);
- this.sharedCtx = sharedCtx;
+ this.ctx = ctx;
this.directMemoryProvider = directMemoryProvider;
this.sizes = sizes;
this.flushDirtyPage = flushDirtyPage;
@@ -265,8 +265,8 @@ public class PageMemoryImpl implements PageMemoryEx {
this.stateChecker = stateChecker;
this.throttleEnabled = throttleEnabled;
- storeMgr = sharedCtx.pageStore();
- walMgr = sharedCtx.wal();
+ storeMgr = ctx.pageStore();
+ walMgr = ctx.wal();
assert storeMgr != null;
assert walMgr != null;
@@ -336,15 +336,15 @@ public class PageMemoryImpl implements PageMemoryEx {
*
*/
private void initWriteThrottle() {
- if (!(sharedCtx.database() instanceof GridCacheDatabaseSharedManager)) {
+ if (!(ctx.database() instanceof GridCacheDatabaseSharedManager)) {
log.error("Write throttle can't start. Unexpected class of database manager: " +
- sharedCtx.database().getClass());
+ ctx.database().getClass());
throttleEnabled = false;
}
if (throttleEnabled)
- writeThrottle = new PagesWriteThrottle(this, (GridCacheDatabaseSharedManager)sharedCtx.database());
+ writeThrottle = new PagesWriteThrottle(this, (GridCacheDatabaseSharedManager)ctx.database());
}
/** {@inheritDoc} */
@@ -423,6 +423,8 @@ public class PageMemoryImpl implements PageMemoryEx {
flags == PageIdAllocator.FLAG_IDX && partId == PageIdAllocator.INDEX_PARTITION :
"flags = " + flags + ", partId = " + partId;
+ assert ctx.database().checkpointLockIsHeldByThread();
+
long pageId = storeMgr.allocatePage(cacheId, partId, flags);
memMetrics.incrementTotalAllocatedPages();
@@ -441,7 +443,19 @@ public class PageMemoryImpl implements PageMemoryEx {
boolean isTrackingPage = trackingIO.trackingPageFor(pageId, pageSize()) == pageId;
try {
- long relPtr = seg.borrowOrAllocateFreePage(pageId);
+ long relPtr = seg.loadedPages.get(
+ cacheId,
+ PageIdUtils.effectivePageId(pageId),
+ seg.partTag(cacheId, partId),
+ INVALID_REL_PTR,
+ OUTDATED_REL_PTR
+ );
+
+ if (relPtr == OUTDATED_REL_PTR)
+ relPtr = refreshOutdatedPage(seg, cacheId, pageId, false);
+
+ if (relPtr == INVALID_REL_PTR)
+ relPtr = seg.borrowOrAllocateFreePage(pageId);
if (relPtr == INVALID_REL_PTR)
relPtr = seg.evictPage();
@@ -470,8 +484,8 @@ public class PageMemoryImpl implements PageMemoryEx {
if (PageIO.getType(pageAddr) == 0) {
trackingIO.initNewPage(pageAddr, pageId, pageSize());
- if (!sharedCtx.wal().isAlwaysWriteFullPages())
- sharedCtx.wal().log(
+ if (!ctx.wal().isAlwaysWriteFullPages())
+ ctx.wal().log(
new InitNewPageRecord(
cacheId,
pageId,
@@ -480,7 +494,7 @@ public class PageMemoryImpl implements PageMemoryEx {
)
);
else
- sharedCtx.wal().log(new PageSnapshot(fullId, absPtr + PAGE_OVERHEAD, pageSize()));
+ ctx.wal().log(new PageSnapshot(fullId, absPtr + PAGE_OVERHEAD, pageSize()));
}
}
@@ -991,6 +1005,9 @@ public class PageMemoryImpl implements PageMemoryEx {
PageHeader.releasePage(absPtr);
}
+ assert PageIO.getType(tmpBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId());
+ assert PageIO.getVersion(tmpBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId());
+
return true;
}
finally {
@@ -1229,6 +1246,9 @@ public class PageMemoryImpl implements PageMemoryEx {
pageSize()
);
+ assert PageIO.getType(tmpAbsPtr + PAGE_OVERHEAD) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId());
+ assert PageIO.getVersion(tmpAbsPtr + PAGE_OVERHEAD) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId());
+
PageHeader.dirty(absPtr, false);
PageHeader.tempBufferPointer(absPtr, tmpRelPtr);
@@ -1269,6 +1289,10 @@ public class PageMemoryImpl implements PageMemoryEx {
long pageId = PageIO.getPageId(page + PAGE_OVERHEAD);
+ assert pageId != 0 : U.hexLong(PageHeader.readPageId(page));
+ assert PageIO.getVersion(page + PAGE_OVERHEAD) != 0 : U.hexLong(pageId);
+ assert PageIO.getType(page + PAGE_OVERHEAD) != 0 : U.hexLong(pageId);
+
try {
rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, PageIdUtils.tag(pageId));
@@ -1390,6 +1414,8 @@ public class PageMemoryImpl implements PageMemoryEx {
boolean wasDirty = PageHeader.dirty(absPtr, dirty);
if (dirty) {
+ assert ctx.database().checkpointLockIsHeldByThread();
+
if (!wasDirty || forceAdd) {
boolean added = segment(pageId.groupId(), pageId.pageId()).dirtyPages.add(pageId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index afa3fd7..f2bfa41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -77,6 +77,8 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
this.rowStore = rowStore;
this.grp = grp;
+ assert !grp.dataRegion().config().isPersistenceEnabled() || grp.shared().database().checkpointLockIsHeldByThread();
+
initTree(initNew);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java
index a6ec6e7..0b1c931 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java
@@ -64,6 +64,8 @@ public class PendingEntriesTree extends BPlusTree<PendingRow, PendingRow> {
this.grp = grp;
+ assert !grp.dataRegion().config().isPersistenceEnabled() || grp.shared().database().checkpointLockIsHeldByThread();
+
initTree(initNew);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/test/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoLoadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoLoadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoLoadSelfTest.java
index 3b9e393..0f3bf9c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoLoadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoLoadSelfTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -47,6 +48,9 @@ public class PageMemoryNoLoadSelfTest extends GridCommonAbstractTest {
/** */
private static final int MAX_MEMORY_SIZE = 10 * 1024 * 1024;
+ /** */
+ private static final PageIO PAGE_IO = new DummyPageIO();
+
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "pagemem", false));
@@ -226,6 +230,8 @@ public class PageMemoryNoLoadSelfTest extends GridCommonAbstractTest {
assertNotNull(pageAddr);
try {
+ PAGE_IO.initNewPage(pageAddr, id.pageId(), mem.pageSize());
+
long updId = PageIdUtils.rotatePageId(id.pageId());
PageIO.setPageId(pageAddr, updId);
@@ -334,7 +340,7 @@ public class PageMemoryNoLoadSelfTest extends GridCommonAbstractTest {
long pageAddr = mem.writeLock(-1, pageId, page);
try {
- PageIO.setPageId(pageAddr, pageId);
+ PAGE_IO.initNewPage(pageAddr, pageId, mem.pageSize());
for (int i = PageIO.COMMON_HEADER_END; i < PAGE_SIZE; i++)
PageUtils.putByte(pageAddr, i, (byte)val);
@@ -355,7 +361,7 @@ public class PageMemoryNoLoadSelfTest extends GridCommonAbstractTest {
long pageAddr = mem.readLock(-1, pageId, page);
- assert(pageAddr != 0);
+ assert pageAddr != 0;
try {
for (int i = PageIO.COMMON_HEADER_END; i < PAGE_SIZE; i++) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java
new file mode 100644
index 0000000..1b36ac1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.GridStringBuilder;
+
+/**
+ * Dummy PageIO implementation. For test purposes only.
+ */
+public class DummyPageIO extends PageIO {
+ /** */
+ public DummyPageIO() {
+ super(2 * Short.MAX_VALUE, 1);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException {
+ sb.a("DummyPageIO [\n");
+ sb.a("addr=").a(addr).a(", ");
+ sb.a("pageSize=").a(addr);
+ sb.a("\n]");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
index 27b1950..fa89bf2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
@@ -221,11 +221,6 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
checkRebalancingDuringLoad(10, 500, 8, 16);
}
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return TimeUnit.MINUTES.toMillis(3);
- }
-
/**
* @throws Exception if failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
index 9369443..8e20585 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
@@ -142,20 +142,33 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract
PageMemory mem = sharedCtx.database().dataRegion(policyName).pageMemory();
+ DummyPageIO pageIO = new DummyPageIO();
+
int cacheId = sharedCtx.cache().cache(cacheName).context().cacheId();
FullPageId[] pages = new FullPageId[totalPages];
- for (int i = 0; i < totalPages; i++)
- pages[i] = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId);
+ // Get lock to prevent assertion. A new page should be allocated under checkpoint lock.
+ psMgr.checkpointReadLock();
- generateWal(
- (PageMemoryImpl)mem,
- sharedCtx.pageStore(),
- sharedCtx.wal(),
- cacheId,
- pages
- );
+ try {
+ for (int i = 0; i < totalPages; i++) {
+ pages[i] = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId);
+
+ initPage(mem, pageIO, pages[i]);
+ }
+
+ generateWal(
+ (PageMemoryImpl)mem,
+ sharedCtx.pageStore(),
+ sharedCtx.wal(),
+ cacheId,
+ pages
+ );
+ }
+ finally {
+ psMgr.checkpointReadUnlock();
+ }
eraseDataFromDisk(pageStore, cacheId, pages[0]);
@@ -169,6 +182,31 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract
}
/**
+ * Initializes page.
+ * @param mem page memory implementation.
+ * @param pageIO page io implementation.
+ * @param fullId full page id.
+ * @throws IgniteCheckedException if error occurs.
+ */
+ private void initPage(PageMemory mem, PageIO pageIO, FullPageId fullId) throws IgniteCheckedException {
+ long page = mem.acquirePage(fullId.groupId(), fullId.pageId());
+
+ try {
+ final long pageAddr = mem.writeLock(fullId.groupId(), fullId.pageId(), page);
+
+ try {
+ pageIO.initNewPage(pageAddr, fullId.pageId(), mem.pageSize());
+ }
+ finally {
+ mem.writeUnlock(fullId.groupId(), fullId.pageId(), page, null, true);
+ }
+ }
+ finally {
+ mem.releasePage(fullId.groupId(), fullId.pageId(), page);
+ }
+ }
+
+ /**
* @param pageStore Page store.
* @param cacheId Cache id.
* @param page Page.
@@ -207,21 +245,28 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract
PageMemory mem = shared.database().dataRegion(null).pageMemory();
- for (FullPageId fullId : pages) {
- long page = mem.acquirePage(fullId.groupId(), fullId.pageId());
+ dbMgr.checkpointReadLock();
- try {
- long pageAddr = mem.readLock(fullId.groupId(), fullId.pageId(), page);
+ try {
+ for (FullPageId fullId : pages) {
+ long page = mem.acquirePage(fullId.groupId(), fullId.pageId());
+
+ try {
+ long pageAddr = mem.readLock(fullId.groupId(), fullId.pageId(), page);
- for (int j = PageIO.COMMON_HEADER_END; j < mem.pageSize(); j += 4)
- assertEquals(j + (int)fullId.pageId(), PageUtils.getInt(pageAddr, j));
+ for (int j = PageIO.COMMON_HEADER_END; j < mem.pageSize(); j += 4)
+ assertEquals(j + (int)fullId.pageId(), PageUtils.getInt(pageAddr, j));
- mem.readUnlock(fullId.groupId(), fullId.pageId(), page);
- }
- finally {
- mem.releasePage(fullId.groupId(), fullId.pageId(), page);
+ mem.readUnlock(fullId.groupId(), fullId.pageId(), page);
+ }
+ finally {
+ mem.releasePage(fullId.groupId(), fullId.pageId(), page);
+ }
}
}
+ finally {
+ dbMgr.checkpointReadUnlock();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
index 5ae8969..0dd9153 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
@@ -47,7 +47,9 @@ import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
@@ -519,10 +521,12 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
PageMemoryEx mem = (PageMemoryEx) dbMgr.dataRegion(null).pageMemory();
- ig.context().cache().context().database().checkpointReadLock();
-
FullPageId[] pageIds = new FullPageId[100];
+ DummyPageIO pageIO = new DummyPageIO();
+
+ ig.context().cache().context().database().checkpointReadLock();
+
try {
for (int i = 0; i < pageIds.length; i++)
pageIds[i] = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId);
@@ -535,9 +539,9 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
long pageAddr = mem.writeLock(fullId.groupId(), fullId.pageId(), page);
- PageIO.setPageId(pageAddr, fullId.pageId());
-
try {
+ pageIO.initNewPage(pageAddr, fullId.pageId(), mem.pageSize());
+
assertTrue(mem.isDirty(fullId.groupId(), fullId.pageId(), page));
}
finally {
@@ -737,8 +741,22 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
Set<FullPageId> allocated = new HashSet<>();
+ IgniteCacheDatabaseSharedManager db = ig.context().cache().context().database();
+
+ PageIO pageIO = new DummyPageIO();
+
for (int i = 0; i < TOTAL_PAGES; i++) {
- FullPageId fullId = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId);
+ FullPageId fullId;
+
+ db.checkpointReadLock();
+ try {
+ fullId = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId);
+
+ initPage(mem, pageIO, fullId);
+ }
+ finally {
+ db.checkpointReadUnlock();
+ }
resMap.put(fullId, -1);
@@ -982,4 +1000,29 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
private void deleteWorkFiles() throws IgniteCheckedException {
deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false));
}
+
+ /**
+ * Initializes page.
+ * @param mem page memory implementation.
+ * @param pageIO page io implementation.
+ * @param fullId full page id.
+ * @throws IgniteCheckedException if error occurs.
+ */
+ private void initPage(PageMemory mem, PageIO pageIO, FullPageId fullId) throws IgniteCheckedException {
+ long page = mem.acquirePage(fullId.groupId(), fullId.pageId());
+
+ try {
+ final long pageAddr = mem.writeLock(fullId.groupId(), fullId.pageId(), page);
+
+ try {
+ pageIO.initNewPage(pageAddr, fullId.pageId(), mem.pageSize());
+ }
+ finally {
+ mem.writeUnlock(fullId.groupId(), fullId.pageId(), page, null, true);
+ }
+ }
+ finally {
+ mem.releasePage(fullId.groupId(), fullId.pageId(), page);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java
index 47a4b7b..1b86e3d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -138,6 +139,8 @@ public class IgnitePdsEvictionTest extends GridCommonAbstractTest {
IgniteCacheDatabaseSharedManager db = ignite.context().cache().context().database();
+ PageIO pageIO = new DummyPageIO();
+
// Allocate.
for (int i = 0; i < size; i++) {
db.checkpointReadLock();
@@ -145,6 +148,8 @@ public class IgnitePdsEvictionTest extends GridCommonAbstractTest {
final FullPageId fullId = new FullPageId(memory.allocatePage(cacheId, i % 256, PageMemory.FLAG_DATA),
cacheId);
+ initPage(memory, pageIO, fullId);
+
pageIds.add(fullId);
}
finally {
@@ -180,6 +185,31 @@ public class IgnitePdsEvictionTest extends GridCommonAbstractTest {
}
/**
+ * Initializes page.
+ * @param mem page memory implementation.
+ * @param pageIO page io implementation.
+ * @param fullId full page id.
+ * @throws IgniteCheckedException if error occurs.
+ */
+ private void initPage(PageMemory mem, PageIO pageIO, FullPageId fullId) throws IgniteCheckedException {
+ long page = mem.acquirePage(fullId.groupId(), fullId.pageId());
+
+ try {
+ final long pageAddr = mem.writeLock(fullId.groupId(), fullId.pageId(), page);
+
+ try {
+ pageIO.initNewPage(pageAddr, fullId.pageId(), mem.pageSize());
+ }
+ finally {
+ mem.writeUnlock(fullId.groupId(), fullId.pageId(), page, null, true);
+ }
+ }
+ finally {
+ mem.releasePage(fullId.groupId(), fullId.pageId(), page);
+ }
+ }
+
+ /**
* @param start Start index.
* @param end End index.
* @param memory PageMemory.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index edfaecf..53d5de5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
@@ -109,26 +110,35 @@ public class H2TreeIndex extends GridH2IndexBase {
segments = new H2Tree[segmentsCnt];
+ IgniteCacheDatabaseSharedManager db = cctx.shared().database();
+
for (int i = 0; i < segments.length; i++) {
- RootPage page = getMetaPage(name, i);
-
- segments[i] = new H2Tree(
- name,
- cctx.offheap().reuseListForIndex(name),
- cctx.groupId(),
- cctx.dataRegion().pageMemory(),
- cctx.shared().wal(),
- cctx.offheap().globalRemoveId(),
- tbl.rowFactory(),
- page.pageId().pageId(),
- page.isAllocated(),
- cols,
- inlineIdxs,
- computeInlineSize(inlineIdxs, inlineSize)) {
- @Override public int compareValues(Value v1, Value v2) {
- return v1 == v2 ? 0 : table.compareTypeSafe(v1, v2);
- }
- };
+ db.checkpointReadLock();
+
+ try {
+ RootPage page = getMetaPage(name, i);
+
+ segments[i] = new H2Tree(
+ name,
+ cctx.offheap().reuseListForIndex(name),
+ cctx.groupId(),
+ cctx.dataRegion().pageMemory(),
+ cctx.shared().wal(),
+ cctx.offheap().globalRemoveId(),
+ tbl.rowFactory(),
+ page.pageId().pageId(),
+ page.isAllocated(),
+ cols,
+ inlineIdxs,
+ computeInlineSize(inlineIdxs, inlineSize)) {
+ @Override public int compareValues(Value v1, Value v2) {
+ return v1 == v2 ? 0 : table.compareTypeSafe(v1, v2);
+ }
+ };
+ }
+ finally {
+ db.checkpointReadUnlock();
+ }
}
}
else {
@@ -202,6 +212,8 @@ public class H2TreeIndex extends GridH2IndexBase {
H2Tree tree = treeForRead(seg);
+ assert cctx.shared().database().checkpointLockIsHeldByThread();
+
return tree.put(row);
}
catch (IgniteCheckedException e) {
@@ -221,6 +233,8 @@ public class H2TreeIndex extends GridH2IndexBase {
H2Tree tree = treeForRead(seg);
+ assert cctx.shared().database().checkpointLockIsHeldByThread();
+
return tree.putx(row);
}
catch (IgniteCheckedException e) {
@@ -240,6 +254,8 @@ public class H2TreeIndex extends GridH2IndexBase {
H2Tree tree = treeForRead(seg);
+ assert cctx.shared().database().checkpointLockIsHeldByThread();
+
return tree.remove(row);
}
catch (IgniteCheckedException e) {
@@ -259,6 +275,8 @@ public class H2TreeIndex extends GridH2IndexBase {
H2Tree tree = treeForRead(seg);
+ assert cctx.shared().database().checkpointLockIsHeldByThread();
+
return tree.removex(row);
}
catch (IgniteCheckedException e) {
@@ -326,6 +344,8 @@ public class H2TreeIndex extends GridH2IndexBase {
@Override public void destroy(boolean rmvIndex) {
try {
if (cctx.affinityNode() && rmvIndex) {
+ assert cctx.shared().database().checkpointLockIsHeldByThread();
+
for (int i = 0; i < segments.length; i++) {
H2Tree tree = segments[i];