You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2019/12/04 15:32:53 UTC
[ignite] branch master updated: Revert "IGNITE-11704 Write tombstones during rebalance to get rid of deferred delete buffer" (#7100)
This is an automated email from the ASF dual-hosted git repository.
mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9265c04 Revert "IGNITE-11704 Write tombstones during rebalance to get rid of deferred delete buffer" (#7100)
9265c04 is described below
commit 9265c04a368c4cf0fc331aac5a71f7d0db365ea8
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Wed Dec 4 18:32:34 2019 +0300
Revert "IGNITE-11704 Write tombstones during rebalance to get rid of deferred delete buffer" (#7100)
This reverts commit ce9f593495a6c9c89311aa1608ffda7fe92d0aa0.
---
.../communication/GridIoMessageFactory.java | 6 -
.../apache/ignite/internal/pagemem/PageUtils.java | 4 +-
.../internal/pagemem/wal/record/WALRecord.java | 5 +-
.../delta/MetaPageUpdatePartitionDataRecord.java | 19 +-
.../delta/MetaPageUpdatePartitionDataRecordV2.java | 22 +-
.../delta/MetaPageUpdatePartitionDataRecordV3.java | 108 ------
.../processors/cache/CacheGroupContext.java | 19 -
.../processors/cache/CacheGroupMetricsImpl.java | 23 +-
.../internal/processors/cache/CacheObject.java | 3 -
.../processors/cache/GridCacheContext.java | 9 +-
.../processors/cache/GridCacheMapEntry.java | 319 +++++++---------
.../cache/IgniteCacheOffheapManager.java | 93 +----
.../cache/IgniteCacheOffheapManagerImpl.java | 395 ++++----------------
.../processors/cache/IncompleteCacheObject.java | 19 -
.../processors/cache/IncompleteObject.java | 2 +-
.../processors/cache/PartitionUpdateCounter.java | 4 +-
.../processors/cache/TombstoneCacheObject.java | 94 -----
.../binary/CacheObjectBinaryProcessorImpl.java | 6 +-
.../dht/topology/GridDhtLocalPartition.java | 293 +++++----------
.../dht/topology/GridDhtPartitionTopologyImpl.java | 5 +-
.../dht/topology/PartitionsEvictManager.java | 366 ++++--------------
.../processors/cache/persistence/CacheDataRow.java | 5 -
.../cache/persistence/CacheDataRowAdapter.java | 61 +--
.../cache/persistence/GridCacheOffheapManager.java | 369 +++++++-----------
.../IgniteCacheDatabaseSharedManager.java | 104 +-----
.../persistence/tree/io/PagePartitionMetaIO.java | 29 +-
.../persistence/tree/io/PagePartitionMetaIOV2.java | 25 +-
.../wal/serializer/RecordDataV1Serializer.java | 11 -
.../internal/processors/cache/tree/DataRow.java | 6 +-
.../processors/metric/impl/MetricUtils.java | 9 -
.../cache/CacheDeferredDeleteSanitySelfTest.java | 6 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 7 +-
.../IgniteCacheConfigVariationsFullApiTest.java | 2 +-
.../CacheRemoveWithTombstonesLoadTest.java | 414 ---------------------
.../distributed/CacheRemoveWithTombstonesTest.java | 289 --------------
.../CacheRemoveWithTombstonesFailoverTest.java | 187 ----------
.../DropCacheContextDuringEvictionTest.java | 24 +-
.../PartitionsEvictManagerAbstractTest.java | 113 +++---
.../PartitionsEvictionTaskFailureHandlerTest.java | 72 +---
.../processors/database/CacheFreeListSelfTest.java | 5 -
.../testsuites/IgniteCacheMvccTestSuite9.java | 18 +-
.../ignite/testsuites/IgniteCacheTestSuite9.java | 7 -
.../query/h2/database/H2PkHashIndex.java | 4 +-
.../processors/query/h2/opt/H2CacheRow.java | 5 -
44 files changed, 669 insertions(+), 2917 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index b5ae4f6..d8d62d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
-import org.apache.ignite.internal.processors.cache.TombstoneCacheObject;
import org.apache.ignite.internal.processors.cache.WalStateAckMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
@@ -1167,11 +1166,6 @@ public class GridIoMessageFactory implements MessageFactory {
break;
- case 176:
- msg = TombstoneCacheObject.INSTANCE;
-
- break;
-
// [-3..119] [124..129] [-23..-28] [-36..-55] - this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java
index 0b9b1b4..217164c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java
@@ -56,8 +56,8 @@ public class PageUtils {
*/
public static byte[] getBytes(long addr, int off, int len) {
assert addr > 0 : addr;
- assert off >= 0 : off;
- assert len >= 0 : len;
+ assert off >= 0;
+ assert len >= 0;
byte[] bytes = new byte[len];
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 9dca94d..ea884db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -221,10 +221,7 @@ public abstract class WALRecord {
ROLLBACK_TX_RECORD (57, LOGICAL),
/** Partition meta page containing update counter gaps. */
- PARTITION_META_PAGE_UPDATE_COUNTERS_V2 (58, PHYSICAL),
-
- /** Partition meta page containing tombstone presence flag. */
- PARTITION_META_PAGE_UPDATE_COUNTERS_V3 (60, PHYSICAL);
+ PARTITION_META_PAGE_UPDATE_COUNTERS_V2 (58, PHYSICAL);
/** Index for serialization. Should be consistent throughout all versions. */
private final int idx;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
index 0c9f7fc..3e2b67b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
@@ -46,7 +46,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord {
private int allocatedIdxCandidate;
/** */
- private long cacheSizesPageId;
+ private long cntrsPageId;
/**
* @param grpId Cache group ID.
@@ -59,10 +59,9 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord {
long updateCntr,
long globalRmvId,
int partSize,
- long cacheSizesPageId,
+ long cntrsPageId,
byte state,
- int allocatedIdxCandidate
- ) {
+ int allocatedIdxCandidate) {
super(grpId, pageId);
this.updateCntr = updateCntr;
@@ -70,7 +69,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord {
this.partSize = partSize;
this.state = state;
this.allocatedIdxCandidate = allocatedIdxCandidate;
- this.cacheSizesPageId = cacheSizesPageId;
+ this.cntrsPageId = cntrsPageId;
}
/**
@@ -82,7 +81,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord {
this.updateCntr = in.readLong();
this.globalRmvId = in.readLong();
this.partSize = in.readInt();
- this.cacheSizesPageId = in.readLong();
+ this.cntrsPageId = in.readLong();
this.state = in.readByte();
this.allocatedIdxCandidate = in.readInt();
}
@@ -111,8 +110,8 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord {
/**
* @return Partition size.
*/
- public long cacheSizesPageId() {
- return cacheSizesPageId;
+ public long countersPageId() {
+ return cntrsPageId;
}
/**
@@ -129,7 +128,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord {
io.setUpdateCounter(pageAddr, updateCntr);
io.setGlobalRemoveId(pageAddr, globalRmvId);
io.setSize(pageAddr, partSize);
- io.setSizesPageId(pageAddr, cacheSizesPageId);
+ io.setCountersPageId(pageAddr, cntrsPageId);
io.setPartitionState(pageAddr, state);
io.setCandidatePageCount(pageAddr, allocatedIdxCandidate);
}
@@ -151,7 +150,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord {
buf.putLong(updateCounter());
buf.putLong(globalRemoveId());
buf.putInt(partitionSize());
- buf.putLong(cacheSizesPageId());
+ buf.putLong(countersPageId());
buf.put(state());
buf.putInt(allocatedIndexCandidate());
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecordV2.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecordV2.java
index a8a8597..ab3ccf8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecordV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecordV2.java
@@ -28,12 +28,11 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageParti
import org.apache.ignite.internal.util.typedef.internal.S;
/**
- * Partition meta page delta record.
- * Contains reference to update counters gaps.
+ *
*/
public class MetaPageUpdatePartitionDataRecordV2 extends MetaPageUpdatePartitionDataRecord {
/** */
- private long gapsLink;
+ private long link;
/**
* @param grpId Group id.
@@ -44,7 +43,7 @@ public class MetaPageUpdatePartitionDataRecordV2 extends MetaPageUpdatePartition
* @param cntrsPageId Cntrs page id.
* @param state State.
* @param allocatedIdxCandidate Allocated index candidate.
- * @param gapsLink Link.
+ * @param link Link.
*/
public MetaPageUpdatePartitionDataRecordV2(
int grpId,
@@ -55,10 +54,9 @@ public class MetaPageUpdatePartitionDataRecordV2 extends MetaPageUpdatePartition
long cntrsPageId,
byte state,
int allocatedIdxCandidate,
- long gapsLink
- ) {
+ long link) {
super(grpId, pageId, updateCntr, globalRmvId, partSize, cntrsPageId, state, allocatedIdxCandidate);
- this.gapsLink = gapsLink;
+ this.link = link;
}
/**
@@ -67,7 +65,7 @@ public class MetaPageUpdatePartitionDataRecordV2 extends MetaPageUpdatePartition
public MetaPageUpdatePartitionDataRecordV2(DataInput in) throws IOException {
super(in);
- this.gapsLink = in.readLong();
+ this.link = in.readLong();
}
/** {@inheritDoc} */
@@ -76,21 +74,21 @@ public class MetaPageUpdatePartitionDataRecordV2 extends MetaPageUpdatePartition
PagePartitionMetaIOV2 io = (PagePartitionMetaIOV2)PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
- io.setGapsLink(pageAddr, gapsLink);
+ io.setGapsLink(pageAddr, link);
}
/**
*
*/
- public long gapsLink() {
- return gapsLink;
+ public long link() {
+ return link;
}
/** {@inheritDoc} */
@Override public void toBytes(ByteBuffer buf) {
super.toBytes(buf);
- buf.putLong(gapsLink());
+ buf.putLong(link());
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecordV3.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecordV3.java
deleted file mode 100644
index 1263c43..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecordV3.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagemem.wal.record.delta;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.PageIdUtils;
-import org.apache.ignite.internal.pagemem.PageMemory;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIOV2;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Partition meta page delta record.
- * Contains information about tombstones count.
- */
-public class MetaPageUpdatePartitionDataRecordV3 extends MetaPageUpdatePartitionDataRecordV2 {
- /** Tombstones count. */
- private long tombstonesCnt;
-
- /**
- * @param grpId Group id.
- * @param pageId Page id.
- * @param updateCntr Update counter.
- * @param globalRmvId Global remove id.
- * @param partSize Partition size.
- * @param cacheSizesPageId Cache sizes page id.
- * @param state State.
- * @param allocatedIdxCandidate Allocated index candidate.
- * @param gapsLink Gaps link.
- * @param tombstonesCnt Tombstones count.
- */
- public MetaPageUpdatePartitionDataRecordV3(
- int grpId,
- long pageId,
- long updateCntr,
- long globalRmvId,
- int partSize,
- long cacheSizesPageId,
- byte state,
- int allocatedIdxCandidate,
- long gapsLink,
- long tombstonesCnt
- ) {
- super(grpId, pageId, updateCntr, globalRmvId, partSize, cacheSizesPageId, state, allocatedIdxCandidate, gapsLink);
- this.tombstonesCnt = tombstonesCnt;
- }
-
- /**
- * @param in In.
- */
- public MetaPageUpdatePartitionDataRecordV3(DataInput in) throws IOException {
- super(in);
-
- this.tombstonesCnt = in.readLong();
- }
-
- /**
- * @return Tombstones count.
- */
- public long tombstonesCount() {
- return tombstonesCnt;
- }
-
- /** {@inheritDoc} */
- @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
- super.applyDelta(pageMem, pageAddr);
-
- PagePartitionMetaIOV2 io = (PagePartitionMetaIOV2) PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
-
- io.setTombstonesCount(pageAddr, tombstonesCnt);
- }
-
- /** {@inheritDoc} */
- @Override public void toBytes(ByteBuffer buf) {
- super.toBytes(buf);
-
- buf.putLong(tombstonesCnt);
- }
-
- /** {@inheritDoc} */
- @Override public RecordType type() {
- return RecordType.PARTITION_META_PAGE_UPDATE_COUNTERS_V3;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MetaPageUpdatePartitionDataRecordV2.class, this, "partId", PageIdUtils.partId(pageId()),
- "super", super.toString());
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index e753edb..ad35570 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -46,8 +46,6 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCach
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
@@ -263,8 +261,6 @@ public class CacheGroupContext {
statHolderIdx = new IoStatisticsHolderIndex(HASH_INDEX, cacheOrGroupName(), HASH_PK_IDX_NAME, mmgr);
statHolderData = new IoStatisticsHolderCache(cacheOrGroupName(), grpId, mmgr);
}
-
- hasAtomicCaches = ccfg.getAtomicityMode() == ATOMIC;
}
/**
@@ -1303,21 +1299,6 @@ public class CacheGroupContext {
}
/**
- * @return {@code True} if need create temporary tombstones entries for removed data.
- */
- public boolean supportsTombstone() {
- return !mvccEnabled && !isLocal();
- }
-
- /**
- * @param part Partition.
- * @return {@code True} if need create tombstone for remove in given partition.
- */
- public boolean shouldCreateTombstone(@Nullable GridDhtLocalPartition part) {
- return part != null && supportsTombstone() && part.state() == GridDhtPartitionState.MOVING;
- }
-
- /**
* @return Metrics.
*/
public CacheGroupMetricsImpl metrics() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java
index fab2e1f..e82e451 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java
@@ -70,7 +70,6 @@ public class CacheGroupMetricsImpl {
private final LongMetric sparseStorageSize;
/** Interface describing a predicate of two integers. */
- @FunctionalInterface
private interface IntBiPredicate {
/**
* Predicate body.
@@ -170,10 +169,6 @@ public class CacheGroupMetricsImpl {
mreg.register("TotalAllocatedSize",
this::getTotalAllocatedSize,
"Total size of memory allocated for group, in bytes.");
-
- mreg.register("Tombstones",
- this::getTombstones,
- "Number of tombstone entries.");
}
/** */
@@ -258,12 +253,20 @@ public class CacheGroupMetricsImpl {
/** */
public int getMinimumNumberOfPartitionCopies() {
- return numberOfPartitionCopies((targetVal, nextVal) -> nextVal < targetVal);
+ return numberOfPartitionCopies(new IntBiPredicate() {
+ @Override public boolean apply(int targetVal, int nextVal) {
+ return nextVal < targetVal;
+ }
+ });
}
/** */
public int getMaximumNumberOfPartitionCopies() {
- return numberOfPartitionCopies((targetVal, nextVal) -> nextVal > targetVal);
+ return numberOfPartitionCopies(new IntBiPredicate() {
+ @Override public boolean apply(int targetVal, int nextVal) {
+ return nextVal > targetVal;
+ }
+ });
}
/**
@@ -459,12 +462,6 @@ public class CacheGroupMetricsImpl {
return sparseStorageSize == null ? 0 : sparseStorageSize.value();
}
- /** */
- public long getTombstones() {
- return ctx.topology().localPartitions().stream()
- .map(part -> part.dataStore().tombstonesCount()).reduce(Long::sum).orElse(0L);
- }
-
/** Removes all metric for cache group. */
public void remove() {
ctx.shared().kernalContext().metric().remove(metricGroupName());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
index 5e89926..f9f384a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
@@ -38,9 +38,6 @@ public interface CacheObject extends Message {
/** */
public static final byte TYPE_BINARY_ENUM = 101;
- /** */
- public static final byte TOMBSTONE = -1;
-
/**
* @param ctx Context.
* @param cpy If {@code true} need to copy value.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index a97d3ee..9ddafb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -625,13 +625,8 @@ public class GridCacheContext<K, V> implements Externalizable {
public void cache(GridCacheAdapter<K, V> cache) {
this.cache = cache;
- if (grp.supportsTombstone() && cache.configuration().getAtomicityMode() == TRANSACTIONAL
- && !store().configured())
- deferredDel = false;
- else {
- deferredDel = (cache.isDht() || cache.isDhtAtomic() || cache.isColocated() ||
- (cache.isNear() && cache.configuration().getAtomicityMode() == ATOMIC));
- }
+ deferredDel = cache.isDht() || cache.isDhtAtomic() || cache.isColocated() ||
+ (cache.isNear() && cache.configuration().getAtomicityMode() == ATOMIC);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 641001f..a064503 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -98,9 +98,9 @@ import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.NotNull;
@@ -1713,19 +1713,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
interceptRes = cctx.config().getInterceptor().onBeforeRemove(entry0);
- if (cctx.cancelRemove(interceptRes))
+ if (cctx.cancelRemove(interceptRes)) {
+ CacheObject ret = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
+
return new GridCacheUpdateTxResult(false, logPtr);
+ }
}
- if (cctx.group().shouldCreateTombstone(localPartition())) {
- cctx.offheap().removeWithTombstone(cctx, key, newVer, localPartition());
-
- // Partition may change his state during remove.
- if (!cctx.group().shouldCreateTombstone(localPartition()))
- removeTombstone0(newVer);
- }
- else
- removeValue();
+ removeValue();
update(null, 0, 0, newVer, true);
@@ -2823,34 +2818,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/**
- * @param tombstoneVer Tombstone version.
- * @throws GridCacheEntryRemovedException If entry was removed.
- * @throws IgniteCheckedException If failed.
- */
- public void removeTombstone(GridCacheVersion tombstoneVer) throws GridCacheEntryRemovedException, IgniteCheckedException {
- lockEntry();
-
- try {
- checkObsolete();
-
- removeTombstone0(tombstoneVer);
- }
- finally {
- unlockEntry();
- }
- }
-
- /**
- * @param tombstoneVer Tombstone version.
- * @throws IgniteCheckedException If failed.
- */
- private void removeTombstone0(GridCacheVersion tombstoneVer) throws IgniteCheckedException {
- RemoveClosure c = new RemoveClosure(this, tombstoneVer);
-
- cctx.offheap().invoke(cctx, key, localPartition(), c);
- }
-
- /**
* @return {@code True} if this entry should not be evicted from cache.
*/
protected boolean evictionDisabled() {
@@ -3370,18 +3337,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean update;
- IgniteBiPredicate<CacheObject, GridCacheVersion> p = new IgniteBiPredicate<CacheObject, GridCacheVersion>() {
- @Override public boolean apply(@Nullable CacheObject val, GridCacheVersion currVer) {
+ IgnitePredicate<CacheDataRow> p = new IgnitePredicate<CacheDataRow>() {
+ @Override public boolean apply(@Nullable CacheDataRow row) {
boolean update0;
- boolean isStartVer = cctx.shared().versions().isStartVersion(currVer);
+ GridCacheVersion currentVer = row != null ? row.version() : GridCacheMapEntry.this.ver;
+
+ boolean isStartVer = cctx.shared().versions().isStartVersion(currentVer);
if (cctx.group().persistenceEnabled()) {
if (!isStartVer) {
if (cctx.atomic())
- update0 = ATOMIC_VER_COMPARATOR.compare(currVer, ver) < 0;
+ update0 = ATOMIC_VER_COMPARATOR.compare(currentVer, ver) < 0;
else
- update0 = currVer.compareTo(ver) < 0;
+ update0 = currentVer.compareTo(ver) < 0;
}
else
update0 = true;
@@ -3389,15 +3358,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else
update0 = isStartVer;
- // Such combination may exist during datastreamer first update.
- update0 |= (!preload && val == null);
+ update0 |= (!preload && deletedUnlocked());
return update0;
}
};
if (unswapped) {
- update = p.apply(this.val, this.ver);
+ update = p.apply(null);
if (update) {
// If entry is already unswapped and we are modifying it, we must run deletion callbacks for old value.
@@ -3428,7 +3396,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// cannot identify whether the entry is exist on the fly
unswap(false);
- if (update = p.apply(this.val, this.ver)) {
+ if (update = p.apply(null)) {
// If entry is already unswapped and we are modifying it, we must run deletion callbacks for old value.
long oldExpTime = expireTimeUnlocked();
long delta = (oldExpTime == 0 ? 0 : oldExpTime - U.currentTimeMillis());
@@ -4288,11 +4256,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param ver New entry version.
* @throws IgniteCheckedException If update failed.
*/
- protected boolean storeValue(
- @Nullable CacheObject val,
+ protected boolean storeValue(@Nullable CacheObject val,
long expireTime,
- GridCacheVersion ver
- ) throws IgniteCheckedException {
+ GridCacheVersion ver) throws IgniteCheckedException {
return storeValue(val, expireTime, ver, null, null);
}
@@ -4302,26 +4268,26 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param val Value.
* @param expireTime Expire time.
* @param ver New entry version.
- * @param p Optional predicate.
+ * @param predicate Optional predicate.
* @param row Pre-created data row, associated with this cache entry.
* @return {@code True} if storage was modified.
* @throws IgniteCheckedException If update failed.
*/
- private boolean storeValue(
+ protected boolean storeValue(
@Nullable CacheObject val,
long expireTime,
GridCacheVersion ver,
- @Nullable IgniteBiPredicate<CacheObject, GridCacheVersion> p,
+ @Nullable IgnitePredicate<CacheDataRow> predicate,
@Nullable CacheDataRow row
) throws IgniteCheckedException {
assert lock.isHeldByCurrentThread();
assert localPartition() == null || localPartition().state() != RENTING : localPartition();
- UpdateClosure c = new UpdateClosure(this, val, ver, expireTime, p, row);
+ UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate, row);
- cctx.offheap().invoke(cctx, key, localPartition(), c);
+ cctx.offheap().invoke(cctx, key, localPartition(), closure);
- return c.treeOp != IgniteTree.OperationType.NOOP;
+ return closure.treeOp != IgniteTree.OperationType.NOOP;
}
/**
@@ -4515,9 +4481,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheDataRow row = cctx.offheap().read(this);
- if (cctx.offheap().isTombstone(row))
- return;
-
if (row != null && (filter == null || filter.apply(row)))
clo.apply(row);
}
@@ -5742,101 +5705,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/**
- * @param row Data row.
- * @return {@code True} if row expired.
- * @throws IgniteCheckedException If failed.
- */
- private boolean checkRowExpired(CacheDataRow row) throws IgniteCheckedException {
- assert row != null;
-
- if (!(row.expireTime() > 0 && row.expireTime() <= U.currentTimeMillis()))
- return false;
-
- CacheObject expiredVal = row.value();
-
- if (cctx.deferredDelete() && !detached() && !isInternal()) {
- update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver, true);
-
- if (!deletedUnlocked())
- deletedUnlocked(true);
- }
- else
- markObsolete0(cctx.versions().next(), true, null);
-
- if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) {
- cctx.events().addEvent(partition(),
- key(),
- cctx.localNodeId(),
- null,
- EVT_CACHE_OBJECT_EXPIRED,
- null,
- false,
- expiredVal,
- expiredVal != null,
- null,
- null,
- null,
- true);
- }
-
- cctx.continuousQueries().onEntryExpired(this, key(), expiredVal);
-
- return true;
- }
-
- /**
- *
- */
- private static class RemoveClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure {
- /** */
- private final GridCacheMapEntry entry;
-
- /** */
- private final GridCacheVersion ver;
-
- /** */
- private IgniteTree.OperationType op;
-
- /** */
- private CacheDataRow oldRow;
-
- public RemoveClosure(GridCacheMapEntry entry, GridCacheVersion ver) {
- this.entry = entry;
- this.ver = ver;
- }
-
- /** {@inheritDoc} */
- @Override public @Nullable CacheDataRow oldRow() {
- return oldRow;
- }
-
- /** {@inheritDoc} */
- @Override public void call(@Nullable CacheDataRow row) throws IgniteCheckedException {
- if (row == null || !ver.equals(row.version())) {
- op = IgniteTree.OperationType.NOOP;
-
- return;
- }
-
- row.key(entry.key);
-
- oldRow = row;
-
- op = IgniteTree.OperationType.REMOVE;
- }
-
- /** {@inheritDoc} */
- @Override public CacheDataRow newRow() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteTree.OperationType operationType() {
- return op;
- }
- }
-
- /**
*
*/
private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure {
@@ -5853,7 +5721,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
private final long expireTime;
/** */
- @Nullable private final IgniteBiPredicate<CacheObject, GridCacheVersion> p;
+ @Nullable private final IgnitePredicate<CacheDataRow> predicate;
/** */
private CacheDataRow newRow;
@@ -5869,48 +5737,32 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param val New value.
* @param ver New version.
* @param expireTime New expire time.
- * @param p Optional predicate.
- * @param newRow New row value.
+ * @param predicate Optional predicate.
*/
- private UpdateClosure(
- GridCacheMapEntry entry,
- @Nullable CacheObject val,
- GridCacheVersion ver,
- long expireTime,
- @Nullable IgniteBiPredicate<CacheObject, GridCacheVersion> p,
- @Nullable CacheDataRow newRow
- ) {
+ UpdateClosure(GridCacheMapEntry entry, @Nullable CacheObject val, GridCacheVersion ver, long expireTime,
+ @Nullable IgnitePredicate<CacheDataRow> predicate, @Nullable CacheDataRow newRow) {
this.entry = entry;
this.val = val;
this.ver = ver;
this.expireTime = expireTime;
- this.p = p;
+ this.predicate = predicate;
this.newRow = newRow;
}
/** {@inheritDoc} */
@Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
- if (oldRow != null)
+ if (oldRow != null) {
oldRow.key(entry.key);
- this.oldRow = oldRow;
-
- if (p != null) {
- CacheObject val = null;
- GridCacheVersion ver = entry.ver;
-
- if (oldRow != null) {
- if (!entry.checkRowExpired(oldRow) && !entry.context().offheap().isTombstone(oldRow))
- val = oldRow.value();
+ oldRow = checkRowExpired(oldRow);
+ }
- ver = oldRow.version();
- }
+ this.oldRow = oldRow;
- if (!p.apply(val, ver)) {
- treeOp = IgniteTree.OperationType.NOOP;
+ if (predicate != null && !predicate.apply(oldRow)) {
+ treeOp = IgniteTree.OperationType.NOOP;
- return;
- }
+ return;
}
if (val != null) {
@@ -5921,8 +5773,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
val,
ver,
expireTime,
- oldRow
- );
+ oldRow);
}
treeOp = oldRow != null && oldRow.link() == newRow.link() ?
@@ -5946,6 +5797,53 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable @Override public CacheDataRow oldRow() {
return oldRow;
}
+
+ /**
+ * Checks row for expiration and fires expire events if needed.
+ *
+ * @param row old row.
+ * @return {@code Null} if row was expired, row itself otherwise.
+ * @throws IgniteCheckedException
+ */
+ private CacheDataRow checkRowExpired(CacheDataRow row) throws IgniteCheckedException {
+ assert row != null;
+
+ if (!(row.expireTime() > 0 && row.expireTime() <= U.currentTimeMillis()))
+ return row;
+
+ GridCacheContext cctx = entry.context();
+
+ CacheObject expiredVal = row.value();
+
+ if (cctx.deferredDelete() && !entry.detached() && !entry.isInternal()) {
+ entry.update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, entry.ver, true);
+
+ if (!entry.deletedUnlocked() && !entry.isStartVersion())
+ entry.deletedUnlocked(true);
+ }
+ else
+ entry.markObsolete0(cctx.versions().next(), true, null);
+
+ if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) {
+ cctx.events().addEvent(entry.partition(),
+ entry.key(),
+ cctx.localNodeId(),
+ null,
+ EVT_CACHE_OBJECT_EXPIRED,
+ null,
+ false,
+ expiredVal,
+ expiredVal != null,
+ null,
+ null,
+ null,
+ true);
+ }
+
+ cctx.continuousQueries().onEntryExpired(entry, entry.key(), expiredVal);
+
+ return null;
+ }
}
/**
@@ -6120,7 +6018,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// unswap
entry.update(oldRow.value(), oldRow.expireTime(), 0, oldRow.version(), false);
- if (entry.checkRowExpired(oldRow)) {
+ if (checkRowExpired(oldRow)) {
oldRowExpiredFlag = true;
oldRow = null;
@@ -6272,6 +6170,53 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/**
+ * Check row expiration and fire expire events if needed.
+ *
+ * @param row Old row.
+ * @return {@code True} if row was expired, {@code False} otherwise.
+ * @throws IgniteCheckedException if failed.
+ */
+ private boolean checkRowExpired(CacheDataRow row) throws IgniteCheckedException {
+ assert row != null;
+
+ if (!(row.expireTime() > 0 && row.expireTime() <= U.currentTimeMillis()))
+ return false;
+
+ GridCacheContext cctx = entry.context();
+
+ CacheObject expiredVal = row.value();
+
+ if (cctx.deferredDelete() && !entry.detached() && !entry.isInternal()) {
+ entry.update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, entry.ver, true);
+
+ if (!entry.deletedUnlocked())
+ entry.deletedUnlocked(true);
+ }
+ else
+ entry.markObsolete0(cctx.versions().next(), true, null);
+
+ if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) {
+ cctx.events().addEvent(entry.partition(),
+ entry.key(),
+ cctx.localNodeId(),
+ null,
+ EVT_CACHE_OBJECT_EXPIRED,
+ null,
+ false,
+ expiredVal,
+ expiredVal != null,
+ null,
+ null,
+ null,
+ true);
+ }
+
+ cctx.continuousQueries().onEntryExpired(entry, entry.key(), expiredVal);
+
+ return true;
+ }
+
+ /**
* @param storeLoadedVal Value loaded from store.
* @param updateExpireTime {@code True} if need update expire time.
* @throws IgniteCheckedException If failed.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 283aaea..e73ad52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.persistence.DataRowCacheAware;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
@@ -163,6 +162,11 @@ public interface IgniteCacheOffheapManager {
public void destroyCacheDataStore(CacheDataStore store) throws IgniteCheckedException;
/**
+ * TODO: GG-10884, used only from initialValue.
+ */
+ public boolean containsKey(GridCacheMapEntry entry);
+
+ /**
* @param cctx Cache context.
* @param c Closure.
* @param amount Limit of processed entries by single call, {@code -1} for no limit.
@@ -220,7 +224,7 @@ public interface IgniteCacheOffheapManager {
* @return Iterator over all versions.
* @throws IgniteCheckedException If failed.
*/
- GridCursor<CacheDataRow> mvccAllVersionsCursor(GridCacheContext cctx, KeyCacheObject key, CacheDataRowAdapter.RowData x)
+ GridCursor<CacheDataRow> mvccAllVersionsCursor(GridCacheContext cctx, KeyCacheObject key, Object x)
throws IgniteCheckedException;
/**
@@ -401,27 +405,6 @@ public interface IgniteCacheOffheapManager {
) throws IgniteCheckedException;
/**
- * @param cctx Cache context.
- * @param key Key.
- * @param ver Version.
- * @param part Partition.
- * @throws IgniteCheckedException If failed.
- */
- public void removeWithTombstone(
- GridCacheContext cctx,
- KeyCacheObject key,
- GridCacheVersion ver,
- GridDhtLocalPartition part
- ) throws IgniteCheckedException;
-
- /**
- * @param row Data row.
- * @return {@code True} if give row is tombstone.
- * @throws IgniteCheckedException If failed.
- */
- public boolean isTombstone(@Nullable CacheDataRow row) throws IgniteCheckedException;
-
- /**
* @param ldr Class loader.
* @return Number of undeployed entries.
*/
@@ -458,20 +441,10 @@ public interface IgniteCacheOffheapManager {
/**
* @param part Partition number.
- * @param withTombstones {@code True} if should return tombstone entries.
* @return Iterator for given partition.
* @throws IgniteCheckedException If failed.
*/
- public GridIterator<CacheDataRow> partitionIterator(final int part, boolean withTombstones) throws IgniteCheckedException;
-
- /**
- * @param part Partition number.
- * @return Iterator for given partition that skips tombstones.
- * @throws IgniteCheckedException If failed.
- */
- public default GridIterator<CacheDataRow> partitionIterator(final int part) throws IgniteCheckedException {
- return partitionIterator(part, false);
- }
+ public GridIterator<CacheDataRow> partitionIterator(final int part) throws IgniteCheckedException;
/**
* @param part Partition number.
@@ -759,7 +732,7 @@ public interface IgniteCacheOffheapManager {
*
* @param cctx Cache context.
* @param row Row.
- * @throws IgniteCheckedException If failed.
+ * @throws IgniteCheckedException
*/
public void updateTxState(GridCacheContext cctx, CacheSearchRow row)
throws IgniteCheckedException;
@@ -932,7 +905,7 @@ public interface IgniteCacheOffheapManager {
* @param ver Version.
* @param expireTime Expire time.
* @param mvccVer Mvcc version.
- * @throws IgniteCheckedException If failed.
+ * @throws IgniteCheckedException
*/
void mvccApplyUpdate(GridCacheContext cctx,
KeyCacheObject key,
@@ -953,20 +926,6 @@ public interface IgniteCacheOffheapManager {
/**
* @param cctx Cache context.
* @param key Key.
- * @param ver Version.
- * @param part Partition.
- * @throws IgniteCheckedException If failed.
- */
- public void removeWithTombstone(
- GridCacheContext cctx,
- KeyCacheObject key,
- GridCacheVersion ver,
- GridDhtLocalPartition part
- ) throws IgniteCheckedException;
-
- /**
- * @param cctx Cache context.
- * @param key Key.
* @return Data row.
* @throws IgniteCheckedException If failed.
*/
@@ -981,7 +940,7 @@ public interface IgniteCacheOffheapManager {
* @return Iterator over all versions.
* @throws IgniteCheckedException If failed.
*/
- GridCursor<CacheDataRow> mvccAllVersionsCursor(GridCacheContext cctx, KeyCacheObject key, CacheDataRowAdapter.RowData x)
+ GridCursor<CacheDataRow> mvccAllVersionsCursor(GridCacheContext cctx, KeyCacheObject key, Object x)
throws IgniteCheckedException;
/**
@@ -1008,23 +967,14 @@ public interface IgniteCacheOffheapManager {
* @return Data cursor.
* @throws IgniteCheckedException If failed.
*/
- public default GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException {
- return cursor(false);
- }
-
- /**
- * @param withTombstones {@code True} if should return tombstone entries.
- * @return Data cursor.
- * @throws IgniteCheckedException If failed.
- */
- public GridCursor<? extends CacheDataRow> cursor(boolean withTombstones) throws IgniteCheckedException;
+ public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException;
/**
* @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
* @return Data cursor.
* @throws IgniteCheckedException If failed.
*/
- public GridCursor<? extends CacheDataRow> cursor(CacheDataRowAdapter.RowData x) throws IgniteCheckedException;
+ public GridCursor<? extends CacheDataRow> cursor(Object x) throws IgniteCheckedException;
/**
* @param mvccSnapshot MVCC snapshot.
@@ -1035,11 +985,10 @@ public interface IgniteCacheOffheapManager {
/**
* @param cacheId Cache ID.
- * @param withTombstones {@code True} if should return tombstone entries.
* @return Data cursor.
* @throws IgniteCheckedException If failed.
*/
- public GridCursor<? extends CacheDataRow> cursor(int cacheId, boolean withTombstones) throws IgniteCheckedException;
+ public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException;
/**
* @param cacheId Cache ID.
@@ -1069,7 +1018,7 @@ public interface IgniteCacheOffheapManager {
* @throws IgniteCheckedException If failed.
*/
public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
- KeyCacheObject upper, CacheDataRowAdapter.RowData x) throws IgniteCheckedException;
+ KeyCacheObject upper, Object x) throws IgniteCheckedException;
/**
* @param cacheId Cache ID.
@@ -1077,16 +1026,11 @@ public interface IgniteCacheOffheapManager {
* @param upper Upper bound.
* @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
* @param snapshot Mvcc snapshot.
- * @param withTombstones {@code True} if should return tombstone entries.
* @return Data cursor.
* @throws IgniteCheckedException If failed.
*/
- public GridCursor<? extends CacheDataRow> cursor(int cacheId,
- KeyCacheObject lower,
- KeyCacheObject upper,
- CacheDataRowAdapter.RowData x,
- MvccSnapshot snapshot,
- boolean withTombstones) throws IgniteCheckedException;
+ public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
+ KeyCacheObject upper, Object x, MvccSnapshot snapshot) throws IgniteCheckedException;
/**
* Destroys the tree associated with the store.
@@ -1150,10 +1094,5 @@ public interface IgniteCacheOffheapManager {
* Partition storage.
*/
public PartitionMetaStorage<SimpleDataRow> partStorage();
-
- /**
- * @return Number of tombstone entries.
- */
- public long tombstonesCount();
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 18e9647..0df7728 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -142,7 +142,6 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.state;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.unexpectedStateException;
import static org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager.EMPTY_CURSOR;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO.MVCC_INFO_SIZE;
-import static org.apache.ignite.internal.util.IgniteTree.OperationType.IN_PLACE;
import static org.apache.ignite.internal.util.IgniteTree.OperationType.NOOP;
import static org.apache.ignite.internal.util.IgniteTree.OperationType.PUT;
@@ -437,8 +436,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
GridCacheContext cctx,
KeyCacheObject key,
GridDhtLocalPartition part,
- OffheapInvokeClosure c
- ) throws IgniteCheckedException {
+ OffheapInvokeClosure c)
+ throws IgniteCheckedException {
dataStore(part).invoke(cctx, key, c);
}
@@ -617,28 +616,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public void removeWithTombstone(
- GridCacheContext cctx,
- KeyCacheObject key,
- GridCacheVersion ver,
- GridDhtLocalPartition part
- ) throws IgniteCheckedException {
- assert part != null;
- assert !cctx.isNear();
- assert !cctx.isLocal();
-
- dataStore(part).removeWithTombstone(cctx, key, ver, part);
- }
-
- /** {@inheritDoc} */
- @Override public boolean isTombstone(CacheDataRow row) throws IgniteCheckedException {
- if (!grp.supportsTombstone())
- return false;
-
- return grp.shared().database().isTombstone(row);
- }
-
- /** {@inheritDoc} */
@Override @Nullable public CacheDataRow read(GridCacheMapEntry entry)
throws IgniteCheckedException {
KeyCacheObject key = entry.key();
@@ -685,7 +662,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/** {@inheritDoc} */
@Override public GridCursor<CacheDataRow> mvccAllVersionsCursor(GridCacheContext cctx,
- KeyCacheObject key, CacheDataRowAdapter.RowData x) throws IgniteCheckedException {
+ KeyCacheObject key, Object x) throws IgniteCheckedException {
CacheDataStore dataStore = dataStore(cctx, key);
return dataStore != null ? dataStore.mvccAllVersionsCursor(cctx, key, x) : EMPTY_CURSOR;
@@ -706,6 +683,18 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
+ @Override public boolean containsKey(GridCacheMapEntry entry) {
+ try {
+ return read(entry) != null;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to read value", e);
+
+ return false;
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void onPartitionCounterUpdated(int part, long cntr) {
// No-op.
}
@@ -729,8 +718,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
GridCacheVersion obsoleteVer = null;
try (GridCloseableIterator<CacheDataRow> it = grp.isLocal() ?
- iterator(cctx.cacheId(), cacheDataStores().iterator(), null, null, true) :
- evictionSafeIterator(cctx.cacheId(), cacheDataStores().iterator(), true)) {
+ iterator(cctx.cacheId(), cacheDataStores().iterator(), null, null) :
+ evictionSafeIterator(cctx.cacheId(), cacheDataStores().iterator())) {
while (it.hasNext()) {
cctx.shared().database().checkpointReadLock();
@@ -873,7 +862,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
@Nullable MvccSnapshot mvccSnapshot,
Boolean dataPageScanEnabled
) {
- return iterator(cacheId, cacheData(primary, backups, topVer), mvccSnapshot, dataPageScanEnabled, false);
+ return iterator(cacheId, cacheData(primary, backups, topVer), mvccSnapshot, dataPageScanEnabled);
}
/** {@inheritDoc} */
@@ -884,17 +873,17 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (data == null)
return new GridEmptyCloseableIterator<>();
- return iterator(cacheId, singletonIterator(data), mvccSnapshot, dataPageScanEnabled, false);
+ return iterator(cacheId, singletonIterator(data), mvccSnapshot, dataPageScanEnabled);
}
/** {@inheritDoc} */
- @Override public GridIterator<CacheDataRow> partitionIterator(int part, boolean withTombstones) {
+ @Override public GridIterator<CacheDataRow> partitionIterator(int part) {
CacheDataStore data = partitionData(part);
if (data == null)
return new GridEmptyCloseableIterator<>();
- return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null, null, withTombstones);
+ return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null, null);
}
/**
@@ -903,14 +892,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
* @param dataIt Data store iterator.
* @param mvccSnapshot Mvcc snapshot.
* @param dataPageScanEnabled Flag to enable data page scan.
- * @param withTombstones {@code True} if should return tombstone entries.
* @return Rows iterator
*/
private GridCloseableIterator<CacheDataRow> iterator(final int cacheId,
final Iterator<CacheDataStore> dataIt,
final MvccSnapshot mvccSnapshot,
- Boolean dataPageScanEnabled,
- boolean withTombstones
+ Boolean dataPageScanEnabled
) {
return new GridCloseableIteratorAdapter<CacheDataRow>() {
/** */
@@ -947,7 +934,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
try {
if (mvccSnapshot == null)
- cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor(withTombstones) : ds.cursor(cacheId, withTombstones);
+ cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId);
else {
cur = cacheId == CU.UNDEFINED_CACHE_ID ?
ds.cursor(mvccSnapshot) : ds.cursor(cacheId, mvccSnapshot);
@@ -979,14 +966,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/**
* @param cacheId Cache ID.
* @param dataIt Data store iterator.
- * @param withTombstones {@code True} if should return tombstone entries.
* @return Rows iterator
*/
- private GridCloseableIterator<CacheDataRow> evictionSafeIterator(
- final int cacheId,
- final Iterator<CacheDataStore> dataIt,
- boolean withTombstones
- ) {
+ private GridCloseableIterator<CacheDataRow> evictionSafeIterator(final int cacheId, final Iterator<CacheDataStore> dataIt) {
return new GridCloseableIteratorAdapter<CacheDataRow>() {
/** */
private GridCursor<? extends CacheDataRow> cur;
@@ -1017,7 +999,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (!reservePartition(ds.partId()))
continue;
- cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor(withTombstones) : ds.cursor(cacheId, withTombstones);
+ cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId);
}
else
break;
@@ -1476,9 +1458,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/** */
private final PageHandler<MvccDataRow, Boolean> mvccApplyChanges = new MvccApplyChangesHandler();
- /** Tombstones counter. */
- private final AtomicLong tombstonesCnt = new AtomicLong();
-
/**
* @param partId Partition number.
* @param rowStore Row store.
@@ -1730,23 +1709,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
case REMOVE: {
CacheDataRow oldRow = c.oldRow();
- finishRemove(cctx, row.key(), oldRow, null);
+ finishRemove(cctx, row.key(), oldRow);
break;
}
- case IN_PLACE:
- assert !isTombstone(c.newRow());
-
- if (isTombstone(c.oldRow())) {
- tombstoneRemoved();
-
- incrementSize(cctx.cacheId());
- }
-
- break;
-
case NOOP:
+ case IN_PLACE:
break;
default:
@@ -1764,10 +1733,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
- // Set real stored cacheId to properly calculate row size.
- if (oldRow != null)
- oldRow.cacheId(cacheId);
-
DataRow dataRow = makeDataRow(key, val, ver, expireTime, cacheId);
if (canUpdateOldRow(cctx, oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow, grp.statisticsHolderData()))
@@ -1783,13 +1748,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
assert dataRow.link() != 0 : dataRow;
- if (grp.sharedGroup()) {
- if (dataRow.cacheId() == CU.UNDEFINED_CACHE_ID)
- dataRow.cacheId(cctx.cacheId());
-
- if (oldRow != null && oldRow.cacheId() == CU.UNDEFINED_CACHE_ID)
- oldRow.cacheId(cctx.cacheId());
- }
+ if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+ dataRow.cacheId(cctx.cacheId());
return dataRow;
}
@@ -2647,12 +2607,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
*/
private void finishUpdate(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow oldRow)
throws IgniteCheckedException {
- assert !isTombstone(newRow);
-
- boolean oldTombstone = isTombstone(oldRow);
- boolean oldNull = oldRow == null || oldTombstone;
-
- if (oldNull)
+ if (oldRow == null)
incrementSize(cctx.cacheId());
KeyCacheObject key = newRow.key();
@@ -2660,9 +2615,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
GridCacheQueryManager qryMgr = cctx.queries();
if (qryMgr.enabled())
- qryMgr.store(newRow, oldNull ? null : oldRow, true);
+ qryMgr.store(newRow, oldRow, true);
- updatePendingEntries(cctx, newRow, oldNull ? null : oldRow);
+ updatePendingEntries(cctx, newRow, oldRow);
if (oldRow != null) {
assert oldRow.link() != 0 : oldRow;
@@ -2671,10 +2626,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
rowStore.removeRow(oldRow.link(), grp.statisticsHolderData());
}
- updateIgfsMetrics(cctx, key, (oldNull ? null : oldRow.value()), newRow.value());
-
- if (oldTombstone)
- tombstoneRemoved();
+ updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), newRow.value());
}
/**
@@ -2717,97 +2669,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
CacheDataRow oldRow = dataTree.remove(new SearchRow(cacheId, key));
- finishRemove(cctx, key, oldRow, null);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
-
- /**
- *
- */
- private class RemoveWithTombstone implements IgniteCacheOffheapManager.OffheapInvokeClosure {
- /** */
- private final GridCacheContext cctx;
-
- /** */
- private final KeyCacheObject key;
-
- /** */
- private final GridCacheVersion ver;
-
- /** */
- private CacheDataRow oldRow;
-
- /** */
- private CacheDataRow newRow;
-
- /**
- * @param cctx Context.
- * @param key Key.
- * @param ver Version.
- */
- RemoveWithTombstone(GridCacheContext cctx, KeyCacheObject key, GridCacheVersion ver) {
- this.cctx = cctx;
- this.key = key;
- this.ver = ver;
- }
-
- /** {@inheritDoc} */
- @Override public CacheDataRow oldRow() {
- return oldRow;
- }
-
- /** {@inheritDoc} */
- @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
- if (oldRow != null)
- oldRow.key(key);
-
- this.oldRow = oldRow;
-
- newRow = createRow(cctx, key, TombstoneCacheObject.INSTANCE, ver, 0, oldRow);
- }
-
- /** {@inheritDoc} */
- @Override public CacheDataRow newRow() {
- return newRow;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteTree.OperationType operationType() {
- if (oldRow != null && oldRow.link() == newRow.link())
- return IgniteTree.OperationType.IN_PLACE;
-
- return PUT;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void removeWithTombstone(
- GridCacheContext cctx,
- KeyCacheObject key,
- GridCacheVersion ver,
- GridDhtLocalPartition part
- ) throws IgniteCheckedException {
- if (!busyLock.enterBusy())
- throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
-
- try {
- assert cctx.shared().database().checkpointLockIsHeldByThread();
-
- int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
-
- RemoveWithTombstone c = new RemoveWithTombstone(cctx, key, ver);
-
- dataTree.invoke(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY, c);
-
- assert c.operationType() == PUT || c.operationType() == IN_PLACE : c.operationType();
-
- if (!isTombstone(c.oldRow))
- tombstoneCreated();
-
- finishRemove(cctx, key, c.oldRow, c.newRow);
+ finishRemove(cctx, key, oldRow);
}
finally {
busyLock.leaveBusy();
@@ -2818,19 +2680,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
* @param cctx Cache context.
* @param key Key.
* @param oldRow Removed row.
- * @param tombstoneRow Tombstone row (if tombstone was created for remove).
* @throws IgniteCheckedException If failed.
*/
- private void finishRemove(
- GridCacheContext cctx,
- KeyCacheObject key,
- @Nullable CacheDataRow oldRow,
- @Nullable CacheDataRow tombstoneRow
- ) throws IgniteCheckedException {
- boolean oldTombstone = isTombstone(oldRow);
- boolean oldNull = oldRow == null || oldTombstone;
-
- if (!oldNull) {
+ private void finishRemove(GridCacheContext cctx, KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+ if (oldRow != null) {
clearPendingEntries(cctx, oldRow);
decrementSize(cctx.cacheId());
@@ -2839,15 +2692,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
GridCacheQueryManager qryMgr = cctx.queries();
if (qryMgr.enabled())
- qryMgr.remove(key, oldNull ? null : oldRow);
+ qryMgr.remove(key, oldRow);
- if (oldRow != null && (tombstoneRow == null || tombstoneRow.link() != oldRow.link()))
+ if (oldRow != null)
rowStore.removeRow(oldRow.link(), grp.statisticsHolderData());
- updateIgfsMetrics(cctx, key, (oldNull ? null : oldRow.value()), null);
-
- if (oldTombstone && tombstoneRow == null)
- tombstoneRemoved();
+ updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), null);
}
/**
@@ -2867,15 +2717,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
pendingTree().removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
}
- /**
- * @param row Data row.
- * @return {@code Null} if given row is tombstone, otherwise row itself.
- * @throws IgniteCheckedException If null.
- */
- @Nullable private CacheDataRow checkTombstone(@Nullable CacheDataRow row) throws IgniteCheckedException {
- return grp.offheap().isTombstone(row) ? null : row;
- }
-
/** {@inheritDoc} */
@Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
key.valueBytes(cctx.cacheObjectContext());
@@ -2895,12 +2736,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
row = clo.row();
}
- else {
+ else
row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY);
- row = checkTombstone(row);
- }
-
afterRowFound(row, key);
return row;
@@ -2948,7 +2786,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public GridCursor<CacheDataRow> mvccAllVersionsCursor(GridCacheContext cctx, KeyCacheObject key, CacheDataRowAdapter.RowData x)
+ @Override public GridCursor<CacheDataRow> mvccAllVersionsCursor(GridCacheContext cctx, KeyCacheObject key, Object x)
throws IgniteCheckedException {
int cacheId = cctx.cacheId();
@@ -2997,91 +2835,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public GridCursor<? extends CacheDataRow> cursor(boolean withTombstones) throws IgniteCheckedException {
- GridCursor<? extends CacheDataRow> cur = dataTree.find(null, null);
-
- return withTombstones ? cur : cursorSkipTombstone(cur);
- }
-
- /**
- * @param cur Cursor.
- * @return Cursor skipping non-tombstone entries.
- */
- private GridCursor<? extends CacheDataRow> cursorSkipEmpty(final GridCursor<? extends CacheDataRow> cur) {
- if (!grp.supportsTombstone())
- return cur;
-
- return new GridCursor<CacheDataRow>() {
- /** */
- CacheDataRow next;
-
- /** {@inheritDoc} */
- @Override public boolean next() throws IgniteCheckedException {
- while (cur.next()) {
- CacheDataRow next = cur.get();
-
- // If request cursor with RowData.TOMBSTONES, then for non-tombtones all fields are null.
- if (next.version() != null) {
- this.next = next;
-
- return true;
- }
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public CacheDataRow get() {
- return next;
- }
- };
- }
-
- /**
- * @param cur Cursor.
- * @return Cursor skipping tombstone entries.
- */
- private GridCursor<? extends CacheDataRow> cursorSkipTombstone(final GridCursor<? extends CacheDataRow> cur) {
- if (!grp.supportsTombstone())
- return cur;
-
- return new GridCursor<CacheDataRow>() {
- /** */
- CacheDataRow next;
-
- /** {@inheritDoc} */
- @Override public boolean next() throws IgniteCheckedException {
- while (cur.next()) {
- CacheDataRow next = cur.get();
-
- if (!isTombstone(next)) {
- this.next = next;
-
- return true;
- }
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public CacheDataRow get() {
- return next;
- }
- };
+ @Override public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException {
+ return dataTree.find(null, null);
}
/** {@inheritDoc} */
- @Override public GridCursor<? extends CacheDataRow> cursor(CacheDataRowAdapter.RowData x) throws IgniteCheckedException {
- GridCursor<? extends CacheDataRow> cur = dataTree.find(null, null, x);
-
- return x == CacheDataRowAdapter.RowData.TOMBSTONES ? cursorSkipEmpty(cur) : cursorSkipTombstone(cur);
+ @Override public GridCursor<? extends CacheDataRow> cursor(Object x) throws IgniteCheckedException {
+ return dataTree.find(null, null, x);
}
/** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor(MvccSnapshot mvccSnapshot)
throws IgniteCheckedException {
+
GridCursor<? extends CacheDataRow> cursor;
if (mvccSnapshot != null) {
assert grp.mvccEnabled();
@@ -3090,20 +2856,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
new MvccFirstVisibleRowTreeClosure(grp.singleCacheContext(), mvccSnapshot), null);
}
else
- cursor = cursorSkipTombstone(dataTree.find(null, null));
+ cursor = dataTree.find(null, null);
return cursor;
}
/** {@inheritDoc} */
- @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, boolean withTombstones) throws IgniteCheckedException {
- return cursor(cacheId, null, null, null, null, withTombstones);
+ @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException {
+ return cursor(cacheId, null, null);
}
/** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor(int cacheId,
MvccSnapshot mvccSnapshot) throws IgniteCheckedException {
- return cursor(cacheId, null, null, null, mvccSnapshot, false);
+ return cursor(cacheId, null, null, null, mvccSnapshot);
}
/** {@inheritDoc} */
@@ -3114,18 +2880,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
- KeyCacheObject upper, CacheDataRowAdapter.RowData x) throws IgniteCheckedException {
- return cursor(cacheId, lower, upper, null, null, false);
+ KeyCacheObject upper, Object x) throws IgniteCheckedException {
+ return cursor(cacheId, lower, upper, null, null);
}
/** {@inheritDoc} */
- @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId,
- KeyCacheObject lower,
- KeyCacheObject upper,
- CacheDataRowAdapter.RowData x,
- MvccSnapshot snapshot,
- boolean withTombstones
- ) throws IgniteCheckedException {
+ @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
+ KeyCacheObject upper, Object x, MvccSnapshot snapshot) throws IgniteCheckedException {
SearchRow lowerRow;
SearchRow upperRow;
@@ -3149,13 +2910,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
cursor = dataTree.find(lowerRow, upperRow, new MvccFirstVisibleRowTreeClosure(cctx, snapshot), x);
}
- else {
+ else
cursor = dataTree.find(lowerRow, upperRow, x);
- if (!withTombstones)
- cursor = cursorSkipTombstone(cursor);
- }
-
return cursor;
}
@@ -3195,7 +2952,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
Exception ex = null;
GridCursor<? extends CacheDataRow> cur =
- cursor(cacheId, null, null, CacheDataRowAdapter.RowData.KEY_ONLY, null, true);
+ cursor(cacheId, null, null, CacheDataRowAdapter.RowData.KEY_ONLY);
while (cur.next()) {
CacheDataRow row = cur.get();
@@ -3239,24 +2996,17 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
* @param size Size to init.
* @param updCntr Update counter.
* @param cacheSizes Cache sizes if store belongs to group containing multiple caches.
- * @param updCntrGapsData Update counters gaps raw data.
- * @param tombstonesCnt Tombstones count.
+ * @param cntrUpdData Counter updates.
*/
- public void restoreState(
- long size,
- long updCntr,
- Map<Integer, Long> cacheSizes,
- byte[] updCntrGapsData,
- long tombstonesCnt
- ) {
- pCntr.init(updCntr, updCntrGapsData);
+ public void restoreState(long size, long updCntr, @Nullable Map<Integer, Long> cacheSizes, byte[] cntrUpdData) {
+ pCntr.init(updCntr, cntrUpdData);
storageSize.set(size);
- for (Map.Entry<Integer, Long> e : cacheSizes.entrySet())
- this.cacheSizes.put(e.getKey(), new AtomicLong(e.getValue()));
-
- this.tombstonesCnt.set(tombstonesCnt);
+ if (cacheSizes != null) {
+ for (Map.Entry<Integer, Long> e : cacheSizes.entrySet())
+ this.cacheSizes.put(e.getKey(), new AtomicLong(e.getValue()));
+ }
}
/** {@inheritDoc} */
@@ -3279,25 +3029,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return null;
}
- /** {@inheritDoc} */
- @Override public long tombstonesCount() {
- return tombstonesCnt.get();
- }
-
- /**
- * Called when tombstone has removed from partition.
- */
- private void tombstoneRemoved() {
- tombstonesCnt.decrementAndGet();
- }
-
- /**
- * Called when tombstone has created in partition.
- */
- private void tombstoneCreated() {
- tombstonesCnt.incrementAndGet();
- }
-
/**
* @param cctx Cache context.
* @param key Key.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteCacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteCacheObject.java
index 3802c2b..dedb3bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteCacheObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteCacheObject.java
@@ -43,8 +43,6 @@ public class IncompleteCacheObject extends IncompleteObject<CacheObject> {
if (buf.remaining() >= HEAD_LEN) {
data = new byte[buf.getInt()];
type = buf.get();
-
- headerReady();
}
// We cannot fully read head to initialize data buffer.
// Start partial read of header.
@@ -70,8 +68,6 @@ public class IncompleteCacheObject extends IncompleteObject<CacheObject> {
data = new byte[headBuf.getInt()];
type = headBuf.get();
-
- headerReady();
}
}
@@ -80,21 +76,6 @@ public class IncompleteCacheObject extends IncompleteObject<CacheObject> {
}
/**
- * Invoke when object header is ready.
- */
- private void headerReady() {
- if (type == CacheObject.TOMBSTONE)
- object(TombstoneCacheObject.INSTANCE);
- }
-
- /**
- * @return Size of already read data.
- */
- public int dataOffset() {
- return off;
- }
-
- /**
* @return Data type.
*/
public byte type() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java
index 27c9def..7c24c12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java
@@ -33,7 +33,7 @@ public class IncompleteObject<T> {
private T obj;
/** */
- protected int off;
+ private int off;
/**
* @param data Data bytes.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
index 1d41d7f..112a110 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
@@ -35,9 +35,9 @@ public interface PartitionUpdateCounter extends Iterable<long[]> {
* Restores update counter state.
*
* @param initUpdCntr LWM.
- * @param updCntrGapsData Updates counters gaps raw data.
+ * @param cntrUpdData Counter updates raw data.
*/
- public void init(long initUpdCntr, @Nullable byte[] updCntrGapsData);
+ public void init(long initUpdCntr, @Nullable byte[] cntrUpdData);
/**
* @deprecated TODO LWM should be used as initial counter https://ggsystems.atlassian.net/browse/GG-17396
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TombstoneCacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TombstoneCacheObject.java
deleted file mode 100644
index d87b024..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TombstoneCacheObject.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Special value object indicating that value is removed.
- */
-public class TombstoneCacheObject extends CacheObjectAdapter {
- /** */
- private static final long serialVersionUID = 2106775575127797257L;
-
- /** Empty. */
- private static final byte[] EMPTY = new byte[] { };
-
- /** Instance. */
- public static final TombstoneCacheObject INSTANCE = new TombstoneCacheObject();
-
- /**
- * Default constructor.
- */
- public TombstoneCacheObject() {
- valBytes = EMPTY;
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- valBytes = EMPTY;
- }
-
- /** {@inheritDoc} */
- @Override public <T> @Nullable T value(CacheObjectValueContext ctx, boolean cpy) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public byte[] valueBytes(CacheObjectValueContext ctx) throws IgniteCheckedException {
- return valBytes;
- }
-
- /** {@inheritDoc} */
- @Override public byte cacheObjectType() {
- return CacheObject.TOMBSTONE;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isPlatformType() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException {
-
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(CacheObjectValueContext ctx) throws IgniteCheckedException {
-
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 176;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
-
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 8c8a3ef..ac2d237 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.binary;
+import javax.cache.CacheException;
import java.io.File;
import java.io.Serializable;
import java.math.BigDecimal;
@@ -31,7 +32,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import javax.cache.CacheException;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
@@ -83,7 +83,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
-import org.apache.ignite.internal.processors.cache.TombstoneCacheObject;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectByteArrayImpl;
@@ -1146,9 +1145,6 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
case CacheObject.TYPE_REGULAR:
return new CacheObjectImpl(null, bytes);
-
- case CacheObject.TOMBSTONE:
- return TombstoneCacheObject.INSTANCE;
}
throw new IllegalArgumentException("Invalid object type: " + type);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index 4f7986f..0eed54a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -43,7 +43,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -54,7 +53,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -63,9 +61,7 @@ import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.collection.IntMap;
import org.apache.ignite.internal.util.collection.IntRWHashMap;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.GridIterator;
-import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -178,10 +174,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @param recovery Flag indicates that partition is created during recovery phase.
*/
public GridDhtLocalPartition(
- GridCacheSharedContext ctx,
- CacheGroupContext grp,
- int id,
- boolean recovery
+ GridCacheSharedContext ctx,
+ CacheGroupContext grp,
+ int id,
+ boolean recovery
) {
super(ENTRY_FACTORY);
@@ -606,12 +602,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
assert partState == MOVING || partState == LOST;
- if (casState(state, OWNING)) {
- if (hasTombstones())
- clearTombstonesAsync();
-
+ if (casState(state, OWNING))
return true;
- }
}
}
@@ -765,21 +757,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
/**
- * @return {@code True} if partition has tombstone entries.
- */
- boolean hasTombstones() {
- return grp.supportsTombstone() && dataStore().tombstonesCount() > 0;
- }
-
- /**
- * Adds async task that will clear tombstone entries from partition.
- * @see #clearTombstones(EvictionContext).
- */
- void clearTombstonesAsync() {
- grp.shared().evict().clearTombstonesAsync(grp, this);
- }
-
- /**
* Continues delayed clearing of partition if possible.
* Clearing may be delayed because of existing reservations.
*/
@@ -944,7 +921,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @return {@code false} if clearing is not started due to existing reservations.
* @throws NodeStoppingException If node is stopping.
*/
- public boolean tryClear(EvictionContext evictionCtx) throws NodeStoppingException, IgniteCheckedException {
+ public boolean tryClear(EvictionContext evictionCtx) throws NodeStoppingException {
if (clearFuture.isDone())
return true;
@@ -955,73 +932,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
if (addEvicting()) {
try {
- GridCacheVersion clearVer = ctx.versions().next();
-
- GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer);
-
- boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
-
- if (grp.sharedGroup())
- cacheMaps.forEach((key, hld) -> clearOnheapEntries(hld.map, extras, rec));
- else
- clearOnheapEntries(singleCacheEntryMap.map, extras, rec);
-
// Attempt to evict partition entries from cache.
- long clearedEntities = doClear(
- evictionCtx,
- 1000,
- grp.offheap().partitionIterator(id, true),
- (hld, row) -> {
- // Do not clear fresh rows in case of partition reloading.
- // This is required because normal updates are possible to moving partition which is currently cleared.
- if (row.version().compareTo(clearVer) >= 0 && state() == MOVING)
- return false;
-
- GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(
- hld,
- hld.cctx,
- grp.affinity().lastVersion(),
- row.key(),
- true,
- false);
-
- if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
- removeEntry(cached);
-
- if (rec && !hld.cctx.config().isEventsDisabled()) {
- hld.cctx.events().addEvent(cached.partition(),
- cached.key(),
- ctx.localNodeId(),
- null,
- null,
- null,
- EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
- null,
- false,
- cached.rawGet(),
- cached.hasValue(),
- null,
- null,
- null,
- false);
- }
-
- return true;
- }
-
- return false;
- }
- );
-
- if (forceTestCheckpointOnEviction) {
- if (partWhereTestCheckpointEnforced == null && clearedEntities >= fullSize()) {
- ctx.database().forceCheckpoint("test").finishFuture().get();
-
- log.warning("Forced checkpoint by test reasons for partition: " + this);
-
- partWhereTestCheckpointEnforced = id;
- }
- }
+ long clearedEntities = clearAll(evictionCtx);
if (log.isDebugEnabled())
log.debug("Partition has been cleared [grp=" + grp.cacheOrGroupName()
@@ -1208,120 +1120,113 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
/**
- * Iterates over partition entries and removes tombstone entries.
+ * Removes all entries and rows from this partition.
*
- * @param evictionCtx Eviction context.
+ * @return Number of rows cleared from page memory.
+ * @throws NodeStoppingException If node stopping.
*/
- void clearTombstones(EvictionContext evictionCtx) throws IgniteCheckedException {
- if (evictionCtx.shouldStop())
- return;
+ private long clearAll(EvictionContext evictionCtx) throws NodeStoppingException {
+ GridCacheVersion clearVer = ctx.versions().next();
- GridIterator<CacheDataRow> iter;
+ GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer);
- try {
- GridCursor<? extends CacheDataRow> cur = store.cursor(CacheDataRowAdapter.RowData.TOMBSTONES);
+ boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
- iter = new GridIteratorAdapter<CacheDataRow>() {
- @Override public boolean hasNextX() throws IgniteCheckedException {
- return cur.next();
- }
+ if (grp.sharedGroup())
+ cacheMaps.forEach((key, hld) -> clear(hld.map, extras, rec));
+ else
+ clear(singleCacheEntryMap.map, extras, rec);
- @Override public CacheDataRow nextX() throws IgniteCheckedException {
- return cur.get();
- }
+ long cleared = 0;
- @Override public void removeX() throws IgniteCheckedException {
- throw new UnsupportedOperationException();
- }
- };
- }
- catch (IgniteCheckedException e) {
- throw new IgniteCheckedException("Failed to get iterator for partition: " + id, e);
- }
+ final int stopCheckingFreq = 1000;
- doClear(
- evictionCtx,
- 10,
- iter,
- (hld, row) -> {
- while (true) {
- GridCacheMapEntry cached = null;
+ CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
- try {
- cached = putEntryIfObsoleteOrAbsent(
- hld,
- hld.cctx,
- grp.affinity().lastVersion(),
- row.key(),
- true,
- false);
+ try {
+ GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id);
- cached.removeTombstone(row.version());
+ while (it0.hasNext()) {
+ ctx.database().checkpointReadLock();
- return true;
- }
- catch (GridCacheEntryRemovedException e) {
- cached = null;
- }
- finally {
- if (cached != null)
- cached.touch();
- }
- }
- }
- );
- }
+ try {
+ CacheDataRow row = it0.next();
- /**
- * Runs abstract clear operation over partition data rows.
- *
- * @param evictionCtx Eviction context.
- * @param stopCheckingFreq Frequency to check stopping eviction/clearing.
- * @param rowIter Rows iterator.
- * @param clearOp Clear operation.
- * @return Number of cleared rows.
- * @throws IgniteCheckedException If failed.
- */
- private long doClear(
- EvictionContext evictionCtx,
- int stopCheckingFreq,
- GridIterator<CacheDataRow> rowIter,
- ClearRowOperation clearOp
- ) throws IgniteCheckedException {
- long cleared = 0;
+ // Do not clear fresh rows in case of partition reloading.
+ // This is required because normal updates are possible to moving partition which is currently cleared.
+ if (row.version().compareTo(clearVer) >= 0 && state() == MOVING)
+ continue;
- CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
+ if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId()))
+ hld = cacheMapHolder(ctx.cacheContext(row.cacheId()));
- while (rowIter.hasNext()) {
- ctx.database().checkpointReadLock();
+ assert hld != null;
- try {
- CacheDataRow row = rowIter.next();
+ GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(
+ hld,
+ hld.cctx,
+ grp.affinity().lastVersion(),
+ row.key(),
+ true,
+ false);
- assert row.key() != null : row;
- assert row.version() != null : row;
+ if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
+ removeEntry(cached);
- if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId()))
- hld = cacheMapHolder(ctx.cacheContext(row.cacheId()));
+ if (rec && !hld.cctx.config().isEventsDisabled()) {
+ hld.cctx.events().addEvent(cached.partition(),
+ cached.key(),
+ ctx.localNodeId(),
+ null,
+ null,
+ null,
+ EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+ null,
+ false,
+ cached.rawGet(),
+ cached.hasValue(),
+ null,
+ null,
+ null,
+ false);
+ }
- assert hld != null;
+ cleared++;
+ }
- if (clearOp.apply(hld, row))
- cleared++;
+ // For each 'stopCheckingFreq' cleared entities check clearing process to stop.
+ if (cleared % stopCheckingFreq == 0 && evictionCtx.shouldStop())
+ return cleared;
+ }
+ catch (GridDhtInvalidPartitionException e) {
+ assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
- // For each 'stopCheckingFreq' cleared entities check clearing process to stop.
- if (cleared % stopCheckingFreq == 0 && evictionCtx.shouldStop())
- return cleared;
+ break; // Partition is already concurrently cleared and evicted.
+ }
+ finally {
+ ctx.database().checkpointReadUnlock();
+ }
}
- catch (GridDhtInvalidPartitionException e) {
- assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
- break; // Partition is already concurrently cleared and evicted.
- }
- finally {
- ctx.database().checkpointReadUnlock();
+ if (forceTestCheckpointOnEviction) {
+ if (partWhereTestCheckpointEnforced == null && cleared >= fullSize()) {
+ ctx.database().forceCheckpoint("test").finishFuture().get();
+
+ log.warning("Forced checkpoint by test reasons for partition: " + this);
+
+ partWhereTestCheckpointEnforced = id;
+ }
}
}
+ catch (NodeStoppingException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to get iterator for evicted partition: " + id);
+
+ throw e;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to get iterator for evicted partition: " + id, e);
+ }
return cleared;
}
@@ -1334,11 +1239,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @param evt Unload event flag.
* @throws NodeStoppingException If current node is stopping.
*/
- private void clearOnheapEntries(
- ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map,
+ private void clear(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map,
GridCacheObsoleteEntryExtras extras,
- boolean evt
- ) throws NodeStoppingException {
+ boolean evt) throws NodeStoppingException {
Iterator<GridCacheMapEntry> it = map.values().iterator();
while (it.hasNext()) {
@@ -1639,18 +1542,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
/**
- * Abstract operation to clear row.
- */
- @FunctionalInterface
- private static interface ClearRowOperation {
- /**
- * @param hld Hld.
- * @param row Row.
- */
- boolean apply(CacheMapHolder hld, CacheDataRow row) throws IgniteCheckedException;
- }
-
- /**
* Future is needed to control partition clearing process.
* Future can be used both for single clearing or eviction processes.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 1f90108..2078eab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -716,10 +716,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
updateLocal(p, state, updateSeq, topVer);
- if (state == RENTING) // Restart cleaning.
+ // Restart cleaning.
+ if (state == RENTING)
locPart.clearAsync();
- else if (state == OWNING && locPart.hasTombstones())
- locPart.clearTombstonesAsync(); // Restart tombstones cleaning.
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
index 2d3b813..31399bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
-import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -108,74 +107,35 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
}
/**
- * @param grp Group context.
- * @param part Partition to clear tombstones.
- */
- public void clearTombstonesAsync(CacheGroupContext grp, GridDhtLocalPartition part) {
- if (addAsyncTask(grp, part, TaskType.CLEAR_TOMBSTONES)) {
- if (log.isDebugEnabled())
- log.debug("Partition has been scheduled for tomstones cleanup [grp=" + grp.cacheOrGroupName()
- + ", p=" + part.id() + ", state=" + part.state() + "]");
- }
- }
-
- /**
* Adds partition to eviction queue and starts eviction process if permit available.
*
* @param grp Group context.
* @param part Partition to evict.
*/
public void evictPartitionAsync(CacheGroupContext grp, GridDhtLocalPartition part) {
- if (addAsyncTask(grp, part, TaskType.EVICT)) {
- if (log.isDebugEnabled())
- log.debug("Partition has been scheduled for eviction [grp=" + grp.cacheOrGroupName()
- + ", p=" + part.id() + ", state=" + part.state() + "]");
- }
- }
-
- /**
- * @param grp Group context.
- * @param part Partition.
- * @param type Task type.
- * @return {@code True} if task was added.
- */
- private boolean addAsyncTask(CacheGroupContext grp, GridDhtLocalPartition part, TaskType type) {
GroupEvictionContext grpEvictionCtx = evictionGroupsMap.computeIfAbsent(
grp.groupId(), (k) -> new GroupEvictionContext(grp));
// Check node stop.
if (grpEvictionCtx.shouldStop())
- return false;
+ return;
int bucket;
- AbstractEvictionTask task;
-
- switch (type) {
- case EVICT:
- task = new PartitionEvictionTask(part, grpEvictionCtx);
- break;
-
- case CLEAR_TOMBSTONES:
- task = new ClearTombstonesTask(part, grpEvictionCtx);
- break;
-
- default:
- throw new UnsupportedOperationException("Unsupported task type: " + type);
- }
-
synchronized (mux) {
- if (!grpEvictionCtx.taskIds.add(task.id))
- return false;
+ if (!grpEvictionCtx.partIds.add(part.id()))
+ return;
- bucket = evictionQueue.offer(task);
+ bucket = evictionQueue.offer(new PartitionEvictionTask(part, grpEvictionCtx));
}
- grpEvictionCtx.taskAdded(task);
+ grpEvictionCtx.totalTasks.incrementAndGet();
- scheduleNextTask(bucket);
+ if (log.isDebugEnabled())
+ log.debug("Partition has been scheduled for eviction [grp=" + grp.cacheOrGroupName()
+ + ", p=" + part.id() + ", state=" + part.state() + "]");
- return true;
+ scheduleNextPartitionEviction(bucket);
}
/**
@@ -183,7 +143,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
*
* @param bucket Bucket.
*/
- private void scheduleNextTask(int bucket) {
+ private void scheduleNextPartitionEviction(int bucket) {
// Check node stop.
if (sharedEvictionCtx.shouldStop())
return;
@@ -198,7 +158,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
// Get task until we have permits.
while (permits >= 0) {
// Get task from bucket.
- AbstractEvictionTask evictionTask = evictionQueue.poll(bucket);
+ PartitionEvictionTask evictionTask = evictionQueue.poll(bucket);
// If bucket empty try get from another.
if (evictionTask == null) {
@@ -238,12 +198,12 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
permits++;
}
- // Re-schedule new one task for same bucket.
- scheduleNextTask(bucket);
+ // Re-schedule new one task form same bucket.
+ scheduleNextPartitionEviction(bucket);
});
// Submit task to executor.
- cctx.kernalContext()
+ cctx.kernalContext()
.closure()
.runLocalSafe(evictionTask, EVICT_POOL_PLC);
}
@@ -259,10 +219,10 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
int size = evictionQueue.size() + 1; // Queue size plus current partition.
if (log.isInfoEnabled())
- log.info("Partition cleanup in progress [permits=" + permits+
+ log.info("Eviction in progress [permits=" + permits+
", threads=" + threads +
", groups=" + evictionGroupsMap.keySet().size() +
- ", remainingTasks=" + size + "]");
+ ", remainingPartsToEvict=" + size + "]");
evictionGroupsMap.values().forEach(GroupEvictionContext::showProgress);
@@ -308,66 +268,30 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
/**
*
*/
- private static class TasksStatistics {
- /** */
- private int total;
-
- /** */
- private int inProgress;
-
- /**
- *
- */
- void taskAdded() {
- total++;
- }
-
- /**
- *
- */
- void taskStarted() {
- inProgress++;
- }
-
- /**
- *
- */
- void taskFinished() {
- total--;
- inProgress--;
- }
- }
-
- /**
- *
- */
private class GroupEvictionContext implements EvictionContext {
/** */
private final CacheGroupContext grp;
- /** Deduplicate set partition tasks. */
- private final Set<TaskId> taskIds = new HashSet<>();
+ /** Deduplicate set partition ids. */
+ private final Set<Integer> partIds = new HashSet<>();
/** Future for currently running partition eviction task. */
- private final Map<TaskId, IgniteInternalFuture<?>> taskFutures = new ConcurrentHashMap<>();
+ private final Map<Integer, IgniteInternalFuture<?>> partsEvictFutures = new ConcurrentHashMap<>();
/** Flag indicates that eviction process has stopped for this group. */
private volatile boolean stop;
- /** Total tasks. */
+ /** Total partition to evict. */
private AtomicInteger totalTasks = new AtomicInteger();
- /** */
- private Map<TaskType, TasksStatistics> stats = U.newHashMap(TaskType.VALS.length);
+ /** Total partition evict in progress. */
+ private int taskInProgress;
/**
* @param grp Group context.
*/
private GroupEvictionContext(CacheGroupContext grp) {
this.grp = grp;
-
- for (TaskType type : TaskType.VALS)
- stats.put(type, new TasksStatistics());
}
/** {@inheritDoc} */
@@ -376,37 +300,28 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
}
/**
- * @param task Task.
- */
- void taskAdded(AbstractEvictionTask task) {
- totalTasks.incrementAndGet();
-
- synchronized (this) {
- stats.get(task.id.type).taskAdded();
- }
- }
-
- /**
*
* @param task Partition eviction task.
*/
- private synchronized void taskScheduled(AbstractEvictionTask task) {
+ private synchronized void taskScheduled(PartitionEvictionTask task) {
if (shouldStop())
return;
- stats.get(task.id.type).taskStarted();
+ taskInProgress++;
GridFutureAdapter<?> fut = task.finishFut;
- taskIds.remove(task.id);
+ int partId = task.part.id();
- taskFutures.put(task.id, fut);
+ partIds.remove(partId);
+
+ partsEvictFutures.put(partId, fut);
fut.listen(f -> {
synchronized (this) {
- stats.get(task.id.type).taskFinished();
+ taskInProgress--;
- taskFutures.remove(task.id, f);
+ partsEvictFutures.remove(partId, f);
if (totalTasks.decrementAndGet() == 0)
evictionGroupsMap.remove(grp.groupId());
@@ -425,7 +340,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
* Await evict finish.
*/
private void awaitFinishAll(){
- taskFutures.forEach(this::awaitFinish);
+ partsEvictFutures.forEach(this::awaitFinish);
evictionGroupsMap.remove(grp.groupId());
}
@@ -433,17 +348,17 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
/**
* Await evict finish partition.
*/
- private void awaitFinish(TaskId taskId, IgniteInternalFuture<?> fut) {
+ private void awaitFinish(Integer part, IgniteInternalFuture<?> fut) {
// Wait for last offered partition eviction completion
try {
- log.info("Await partition cleanup [grpName=" + grp.cacheOrGroupName() +
- ", grpId=" + grp.groupId() + ", task=" + taskId.type + ", partId=" + taskId.part + ']');
+ log.info("Await partition evict, grpName=" + grp.cacheOrGroupName() +
+ ", grpId=" + grp.groupId() + ", partId=" + part);
fut.get();
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
- log.warning("Failed to await partition cleanup during stopping.", e);
+ log.warning("Failed to await partition eviction during stopping.", e);
}
}
@@ -451,132 +366,47 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
* Shows progress group of eviction.
*/
private void showProgress() {
- if (log.isInfoEnabled()) {
- StringBuilder msg = new StringBuilder(
- "Group cleanup in progress [grpName=" + grp.cacheOrGroupName() + ", grpId=" + grp.groupId());
-
- synchronized (this) {
- TasksStatistics evicts = stats.get(TaskType.EVICT);
- if (evicts.total > 0) {
- msg.append(", remainingPartsToEvict=" + (evicts.total - evicts.inProgress)).
- append(", partsEvictInProgress=" + evicts.inProgress);
- }
-
- TasksStatistics tombstones = stats.get(TaskType.CLEAR_TOMBSTONES);
- if (tombstones.total > 0) {
- msg.append(", remainingPartsToClearTombstones=" + (tombstones.total - tombstones.inProgress)).
- append(", tombstoneClearInProgress=" + tombstones.inProgress);
- }
- }
-
- msg.append(", totalParts=" + grp.topology().localPartitions().size() + "]");
-
- log.info(msg.toString());
- }
- }
- }
-
- /**
- *
- */
- private enum TaskType {
- /** */
- EVICT,
-
- /** */
- CLEAR_TOMBSTONES;
-
- /** */
- private static TaskType[] VALS = values();
- }
-
- /**
- *
- */
- private static class TaskId {
- /** */
- final int part;
-
- /** */
- final TaskType type;
-
- /**
- * @param part Partiotion id.
- * @param type Task type.
- */
- TaskId(int part, TaskType type) {
- this.part = part;
- this.type = type;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- TaskId taskKey = (TaskId)o;
-
- return part == taskKey.part && type == taskKey.type;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return Objects.hash(part, type);
+ if (log.isInfoEnabled())
+ log.info("Group eviction in progress [grpName=" + grp.cacheOrGroupName()+
+ ", grpId=" + grp.groupId() +
+ ", remainingPartsToEvict=" + (totalTasks.get() - taskInProgress) +
+ ", partsEvictInProgress=" + taskInProgress +
+ ", totalParts=" + grp.topology().localPartitions().size() + "]");
}
}
/**
- *
+ * Task for self-scheduled partition eviction / clearing.
*/
- abstract class AbstractEvictionTask implements Runnable {
+ class PartitionEvictionTask implements Runnable {
/** Partition to evict. */
- protected final GridDhtLocalPartition part;
+ private final GridDhtLocalPartition part;
/** */
- protected final long size;
+ private final long size;
/** Eviction context. */
- protected final GroupEvictionContext grpEvictionCtx;
+ private final GroupEvictionContext grpEvictionCtx;
/** */
- protected final GridFutureAdapter<?> finishFut = new GridFutureAdapter<>();
-
- /** */
- private final TaskId id;
+ private final GridFutureAdapter<?> finishFut = new GridFutureAdapter<>();
/**
* @param part Partition.
* @param grpEvictionCtx Eviction context.
*/
- private AbstractEvictionTask(
- GridDhtLocalPartition part,
- GroupEvictionContext grpEvictionCtx,
- TaskType type
+ private PartitionEvictionTask(
+ GridDhtLocalPartition part,
+ GroupEvictionContext grpEvictionCtx
) {
this.part = part;
this.grpEvictionCtx = grpEvictionCtx;
- id = new TaskId(part.id(), type);
-
size = part.fullSize();
}
- /**
- * @return {@code False} if need retry task later.
- * @throws IgniteCheckedException If failed.
- */
- abstract boolean run0() throws IgniteCheckedException;
-
- /**
- *
- */
- abstract void scheduleRetry();
-
/** {@inheritDoc} */
- @Override public final void run() {
+ @Override public void run() {
if (grpEvictionCtx.shouldStop()) {
finishFut.onDone();
@@ -584,7 +414,12 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
}
try {
- boolean success = run0();
+ boolean success = part.tryClear(grpEvictionCtx);
+
+ if (success) {
+ if (part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy())
+ part.destroy();
+ }
// Complete eviction future before schedule new to prevent deadlock with
// simultaneous eviction stopping and scheduling new eviction.
@@ -592,7 +427,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
// Re-offer partition if clear was unsuccessful due to partition reservation.
if (!success)
- scheduleRetry();
+ evictPartitionAsync(grpEvictionCtx.grp, part);
}
catch (Throwable ex) {
finishFut.onDone(ex);
@@ -603,7 +438,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
true);
}
else {
- LT.error(log, ex, "Partition eviction failed.");
+ LT.error(log, ex, "Partition eviction failed, this can cause grid hang.");
cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, ex));
}
@@ -612,80 +447,15 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
}
/**
- * Task for self-scheduled partition eviction / clearing.
- */
- class PartitionEvictionTask extends AbstractEvictionTask {
- /**
- * @param part Partition.
- * @param grpEvictionCtx Eviction context.
- */
- private PartitionEvictionTask(
- GridDhtLocalPartition part,
- GroupEvictionContext grpEvictionCtx
- ) {
- super(part, grpEvictionCtx, TaskType.EVICT);
- }
-
- /** {@inheritDoc} */
- @Override void scheduleRetry() {
- evictPartitionAsync(grpEvictionCtx.grp, part);
- }
-
- /** {@inheritDoc} */
- @Override public boolean run0() throws IgniteCheckedException {
- assert part.state() != GridDhtPartitionState.OWNING : part;
-
- boolean success = part.tryClear(grpEvictionCtx);
-
- assert part.state() != GridDhtPartitionState.OWNING : part;
-
- if (success) {
- if (part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy())
- part.destroy();
- }
-
- return success;
- }
- }
-
- /**
- *
- */
- class ClearTombstonesTask extends AbstractEvictionTask {
- /**
- * @param part Partition.
- * @param grpEvictionCtx Eviction context.
- */
- private ClearTombstonesTask(
- GridDhtLocalPartition part,
- GroupEvictionContext grpEvictionCtx
- ) {
- super(part, grpEvictionCtx, TaskType.CLEAR_TOMBSTONES);
- }
-
- /** {@inheritDoc} */
- @Override void scheduleRetry() {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public boolean run0() throws IgniteCheckedException {
- part.clearTombstones(grpEvictionCtx);
-
- return true;
- }
- }
-
- /**
*
*/
class BucketQueue {
- /** Queues contains partitions scheduled for eviction. */
- final Queue<AbstractEvictionTask>[] buckets;
-
/** */
private final long[] bucketSizes;
+ /** Queues contains partitions scheduled for eviction. */
+ final Queue<PartitionEvictionTask>[] buckets;
+
/**
* @param buckets Number of buckets.
*/
@@ -704,8 +474,8 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
* @param bucket Bucket index.
* @return Partition evict task, or {@code null} if bucket queue is empty.
*/
- AbstractEvictionTask poll(int bucket) {
- AbstractEvictionTask task = buckets[bucket].poll();
+ PartitionEvictionTask poll(int bucket) {
+ PartitionEvictionTask task = buckets[bucket].poll();
if (task != null)
bucketSizes[bucket] -= task.size;
@@ -718,7 +488,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
*
* @return Partition evict task.
*/
- AbstractEvictionTask pollAny() {
+ PartitionEvictionTask pollAny() {
for (int bucket = 0; bucket < bucketSizes.length; bucket++){
if (!buckets[bucket].isEmpty())
return poll(bucket);
@@ -733,7 +503,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
* @param task Eviction task.
* @return Bucket index.
*/
- int offer(AbstractEvictionTask task) {
+ int offer(PartitionEvictionTask task) {
int bucket = calculateBucket();
buckets[bucket].offer(task);
@@ -757,7 +527,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
int size(){
int size = 0;
- for (Queue<AbstractEvictionTask> queue : buckets)
+ for (Queue<PartitionEvictionTask> queue : buckets)
size += queue.size();
return size;
@@ -787,7 +557,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
*
* @return Queue for evict partitions.
*/
- private Queue<AbstractEvictionTask> createEvictPartitionQueue() {
+ private Queue<PartitionEvictionTask> createEvictPartitionQueue() {
switch (QUEUE_TYPE) {
case 1:
return new PriorityBlockingQueue<>(
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
index 0b7c4ac..746b94a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
@@ -59,11 +59,6 @@ public interface CacheDataRow extends MvccUpdateVersionAware, CacheSearchRow, St
*/
public void key(KeyCacheObject key);
- /**
- * @param cacheId Cache ID.
- */
- public void cacheId(int cacheId);
-
/** {@inheritDoc} */
@Override public default IOVersions<? extends AbstractDataPageIO> ioVersions() {
return DataPageIO.VERSIONS;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index a3b876d..06e7214 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -52,7 +52,6 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CR
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_OP_COUNTER_NA;
import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.KEY_ONLY;
import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.LINK_WITH_HEADER;
-import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.TOMBSTONES;
/**
* Cache data row adapter.
@@ -352,7 +351,9 @@ public class CacheDataRowAdapter implements CacheDataRow {
buf.position(off);
buf.limit(off + payloadSize);
- incomplete = readFragment(sharedCtx, coctx, buf, rowData, readCacheId, incomplete, skipVer);
+ boolean keyOnly = rowData == RowData.KEY_ONLY;
+
+ incomplete = readFragment(sharedCtx, coctx, buf, keyOnly, readCacheId, incomplete, skipVer);
if (incomplete != null)
incomplete.setNextLink(nextLink);
@@ -378,7 +379,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @param sharedCtx Cache shared context.
* @param coctx Cache object context.
* @param buf Buffer.
- * @param rowData Required row data.
+ * @param keyOnly {@code true} If need to read only key object.
* @param readCacheId {@code true} If need to read cache ID.
* @param incomplete Incomplete object.
* @param skipVer Whether version read should be skipped.
@@ -389,13 +390,11 @@ public class CacheDataRowAdapter implements CacheDataRow {
GridCacheSharedContext<?, ?> sharedCtx,
CacheObjectContext coctx,
ByteBuffer buf,
- RowData rowData,
+ boolean keyOnly,
boolean readCacheId,
IncompleteObject<?> incomplete,
boolean skipVer
) throws IgniteCheckedException {
- boolean tombstones = rowData == TOMBSTONES;
-
if (readCacheId && cacheId == 0) {
incomplete = readIncompleteCacheId(buf, incomplete);
@@ -417,12 +416,6 @@ public class CacheDataRowAdapter implements CacheDataRow {
// Read key.
if (key == null) {
- if (tombstones && sharedCtx.database().isTombstone(buf, key, (IncompleteCacheObject)incomplete) == Boolean.FALSE) {
- verReady = true;
-
- return null;
- }
-
incomplete = readIncompleteKey(coctx, buf, (IncompleteCacheObject)incomplete);
if (key == null) {
@@ -430,7 +423,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
return incomplete; // Need to finish reading the key.
}
- if (rowData == RowData.KEY_ONLY)
+ if (keyOnly)
return null; // Key is ready - we are done!
incomplete = null;
@@ -449,13 +442,6 @@ public class CacheDataRowAdapter implements CacheDataRow {
// Read value.
if (val == null) {
- if (tombstones && sharedCtx.database().isTombstone(buf, key, (IncompleteCacheObject)incomplete) == Boolean.FALSE) {
- key = null;
- verReady = true;
-
- return null;
- }
-
incomplete = readIncompleteValue(coctx, buf, (IncompleteCacheObject)incomplete);
if (val == null) {
@@ -466,14 +452,6 @@ public class CacheDataRowAdapter implements CacheDataRow {
incomplete = null;
}
- if (tombstones && !sharedCtx.database().isTombstone(this)) {
- key = null;
- val = null;
- verReady = true;
-
- return null;
- }
-
// Read version.
if (!verReady) {
incomplete = readIncompleteVersion(buf, incomplete, skipVer);
@@ -520,14 +498,6 @@ public class CacheDataRowAdapter implements CacheDataRow {
int len = PageUtils.getInt(addr, off);
off += 4;
- boolean tombstones = rowData == RowData.TOMBSTONES;
-
- if (tombstones && !sharedCtx.database().isTombstone(addr + off + len + 1)) {
- verReady = true; // Mark as ready, no need to read any data.
-
- return;
- }
-
if (rowData != RowData.NO_KEY && rowData != RowData.NO_KEY_WITH_HINTS) {
byte type = PageUtils.getByte(addr, off);
off++;
@@ -549,14 +519,11 @@ public class CacheDataRowAdapter implements CacheDataRow {
byte type = PageUtils.getByte(addr, off);
off++;
- if (!tombstones) {
- byte[] bytes = PageUtils.getBytes(addr, off, len);
-
- val = coctx.kernalContext().cacheObjects().toCacheObject(coctx, type, bytes);
- }
-
+ byte[] bytes = PageUtils.getBytes(addr, off, len);
off += len;
+ val = coctx.kernalContext().cacheObjects().toCacheObject(coctx, type, bytes);
+
int verLen;
if (skipVer) {
@@ -854,11 +821,6 @@ public class CacheDataRowAdapter implements CacheDataRow {
}
/** {@inheritDoc} */
- @Override public void cacheId(int cacheId) {
- this.cacheId = cacheId;
- }
-
- /** {@inheritDoc} */
@Override public CacheObject value() {
assert val != null : "Value is not ready: " + this;
@@ -974,10 +936,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
FULL_WITH_HINTS,
/** Force instant hints actualization for update operation with history (to avoid races with vacuum). */
- NO_KEY_WITH_HINTS,
-
- /** Do not read row data for non-tombstone entries. */
- TOMBSTONES
+ NO_KEY_WITH_HINTS
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index df19bd5..b321321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -52,7 +52,7 @@ import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecordV3;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecordV2;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheDiagnosticManager;
@@ -297,10 +297,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
if (rowStore0 != null) {
((CacheFreeList)rowStore0.freeList()).saveMetadata(grp.statisticsHolderData());
+ PartitionMetaStorage<SimpleDataRow> partStore = store.partStorage();
+
long updCntr = store.updateCounter();
long size = store.fullSize();
long rmvId = globalRemoveId().get();
+ byte[] updCntrsBytes = store.partUpdateCounter().getBytes();
+
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
IgniteWriteAheadLogManager wal = this.ctx.wal();
@@ -325,9 +329,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
return;
}
- assert state != null || grp.isLocal() : "Partition state is undefined " +
- "[grp=" + grp.cacheOrGroupName() + ", part=" + part + "]";
-
int grpId = grp.groupId();
long partMetaId = pageMem.partitionMetaPageId(grpId, store.partId());
@@ -348,196 +349,141 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
try {
PagePartitionMetaIOV2 io = PageIO.getPageIO(partMetaPageAddr);
- changed |= io.setPartitionState(partMetaPageAddr, state != null ? (byte)state.ordinal() : -1);
- changed |= io.setUpdateCounter(partMetaPageAddr, updCntr);
- changed |= io.setGlobalRemoveId(partMetaPageAddr, rmvId);
- changed |= io.setSize(partMetaPageAddr, size);
- changed |= io.setTombstonesCount(partMetaPageAddr, store.tombstonesCount());
- changed |= savePartitionUpdateCounterGaps(store, io, partMetaPageAddr);
- changed |= saveCacheSizes(store, io, partMetaPageAddr);
-
- if (needSnapshot)
- changed |= savePagesCount(ctx, part, store, io, partMetaPageAddr);
-
- if (changed && PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null))
- wal.log(new MetaPageUpdatePartitionDataRecordV3(
- grpId,
- partMetaId,
- updCntr,
- rmvId,
- (int)size, // TODO: Partition size may be long
- io.getCacheSizesPageId(partMetaPageAddr),
- io.getPartitionState(partMetaPageAddr),
- io.getCandidatePageCount(partMetaPageAddr),
- io.getGapsLink(partMetaPageAddr),
- io.getTombstonesCount(partMetaPageAddr)
- ));
- }
- finally {
- pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed);
- }
- }
- finally {
- pageMem.releasePage(grpId, partMetaId, partMetaPage);
- }
- }
- else if (needSnapshot)
- tryAddEmptyPartitionToSnapshot(store, ctx);
- }
- else if (needSnapshot)
- tryAddEmptyPartitionToSnapshot(store, ctx);
- }
+ long link = io.getGapsLink(partMetaPageAddr);
- /**
- * Saves to partition meta page information about partition update counter gaps.
- *
- * @param store Partition data store.
- * @param io I/O for partition meta page.
- * @param partMetaPageAddr Partition meta page address.
- * @return {@code True} if partition meta data is changed.
- * @throws IgniteCheckedException If failed.
- */
- private boolean savePartitionUpdateCounterGaps(
- CacheDataStore store,
- PagePartitionMetaIOV2 io,
- long partMetaPageAddr
- ) throws IgniteCheckedException {
- PartitionMetaStorage<SimpleDataRow> partStore = store.partStorage();
+ if (updCntrsBytes == null && link != 0) {
+ partStore.removeDataRowByLink(link, grp.statisticsHolderData());
- byte[] updCntrsBytes = store.partUpdateCounter().getBytes();
+ io.setGapsLink(partMetaPageAddr, (link = 0));
- long gapsLink = io.getGapsLink(partMetaPageAddr);
+ changed = true;
+ }
+ else if (updCntrsBytes != null && link == 0) {
+ SimpleDataRow row = new SimpleDataRow(store.partId(), updCntrsBytes);
- boolean changed = false;
+ partStore.insertDataRow(row, grp.statisticsHolderData());
- if (updCntrsBytes == null && gapsLink != 0) {
- partStore.removeDataRowByLink(gapsLink, grp.statisticsHolderData());
+ io.setGapsLink(partMetaPageAddr, (link = row.link()));
- io.setGapsLink(partMetaPageAddr, 0);
+ changed = true;
+ }
+ else if (updCntrsBytes != null && link != 0) {
+ byte[] prev = partStore.readRow(link);
- changed = true;
- }
- else if (updCntrsBytes != null && gapsLink == 0) {
- SimpleDataRow row = new SimpleDataRow(store.partId(), updCntrsBytes);
+ assert prev != null : "Read null gaps using link=" + link;
- partStore.insertDataRow(row, grp.statisticsHolderData());
+ if (!Arrays.equals(prev, updCntrsBytes)) {
+ partStore.removeDataRowByLink(link, grp.statisticsHolderData());
- io.setGapsLink(partMetaPageAddr, row.link());
+ SimpleDataRow row = new SimpleDataRow(store.partId(), updCntrsBytes);
- changed = true;
- }
- else if (updCntrsBytes != null && gapsLink != 0) {
- byte[] prev = partStore.readRow(gapsLink);
+ partStore.insertDataRow(row, grp.statisticsHolderData());
- assert prev != null : "Read null gaps using link=" + gapsLink;
+ io.setGapsLink(partMetaPageAddr, (link = row.link()));
- if (!Arrays.equals(prev, updCntrsBytes)) {
- partStore.removeDataRowByLink(gapsLink, grp.statisticsHolderData());
+ changed = true;
+ }
+ }
- SimpleDataRow row = new SimpleDataRow(store.partId(), updCntrsBytes);
+ if (changed)
+ partStore.saveMetadata(grp.statisticsHolderData());
- partStore.insertDataRow(row, grp.statisticsHolderData());
+ changed |= io.setUpdateCounter(partMetaPageAddr, updCntr);
+ changed |= io.setGlobalRemoveId(partMetaPageAddr, rmvId);
+ changed |= io.setSize(partMetaPageAddr, size);
- io.setGapsLink(partMetaPageAddr, row.link());
+ if (state != null)
+ changed |= io.setPartitionState(partMetaPageAddr, (byte)state.ordinal());
+ else
+ assert grp.isLocal() : grp.cacheOrGroupName();
- changed = true;
- }
- }
+ long cntrsPageId;
- if (changed)
- partStore.saveMetadata(grp.statisticsHolderData());
+ if (grp.sharedGroup()) {
+ long initCntrPageId = io.getCountersPageId(partMetaPageAddr);
- return changed;
- }
+ Map<Integer, Long> newSizes = store.cacheSizes();
+ Map<Integer, Long> prevSizes = readSharedGroupCacheSizes(pageMem, grpId, initCntrPageId);
- /**
- * Saves to partition meta page information about logical cache sizes inside cache group.
- *
- * @param store Partition data store.
- * @param io I/O for partition meta page.
- * @param partMetaPageAddr Partition meta page address.
- * @return {@code True} if partition meta data is changed.
- * @throws IgniteCheckedException If failed.
- */
- private boolean saveCacheSizes(
- CacheDataStore store,
- PagePartitionMetaIOV2 io,
- long partMetaPageAddr
- ) throws IgniteCheckedException {
- if (grp.sharedGroup()) {
- PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
-
- long oldCacheSizesPageId = io.getCacheSizesPageId(partMetaPageAddr);
+ if (prevSizes != null && prevSizes.equals(newSizes))
+ cntrsPageId = initCntrPageId; // Preventing modification of sizes pages for store
+ else {
+ cntrsPageId = writeSharedGroupCacheSizes(pageMem, grpId, initCntrPageId,
+ store.partId(), newSizes);
- Map<Integer, Long> newSizes = store.cacheSizes();
- Map<Integer, Long> prevSizes = readSharedGroupCacheSizes(pageMem, grp.groupId(), oldCacheSizesPageId);
+ if (initCntrPageId == 0 && cntrsPageId != 0) {
+ io.setCountersPageId(partMetaPageAddr, cntrsPageId);
- if (prevSizes == null || !prevSizes.equals(newSizes)) {
- long cacheSizesPageId = writeSharedGroupCacheSizes(pageMem, grp.groupId(), oldCacheSizesPageId,
- store.partId(), newSizes);
+ changed = true;
+ }
+ }
+ }
+ else
+ cntrsPageId = 0L;
+
+ int pageCnt;
+
+ if (needSnapshot) {
+ pageCnt = this.ctx.pageStore().pages(grpId, store.partId());
+
+ io.setCandidatePageCount(partMetaPageAddr, size == 0 ? 0 : pageCnt);
+
+ if (state == OWNING) {
+ assert part != null;
+
+ if (!addPartition(
+ part,
+ ctx.partitionStatMap(),
+ partMetaPageAddr,
+ io,
+ grpId,
+ store.partId(),
+ this.ctx.pageStore().pages(grpId, store.partId()),
+ store.fullSize()
+ ))
+ U.warn(log, "Partition was concurrently evicted grpId=" + grpId +
+ ", partitionId=" + part.id());
+ }
+ else if (state == MOVING || state == RENTING) {
+ if (ctx.partitionStatMap().forceSkipIndexPartition(grpId)) {
+ if (log.isInfoEnabled())
+ log.info("Will not include SQL indexes to snapshot because there is " +
+ "a partition not in " + OWNING + " state [grp=" + grp.cacheOrGroupName() +
+ ", partId=" + store.partId() + ", state=" + state + ']');
+ }
+ }
- if (oldCacheSizesPageId == 0 && cacheSizesPageId != 0) {
- io.setSizesPageId(partMetaPageAddr, cacheSizesPageId);
+ changed = true;
+ }
+ else
+ pageCnt = io.getCandidatePageCount(partMetaPageAddr);
- return true;
+ if (changed && PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null))
+ wal.log(new MetaPageUpdatePartitionDataRecordV2(
+ grpId,
+ partMetaId,
+ updCntr,
+ rmvId,
+ (int)size, // TODO: Partition size may be long
+ cntrsPageId,
+ state == null ? -1 : (byte)state.ordinal(),
+ pageCnt,
+ link
+ ));
+ }
+ finally {
+ pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed);
+ }
+ }
+ finally {
+ pageMem.releasePage(grpId, partMetaId, partMetaPage);
}
}
+ else if (needSnapshot)
+ tryAddEmptyPartitionToSnapshot(store, ctx);
}
- else
- io.setSizesPageId(partMetaPageAddr, 0);
-
- return false;
- }
-
- /**
- * Saves to partition meta page information about pages count.
- *
- * @param ctx Checkpoint context.
- * @param part Partition.
- * @param store Partition data store.
- * @param io I/O for partition meta page.
- * @param partMetaPageAddr Partition meta page address.
- * @return {@code True} if partition meta data is changed.
- * @throws IgniteCheckedException If failed.
- */
- private boolean savePagesCount(
- Context ctx,
- GridDhtLocalPartition part,
- CacheDataStore store,
- PagePartitionMetaIOV2 io,
- long partMetaPageAddr
- ) throws IgniteCheckedException {
- int grpId = grp.groupId();
- int pageCnt = this.ctx.pageStore().pages(grpId, store.partId());
-
- io.setCandidatePageCount(partMetaPageAddr, io.getSize(partMetaPageAddr) == 0 ? 0 : pageCnt);
-
- if (part.state() == OWNING) {
- assert part != null;
-
- if (!addPartition(
- part,
- ctx.partitionStatMap(),
- partMetaPageAddr,
- io,
- grpId,
- store.partId(),
- this.ctx.pageStore().pages(grp.groupId(), store.partId()),
- store.fullSize()
- ))
- U.warn(log, "Partition was concurrently evicted grpId=" + grpId +
- ", partitionId=" + part.id());
- }
- else if (part.state() == MOVING || part.state() == RENTING) {
- if (ctx.partitionStatMap().forceSkipIndexPartition(grpId)) {
- if (log.isInfoEnabled())
- log.info("Will not include SQL indexes to snapshot because there is " +
- "a partition not in " + OWNING + " state [grp=" + grp.cacheOrGroupName() +
- ", partId=" + store.partId() + ", state=" + part.state() + ']');
- }
- }
-
- return true;
+ else if (needSnapshot)
+ tryAddEmptyPartitionToSnapshot(store, ctx);
}
/** {@inheritDoc} */
@@ -701,13 +647,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
* return null if counter page does not exist.
* @throws IgniteCheckedException If page memory operation failed.
*/
- private static Map<Integer, Long> readSharedGroupCacheSizes(
- PageSupport pageMem,
- int grpId,
- long cntrsPageId
- ) throws IgniteCheckedException {
+ @Nullable private static Map<Integer, Long> readSharedGroupCacheSizes(PageSupport pageMem, int grpId,
+ long cntrsPageId) throws IgniteCheckedException {
+
if (cntrsPageId == 0L)
- return Collections.emptyMap();
+ return null;
Map<Integer, Long> cacheSizes = new HashMap<>();
@@ -740,7 +684,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
pageMem.releasePage(grpId, curId, curPage);
}
}
-
return cacheSizes;
}
@@ -1547,11 +1490,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public void cacheId(int cacheId) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
@Override public long mvccCoordinatorVersion() {
return 0; // TODO IGNITE-7384
}
@@ -1884,19 +1822,16 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
if (PageIO.getType(pageAddr) != 0) {
PagePartitionMetaIOV2 io = (PagePartitionMetaIOV2)PagePartitionMetaIO.VERSIONS.latest();
- long gapsLink = io.getGapsLink(pageAddr);
+ Map<Integer, Long> cacheSizes = null;
- byte[] updCntrGapsData = gapsLink == 0 ? null : partStorage.readRow(gapsLink);
+ if (grp.sharedGroup())
+ cacheSizes = readSharedGroupCacheSizes(pageMem, grpId, io.getCountersPageId(pageAddr));
- delegate0.restoreState(
- io.getSize(pageAddr),
- io.getUpdateCounter(pageAddr),
- grp.sharedGroup()
- ? readSharedGroupCacheSizes(pageMem, grpId, io.getCacheSizesPageId(pageAddr))
- : Collections.emptyMap(),
- updCntrGapsData,
- io.getTombstonesCount(pageAddr)
- );
+ long link = io.getGapsLink(pageAddr);
+
+ byte[] data = link == 0 ? null : partStorage.readRow(link);
+
+ delegate0.restoreState(io.getSize(pageAddr), io.getUpdateCounter(pageAddr), cacheSizes, data);
globalRemoveId().setIfGreater(io.getGlobalRemoveId(pageAddr));
}
@@ -2496,20 +2431,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public void removeWithTombstone(
- GridCacheContext cctx,
- KeyCacheObject key,
- GridCacheVersion ver,
- GridDhtLocalPartition part
- ) throws IgniteCheckedException {
- assert ctx.database().checkpointLockIsHeldByThread();
-
- CacheDataStore delegate = init0(false);
-
- delegate.removeWithTombstone(cctx, key, ver, part);
- }
-
- /** {@inheritDoc} */
@Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
@@ -2543,7 +2464,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/** {@inheritDoc} */
@Override public GridCursor<CacheDataRow> mvccAllVersionsCursor(GridCacheContext cctx,
- KeyCacheObject key, CacheDataRowAdapter.RowData x) throws IgniteCheckedException {
+ KeyCacheObject key, Object x) throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
@@ -2554,17 +2475,17 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/** {@inheritDoc} */
- @Override public GridCursor<? extends CacheDataRow> cursor(boolean withTombstones) throws IgniteCheckedException {
+ @Override public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
- return delegate.cursor(withTombstones);
+ return delegate.cursor();
return EMPTY_CURSOR;
}
/** {@inheritDoc} */
- @Override public GridCursor<? extends CacheDataRow> cursor(CacheDataRowAdapter.RowData x) throws IgniteCheckedException {
+ @Override public GridCursor<? extends CacheDataRow> cursor(Object x) throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
@@ -2601,7 +2522,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
@Override public GridCursor<? extends CacheDataRow> cursor(int cacheId,
KeyCacheObject lower,
KeyCacheObject upper,
- CacheDataRowAdapter.RowData x)
+ Object x)
throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
@@ -2615,15 +2536,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
@Override public GridCursor<? extends CacheDataRow> cursor(int cacheId,
KeyCacheObject lower,
KeyCacheObject upper,
- CacheDataRowAdapter.RowData x,
- MvccSnapshot mvccSnapshot,
- boolean withTombstones
- )
+ Object x,
+ MvccSnapshot mvccSnapshot)
throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
- return delegate.cursor(cacheId, lower, upper, x, mvccSnapshot, withTombstones);
+ return delegate.cursor(cacheId, lower, upper, x, mvccSnapshot);
return EMPTY_CURSOR;
}
@@ -2634,11 +2553,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, boolean withTombstones) throws IgniteCheckedException {
+ @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
- return delegate.cursor(cacheId, withTombstones);
+ return delegate.cursor(cacheId);
return EMPTY_CURSOR;
}
@@ -2849,25 +2768,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
}
- /** {@inheritDoc} */
@Override public PartitionMetaStorage partStorage() {
return partStorage;
}
-
- /** {@inheritDoc} */
- @Override public long tombstonesCount() {
- try {
- CacheDataStore delegate0 = init0(true);
-
- if (delegate0 == null)
- return 0;
-
- return delegate0.tombstonesCount();
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 48a7f20..a1a7913 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.processors.cache.persistence;
+import javax.management.InstanceNotFoundException;
import java.io.File;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import javax.management.InstanceNotFoundException;
import org.apache.ignite.DataRegionMetrics;
import org.apache.ignite.DataRegionMetricsProvider;
import org.apache.ignite.DataStorageMetrics;
@@ -49,16 +48,12 @@ import org.apache.ignite.internal.mem.IgniteOutOfMemoryException;
import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider;
import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagemem.PageMemory;
-import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
-import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.persistence.evict.FairFifoPageEvictionTracker;
import org.apache.ignite.internal.processors.cache.persistence.evict.NoOpPageEvictionTracker;
@@ -141,6 +136,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
/** First eviction was warned flag. */
private volatile boolean firstEvictWarn;
+
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
if (cctx.kernalContext().clientNode() && cctx.kernalContext().config().getDataStorageConfiguration() == null)
@@ -158,102 +154,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
}
/**
- * @param row Row.
- * @return {@code True} if given row is tombstone.
- * @throws IgniteCheckedException If failed.
- */
- public boolean isTombstone(@Nullable CacheDataRow row) throws IgniteCheckedException {
- if (row == null)
- return false;
-
- CacheObject val = row.value();
-
- assert val != null : row;
-
- return val.cacheObjectType() == CacheObject.TOMBSTONE;
- }
-
- /**
- * @param buf Buffer.
- * @param key Row key.
- * @param incomplete Incomplete object.
- * @return Tombstone flag or {@code null} if there is no enough data.
- */
- public Boolean isTombstone(
- ByteBuffer buf,
- @Nullable KeyCacheObject key,
- @Nullable IncompleteCacheObject incomplete
- ) {
- if (key == null) {
- if (incomplete == null) { // Did not start read key yet.
- if (buf.remaining() < IncompleteCacheObject.HEAD_LEN)
- return null;
-
- int keySize = buf.getInt(buf.position());
-
- int headOffset = (IncompleteCacheObject.HEAD_LEN + keySize) /* key */ +
- 8 /* expire time */;
-
- int requiredSize = headOffset + IncompleteCacheObject.HEAD_LEN; // Value header.
-
- if (buf.remaining() < requiredSize)
- return null;
-
- return isTombstone(buf, headOffset);
- }
- else { // Reading key, check if there is enogh data to check value header.
- byte[] data = incomplete.data();
-
- if (data == null) // Header is not available yet.
- return null;
-
- int keyRemaining = data.length - incomplete.dataOffset();
-
- assert keyRemaining > 0 : keyRemaining;
-
- int headOffset = keyRemaining + 8 /* expire time */;
-
- int requiredSize = headOffset + IncompleteCacheObject.HEAD_LEN; // Value header.
-
- if (buf.remaining() < requiredSize)
- return null;
-
- return isTombstone(buf, headOffset);
- }
- }
-
- if (incomplete == null) { // Did not start read value yet.
- if (buf.remaining() < IncompleteCacheObject.HEAD_LEN)
- return null;
-
- return isTombstone(buf, 0);
- }
-
- return incomplete.type() == CacheObject.TOMBSTONE;
- }
-
- /**
- * @param buf Buffer.
- * @param offset Value offset.
- * @return Tombstone flag or {@code null} if there is no enough data.
- */
- private Boolean isTombstone(ByteBuffer buf, int offset) {
- byte valType = buf.get(buf.position() + offset + 4);
-
- return valType == CacheObject.TOMBSTONE;
- }
-
- /**
- * @param addr Row address.
- * @return {@code True} if stored value is tombstone.
- */
- public boolean isTombstone(long addr) {
- byte type = PageUtils.getByte(addr, 4);
-
- return type == CacheObject.TOMBSTONE;
- }
-
- /**
* @param cfg Ignite configuration.
* @param groupName Name of group.
* @param dataRegionName Metrics MBean name.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
index 931fd92..c86a64a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
@@ -59,7 +59,7 @@ public class PagePartitionMetaIO extends PageMetaIO {
setUpdateCounter(pageAddr, 0);
setGlobalRemoveId(pageAddr, 0);
setPartitionState(pageAddr, (byte)-1);
- setSizesPageId(pageAddr, 0);
+ setCountersPageId(pageAddr, 0);
}
/**
@@ -153,22 +153,22 @@ public class PagePartitionMetaIO extends PageMetaIO {
}
/**
- * Returns page identifier related to page with logical cache sizes in cache group.
+ * Returns partition counters page identifier, page with caches in cache group sizes.
*
* @param pageAddr Partition metadata page address.
* @return Next meta partial page ID or {@code 0} if it does not exist.
*/
- public long getCacheSizesPageId(long pageAddr) {
+ public long getCountersPageId(long pageAddr) {
return PageUtils.getLong(pageAddr, NEXT_PART_META_PAGE_OFF);
}
/**
- * Sets new reference to page with logical cache sizes in cache group.
+ * Sets new reference to partition counters page (logical cache sizes).
*
* @param pageAddr Partition metadata page address.
* @param cntrsPageId New cache sizes page ID.
*/
- public void setSizesPageId(long pageAddr, long cntrsPageId) {
+ public void setCountersPageId(long pageAddr, long cntrsPageId) {
PageUtils.putLong(pageAddr, NEXT_PART_META_PAGE_OFF, cntrsPageId);
}
@@ -228,23 +228,6 @@ public class PagePartitionMetaIO extends PageMetaIO {
"this PagePartitionMetaIO version: ver=" + getVersion());
}
- /**
- * @param pageAddr Page address.
- */
- public long getTombstonesCount(long pageAddr) {
- throw new UnsupportedOperationException("Tombstones count is not supported by " +
- "this PagePartitionMetaIO version: ver=" + getVersion());
- }
-
- /**
- * @param pageAddr Page address.
- * @param tombstonesCount Tombstones count.
- */
- public boolean setTombstonesCount(long pageAddr, long tombstonesCount) {
- throw new UnsupportedOperationException("Tombstones count is not supported by " +
- "this PagePartitionMetaIO version: ver=" + getVersion());
- }
-
/** {@inheritDoc} */
@Override protected void printPage(long pageAddr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException {
super.printPage(pageAddr, pageSize, sb);
@@ -255,7 +238,7 @@ public class PagePartitionMetaIO extends PageMetaIO {
.a(",\n\tupdateCounter=").a(getUpdateCounter(pageAddr))
.a(",\n\tglobalRemoveId=").a(getGlobalRemoveId(pageAddr))
.a(",\n\tpartitionState=").a(state).a("(").a(GridDhtPartitionState.fromOrdinal(state)).a(")")
- .a(",\n\tcacheSizesPageId=").a(getCacheSizesPageId(pageAddr))
+ .a(",\n\tcountersPageId=").a(getCountersPageId(pageAddr))
.a("\n]");
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java
index e915e0b..37b7243 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence.tree.io;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.util.GridStringBuilder;
@@ -36,9 +37,6 @@ public class PagePartitionMetaIOV2 extends PagePartitionMetaIO {
/** */
private static final int GAPS_LINK = PART_META_REUSE_LIST_ROOT_OFF + 8;
- /** */
- private static final int TOMBSTONES_COUNT = GAPS_LINK + 8;
-
/**
* @param ver Version.
*/
@@ -102,22 +100,7 @@ public class PagePartitionMetaIOV2 extends PagePartitionMetaIO {
}
/** {@inheritDoc} */
- @Override public long getTombstonesCount(long pageAddr) {
- return PageUtils.getLong(pageAddr, TOMBSTONES_COUNT);
- }
-
- /** {@inheritDoc} */
- @Override public boolean setTombstonesCount(long pageAddr, long tombstonesCnt) {
- if (getTombstonesCount(pageAddr) == tombstonesCnt)
- return false;
-
- PageUtils.putLong(pageAddr, TOMBSTONES_COUNT, tombstonesCnt);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected void printPage(long pageAddr, int pageSize, GridStringBuilder sb) {
+ @Override protected void printPage(long pageAddr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException {
byte state = getPartitionState(pageAddr);
sb.a("PagePartitionMeta[\n\ttreeRoot=").a(getReuseListRoot(pageAddr));
@@ -132,9 +115,8 @@ public class PagePartitionMetaIOV2 extends PagePartitionMetaIO {
sb.a(",\n\tupdateCounter=").a(getUpdateCounter(pageAddr));
sb.a(",\n\tglobalRemoveId=").a(getGlobalRemoveId(pageAddr));
sb.a(",\n\tpartitionState=").a(state).a("(").a(GridDhtPartitionState.fromOrdinal(state)).a(")");
- sb.a(",\n\tcacheSizesPageId=").a(getCacheSizesPageId(pageAddr));
+ sb.a(",\n\tcountersPageId=").a(getCountersPageId(pageAddr));
sb.a(",\n\tcntrUpdDataPageId=").a(getGapsLink(pageAddr));
- sb.a(",\n\ttombstonesCount=").a(getTombstonesCount(pageAddr));
sb.a("\n]");
}
@@ -151,6 +133,5 @@ public class PagePartitionMetaIOV2 extends PagePartitionMetaIO {
setPendingTreeRoot(pageAddr, 0);
setPartitionMetaStoreReuseListRoot(pageAddr, 0);
setGapsLink(pageAddr, 0);
- setTombstonesCount(pageAddr, 0);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index 08e980f..ec68972 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -71,7 +71,6 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuc
import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId;
import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecordV2;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecordV3;
import org.apache.ignite.internal.pagemem.wal.record.delta.NewRootInitRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PageListMetaResetCountRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListAddPageRecord;
@@ -380,10 +379,6 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
return /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*counters page id*/8 + /*state*/ 1
+ /*allocatedIdxCandidate*/ 4 + /*link*/ 8;
- case PARTITION_META_PAGE_UPDATE_COUNTERS_V3:
- return /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*counters page id*/8 + /*state*/ 1
- + /*allocatedIdxCandidate*/ 4 + /*link*/ 8 + /*tombstones cnt*/ 8;
-
case MEMORY_RECOVERY:
return 8;
@@ -616,11 +611,6 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
- case PARTITION_META_PAGE_UPDATE_COUNTERS_V3:
- res = new MetaPageUpdatePartitionDataRecordV3(in);
-
- break;
-
case MEMORY_RECOVERY:
long ts = in.readLong();
@@ -1212,7 +1202,6 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
case PARTITION_META_PAGE_UPDATE_COUNTERS:
case PARTITION_META_PAGE_UPDATE_COUNTERS_V2:
- case PARTITION_META_PAGE_UPDATE_COUNTERS_V3:
((MetaPageUpdatePartitionDataRecord)rec).toBytes(buf);
break;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
index 5eb50f8..add2abe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
@@ -120,8 +120,10 @@ public class DataRow extends CacheDataRowAdapter {
this.link = link;
}
- /** {@inheritDoc} */
- @Override public void cacheId(int cacheId) {
+ /**
+ * @param cacheId Cache ID.
+ */
+ public void cacheId(int cacheId) {
this.cacheId = cacheId;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java
index d80e33b..fad16b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.metric.impl;
import java.util.Map;
-import org.apache.ignite.internal.processors.cache.CacheGroupMetricsImpl;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.typedef.T2;
@@ -87,14 +86,6 @@ public class MetricUtils {
}
/**
- * @param cacheGrpName Cache group name.
- * @return Cache group metrics registry name.
- */
- public static String cacheGroupMetricsRegistryName(String cacheGrpName) {
- return metricName(CacheGroupMetricsImpl.CACHE_GROUP_METRICS_PREFIX, cacheGrpName);
- }
-
- /**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteSanitySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteSanitySelfTest.java
index 8c2bd1d..69a19f4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteSanitySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteSanitySelfTest.java
@@ -51,10 +51,10 @@ public class CacheDeferredDeleteSanitySelfTest extends GridCommonAbstractTest {
testDeferredDelete(LOCAL, TRANSACTIONAL, false, false);
testDeferredDelete(PARTITIONED, ATOMIC, false, true);
- testDeferredDelete(PARTITIONED, TRANSACTIONAL, false, false);
+ testDeferredDelete(PARTITIONED, TRANSACTIONAL, false, true);
testDeferredDelete(REPLICATED, ATOMIC, false, true);
- testDeferredDelete(REPLICATED, TRANSACTIONAL, false, false);
+ testDeferredDelete(REPLICATED, TRANSACTIONAL, false, true);
// Near
testDeferredDelete(LOCAL, ATOMIC, true, false);
@@ -64,7 +64,7 @@ public class CacheDeferredDeleteSanitySelfTest extends GridCommonAbstractTest {
testDeferredDelete(PARTITIONED, TRANSACTIONAL, true, false);
testDeferredDelete(REPLICATED, ATOMIC, true, true);
- testDeferredDelete(REPLICATED, TRANSACTIONAL, true, false);
+ testDeferredDelete(REPLICATED, TRANSACTIONAL, true, true);
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index e0ad965..3103f8d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -6718,12 +6718,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(DEFAULT_CACHE_NAME).context();
- if (ctx.isNear())
- ctx = ctx.near().dht().context();
+ GridCacheEntryEx entry = ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
- GridCacheEntryEx entry = ctx.cache().peekEx(key);
-
- if (ctx.deferredDelete() && ignite.affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key).contains(((IgniteKernal)ignite).localNode())) {
+ if (ignite.affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key).contains(((IgniteKernal)ignite).localNode())) {
assertNotNull(entry);
assertTrue(entry.deleted());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
index 51a8af5..b653d01 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
@@ -6623,7 +6623,7 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar
GridCacheEntryEx entry = ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
- if (ctx.deferredDelete() && ignite.affinity(cacheName).mapKeyToPrimaryAndBackups(key).contains(((IgniteKernal)ignite).localNode())) {
+ if (ignite.affinity(cacheName).mapKeyToPrimaryAndBackups(key).contains(((IgniteKernal)ignite).localNode())) {
assertNotNull(entry);
assertTrue(entry.deleted());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesLoadTest.java
deleted file mode 100644
index d066153..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesLoadTest.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.spi.metric.LongMetric;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.MvccFeatureChecker;
-import org.apache.ignite.testframework.junits.WithSystemProperty;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.After;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
-
-/**
- *
- */
-@RunWith(Parameterized.class)
-public class CacheRemoveWithTombstonesLoadTest extends GridCommonAbstractTest {
- /** Dummy data. */
- private static final byte[] DUMMY_DATA = {};
-
- /** Test parameters. */
- @Parameterized.Parameters(name = "persistenceEnabled={0}, historicalRebalance={1}")
- public static Collection parameters() {
- List<Object[]> res = new ArrayList<>();
-
- for (boolean persistenceEnabled : new boolean[] {false, true}) {
- for (boolean histRebalance : new boolean[] {false, true}) {
- if (!persistenceEnabled && histRebalance)
- continue;
-
- res.add(new Object[]{persistenceEnabled, histRebalance});
- }
- }
-
- return res;
- }
-
- /** */
- @Parameterized.Parameter(0)
- public boolean persistence;
-
- /** */
- @Parameterized.Parameter(1)
- public boolean histRebalance;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setConsistentId(gridName);
-
- DataStorageConfiguration dsCfg = new DataStorageConfiguration();
-
- if (persistence) {
- dsCfg.setDefaultDataRegionConfiguration(
- new DataRegionConfiguration()
- .setInitialSize(256L * 1024 * 1024)
- .setMaxSize(256L * 1024 * 1024)
- .setPersistenceEnabled(true))
- .setWalMode(WALMode.LOG_ONLY);
- }
-
- dsCfg.setPageSize(1024);
-
- cfg.setDataStorageConfiguration(dsCfg);
-
- // Throttle rebalance.
- cfg.setRebalanceThrottle(100);
-
- return cfg;
- }
-
- /**
- *
- */
- @BeforeClass
- public static void beforeTests() {
- Assume.assumeFalse(MvccFeatureChecker.forcedMvcc());
- }
-
- /**
- *
- */
- @Before
- public void before() throws Exception {
- cleanPersistenceDir();
-
- stopAllGrids();
-
- if (histRebalance)
- System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0");
- }
-
- /**
- *
- */
- @After
- public void after() throws Exception {
- if (histRebalance)
- System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
-
- stopAllGrids();
-
- cleanPersistenceDir();
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- @WithSystemProperty(key = IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED, value = "false")
- public void removeAndRebalance() throws Exception {
- IgniteEx ignite0 = startGrid(0);
-
- IgniteCache<TestKey, TestValue> cache0;
-
- final int ADD_NODES = persistence ? 2 : 3;
- final int KEYS = persistence ? 5_000 : 10_000;
-
- if (persistence) {
- // Preload initial data to all nodes to have start point for WAL rebalance.
- for (int i = 0, idx = 1; i < ADD_NODES; i++, idx++)
- startGrid(idx);
-
- ignite0.cluster().active(true);
-
- awaitPartitionMapExchange();
-
- cache0 = ignite0.getOrCreateCache(cacheConfiguration());
-
- for (int k = 0; k < KEYS; k++)
- cache0.put(new TestKey(k, DUMMY_DATA), new TestValue(DUMMY_DATA));
-
- forceCheckpoint();
-
- for (int i = 0, idx = 1; i < ADD_NODES; i++, idx++) {
- stopGrid(idx);
-
- awaitPartitionMapExchange();
- }
- }
-
- final int pageSize = ignite0.configuration().getDataStorageConfiguration().getPageSize();
-
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
- List<TestKey> keys = new ArrayList<>();
-
- Map<TestKey, TestValue> data = new HashMap<>();
-
- for (int i = 0; i < KEYS; i++) {
- TestKey key = new TestKey(i, new byte[rnd.nextInt(pageSize * 3)]);
-
- keys.add(key);
-
- data.put(key, new TestValue(new byte[rnd.nextInt(pageSize * 3)]));
- }
-
- cache0 = ignite0.getOrCreateCache(cacheConfiguration());
-
- cache0.putAll(data);
-
- AtomicInteger nodeIdx = new AtomicInteger();
-
- for (int iter = 0; iter < ADD_NODES; iter++) {
- IgniteInternalFuture<?> nodeStartFut = GridTestUtils.runAsync(() -> {
- int idx = nodeIdx.incrementAndGet();
-
- info("Start node: " + idx);
-
- U.sleep(500);
-
- return startGrid(idx);
- });
-
- long endTime = U.currentTimeMillis() + 5_000;
-
- while (U.currentTimeMillis() < endTime) {
- for (int i = 0; i < 100; i++) {
- TestKey key = keys.get(rnd.nextInt(keys.size()));
-
- if (rnd.nextBoolean()) {
- cache0.remove(key);
-
- data.remove(key);
- }
- else {
- TestValue val = new TestValue(new byte[rnd.nextInt(pageSize * 3)]);
-
- cache0.put(key, val);
- data.put(key, val);
- }
-
- U.sleep(10);
- }
- }
-
- nodeStartFut.get(30_000);
-
- checkData(keys, data);
-
- waitTombstoneCleanup();
-
- checkData(keys, data);
- }
-
- awaitPartitionMapExchange();
-
- for (int iter = 0; iter < ADD_NODES; iter++) {
- IgniteInternalFuture<?> nodeStopFut = GridTestUtils.runAsync(() -> {
- int idx = nodeIdx.getAndDecrement();
-
- info("Stop node: " + idx);
-
- stopGrid(idx);
-
- awaitPartitionMapExchange();
-
- return null;
- });
-
- long endTime = U.currentTimeMillis() + 2_500;
-
- while (U.currentTimeMillis() < endTime) {
- for (int i = 0; i < 100; i++) {
- TestKey key = keys.get(rnd.nextInt(keys.size()));
-
- if (rnd.nextBoolean()) {
- cache0.remove(key);
-
- data.remove(key);
- } else {
- TestValue val = new TestValue(new byte[rnd.nextInt(pageSize * 3)]);
-
- cache0.put(key, val);
- data.put(key, val);
- }
- }
-
- U.sleep(10);
- }
-
- nodeStopFut.get(30_000);
-
- checkData(keys, data);
-
- waitTombstoneCleanup();
-
- checkData(keys, data);
- }
- }
-
- /**
- * @param keys Keys to check.
- * @param data Expected data.
- */
- private void checkData(List<TestKey> keys, Map<TestKey, TestValue> data) {
- for (Ignite node : Ignition.allGrids()) {
- info("Check node: " + node.name());
-
- IgniteCache<TestKey, TestValue> cache = node.cache(DEFAULT_CACHE_NAME);
-
- for (TestKey key : keys) {
- TestValue expVal = data.get(key);
- TestValue val = cache.get(key);
-
- if (expVal == null)
- assertNull(val);
- else {
- assertNotNull(val);
- assertTrue(Arrays.equals(expVal.dummyData, val.dummyData));
- }
- }
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- private void waitTombstoneCleanup() throws Exception {
- for (Ignite node : Ignition.allGrids()) {
- final LongMetric tombstones = ((IgniteEx)node).context().metric().registry(
- MetricUtils.cacheGroupMetricsRegistryName(DEFAULT_CACHE_NAME)).findMetric("Tombstones");
-
- GridTestUtils.waitForCondition(() -> tombstones.value() == 0, 30_000);
-
- assertEquals("Failed to wait for tombstone cleanup: " + node.name(), 0, tombstones.value());
- }
- }
-
- /**
- * @return Cache configuration.
- */
- private CacheConfiguration<TestKey, TestValue> cacheConfiguration() {
- CacheConfiguration<TestKey, TestValue> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
-
- ccfg.setAtomicityMode(TRANSACTIONAL);
- ccfg.setCacheMode(PARTITIONED);
- ccfg.setBackups(2);
- ccfg.setRebalanceMode(SYNC);
- ccfg.setReadFromBackup(true);
- ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- ccfg.setAffinity(new RendezvousAffinityFunction(false, 64));
-
- return ccfg;
- }
-
- /**
- *
- */
- static class TestKey {
- /** */
- private final int id;
-
- /** */
- private final byte[] dummyData;
-
- /**
- * @param id ID.
- * @param dummyData Dummy byte array (to test with various key sizes).
- */
- public TestKey(int id, byte[] dummyData) {
- this.id = id;
- this.dummyData = dummyData;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- TestKey testKey = (TestKey) o;
-
- return id == testKey.id && Arrays.equals(dummyData, testKey.dummyData);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int result = Objects.hash(id);
- result = 31 * result + Arrays.hashCode(dummyData);
- return result;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return "TestKey [id=" + id + "]";
- }
- }
-
- /**
- *
- */
- static class TestValue {
- /** */
- private final byte[] dummyData;
-
- /**
- * @param dummyData Dummy byte array (to test with various value sizes).
- */
- public TestValue(byte[] dummyData) {
- this.dummyData = dummyData;
- }
- }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
deleted file mode 100644
index c8b0a7f..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
-import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
-import org.apache.ignite.spi.metric.LongMetric;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
-
-/**
- *
- */
-@RunWith(Parameterized.class)
-public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest {
- /** Test parameters. */
- @Parameterized.Parameters(name = "persistenceEnabled={0}, historicalRebalance={1}")
- public static Collection parameters() {
- List<Object[]> res = new ArrayList<>();
-
- for (boolean persistenceEnabled : new boolean[] {false, true}) {
- for (boolean histRebalance : new boolean[] {false, true}) {
- if (!persistenceEnabled && histRebalance)
- continue;
-
- res.add(new Object[]{persistenceEnabled, histRebalance});
- }
- }
-
- return res;
- }
-
- /** */
- @Parameterized.Parameter(0)
- public boolean persistence;
-
- /** */
- @Parameterized.Parameter(1)
- public boolean histRebalance;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
-
- cfg.setConsistentId(gridName);
-
- cfg.setCommunicationSpi(commSpi);
-
- if (persistence) {
- DataStorageConfiguration dsCfg = new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(
- new DataRegionConfiguration()
- .setInitialSize(256L * 1024 * 1024)
- .setMaxSize(256L * 1024 * 1024)
- .setPersistenceEnabled(true)
- )
- .setWalMode(WALMode.LOG_ONLY);
-
- cfg.setDataStorageConfiguration(dsCfg);
- }
-
- return cfg;
- }
-
- /**
- *
- */
- @Before
- public void before() throws Exception {
- stopAllGrids();
-
- cleanPersistenceDir();
-
- if (histRebalance)
- System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0");
- }
-
- /**
- *
- */
- @After
- public void after() throws Exception {
- if (histRebalance)
- System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
-
- stopAllGrids();
-
- cleanPersistenceDir();
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testRemoveAndRebalanceRaceTx() throws Exception {
- testRemoveAndRebalanceRace(TRANSACTIONAL, true);
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testRemoveAndRebalanceRaceAtomic() throws Exception {
- testRemoveAndRebalanceRace(ATOMIC, false);
- }
-
- /**
- * @throws Exception If failed.
- * @param expTombstone {@code True} if tombstones should be created.
- */
- private void testRemoveAndRebalanceRace(CacheAtomicityMode atomicityMode, boolean expTombstone) throws Exception {
- IgniteEx ignite0 = startGrid(0);
-
- if (histRebalance)
- startGrid(1);
-
- if (persistence)
- ignite0.cluster().active(true);
-
- IgniteCache<Object, Object> cache0 = ignite0.createCache(cacheConfiguration(atomicityMode));
-
- final int KEYS = histRebalance ? 1024 : 1024 * 256;
-
- if (histRebalance) {
- // Preload initial data to have start point for WAL rebalance.
- try (IgniteDataStreamer<Object, Object> streamer = ignite0.dataStreamer(DEFAULT_CACHE_NAME)) {
- streamer.allowOverwrite(true);
-
- for (int i = 0; i < KEYS; i++)
- streamer.addData(-i, 0);
- }
-
- forceCheckpoint();
-
- stopGrid(1);
- }
-
- // This data will be rebalanced.
- try (IgniteDataStreamer<Object, Object> streamer = ignite0.dataStreamer(DEFAULT_CACHE_NAME)) {
- streamer.allowOverwrite(true);
-
- for (int i = 0; i < KEYS; i++)
- streamer.addData(i, i);
- }
-
- blockRebalance(ignite0);
-
- IgniteEx ignite1 = GridTestUtils.runAsync(() -> startGrid(1)).get(10, TimeUnit.SECONDS);
-
- if (persistence) {
- ignite0.cluster().baselineAutoAdjustEnabled(false);
-
- ignite0.cluster().setBaselineTopology(2);
- }
-
- TestRecordingCommunicationSpi.spi(ignite0).waitForBlocked();
-
- Set<Integer> keysWithTombstone = new HashSet<>();
-
- // Do removes while rebalance is in progress.
- // All keys are removed during historical rebalance.
- for (int i = 0, step = histRebalance ? 1 : 64; i < KEYS; i += step) {
- keysWithTombstone.add(i);
-
- cache0.remove(i);
- }
-
- final LongMetric tombstoneMetric0 = ignite0.context().metric().registry(
- MetricUtils.cacheGroupMetricsRegistryName(DEFAULT_CACHE_NAME)).findMetric("Tombstones");
-
- final LongMetric tombstoneMetric1 = ignite1.context().metric().registry(
- MetricUtils.cacheGroupMetricsRegistryName(DEFAULT_CACHE_NAME)).findMetric("Tombstones");
-
- // On first node there should not be tombstones.
- assertEquals(0, tombstoneMetric0.value());
-
- if (expTombstone)
- assertEquals(keysWithTombstone.size(), tombstoneMetric1.value());
- else
- assertEquals(0, tombstoneMetric1.value());
-
- // Update some of removed keys, this should remove tombstones.
- for (int i = 0; i < KEYS; i += 128) {
- keysWithTombstone.remove(i);
-
- cache0.put(i, i);
- }
-
- assertTrue("Keys with tombstones should exist", !keysWithTombstone.isEmpty());
-
- assertEquals(0, tombstoneMetric0.value());
-
- if (expTombstone)
- assertEquals(keysWithTombstone.size(), tombstoneMetric1.value());
- else
- assertEquals(0, tombstoneMetric1.value());
-
- TestRecordingCommunicationSpi.spi(ignite0).stopBlock();
-
- awaitPartitionMapExchange();
-
- IgniteCache<Integer, Integer> cache1 = ignite(1).cache(DEFAULT_CACHE_NAME);
-
- for (int i = 0; i < KEYS; i++) {
- if (keysWithTombstone.contains(i))
- assertNull(cache1.get(i));
- else
- assertEquals((Object)i, cache1.get(i));
- }
-
- // Tombstones should be removed after once rebalance is completed.
- GridTestUtils.waitForCondition(() -> tombstoneMetric1.value() == 0, 30_000);
-
- assertEquals(0, tombstoneMetric1.value());
- }
-
- /**
- *
- */
- private static void blockRebalance(IgniteEx node) {
- final int grpId = groupIdForCache(node, DEFAULT_CACHE_NAME);
-
- TestRecordingCommunicationSpi.spi(node).blockMessages((node0, msg) ->
- (msg instanceof GridDhtPartitionSupplyMessage)
- && ((GridCacheGroupIdMessage)msg).groupId() == grpId
- );
- }
-
- /**
- * @param atomicityMode Cache atomicity mode.
- * @return Cache configuration.
- */
- private CacheConfiguration<Object, Object> cacheConfiguration(CacheAtomicityMode atomicityMode) {
- return new CacheConfiguration<>(DEFAULT_CACHE_NAME)
- .setAtomicityMode(atomicityMode)
- .setCacheMode(PARTITIONED)
- .setBackups(2)
- .setRebalanceMode(ASYNC)
- .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
- .setAffinity(new RendezvousAffinityFunction(false, 64));
- }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/CacheRemoveWithTombstonesFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/CacheRemoveWithTombstonesFailoverTest.java
deleted file mode 100644
index db60ffc..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/CacheRemoveWithTombstonesFailoverTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
-
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
-import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
-import org.apache.ignite.spi.metric.LongMetric;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.WithSystemProperty;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
-
-/**
- * Tests to check failover scenarios over tombstones.
- */
-public class CacheRemoveWithTombstonesFailoverTest extends PartitionsEvictManagerAbstractTest {
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
-
- cfg.setConsistentId(gridName);
-
- cfg.setCommunicationSpi(commSpi);
-
- DataStorageConfiguration dsCfg = new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(
- new DataRegionConfiguration()
- .setInitialSize(256L * 1024 * 1024)
- .setMaxSize(256L * 1024 * 1024)
- .setPersistenceEnabled(true)
- )
- .setCheckpointFrequency(1024 * 1024 * 1024)
- .setWalMode(WALMode.LOG_ONLY);
-
- cfg.setDataStorageConfiguration(dsCfg);
-
- cfg.setCacheConfiguration(cacheConfiguration());
-
- return cfg;
- }
-
- /**
- * Test check that tombstones reside in persistent partition will be cleared after node restart.
- */
- @Test
- @WithSystemProperty(key = IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED, value = "false")
- public void testTombstonesClearedAfterRestart() throws Exception {
- IgniteEx crd = startGrid(0);
-
- crd.cluster().active(true);
-
- final int KEYS = 1024;
-
- for (int k = 0; k < KEYS; k++)
- crd.cache(DEFAULT_CACHE_NAME).put(k, k);
-
- blockRebalance(crd);
-
- IgniteEx node = startGrid(1);
-
- // Do not run clear tombsones task.
- instrumentEvictionQueue(node, task -> {
- if (task instanceof PartitionsEvictManager.ClearTombstonesTask)
- return null;
-
- return task;
- });
-
- resetBaselineTopology();
-
- TestRecordingCommunicationSpi.spi(crd).waitForBlocked();
-
- Set<Integer> keysWithTombstone = new HashSet<>();
-
- // Do removes while rebalance is in progress.
- for (int i = 0; i < KEYS; i += 2) {
- keysWithTombstone.add(i);
-
- crd.cache(DEFAULT_CACHE_NAME).remove(i);
- }
-
- final LongMetric tombstoneMetric = node.context().metric().registry(
- MetricUtils.cacheGroupMetricsRegistryName(DEFAULT_CACHE_NAME)).findMetric("Tombstones");
-
- Assert.assertEquals(keysWithTombstone.size(), tombstoneMetric.value());
-
- // Resume rebalance.
- TestRecordingCommunicationSpi.spi(crd).stopBlock();
-
- // Partitions should be in OWNING state.
- awaitPartitionMapExchange();
-
- // But tombstones removal should be skipped.
- Assert.assertEquals(keysWithTombstone.size(), tombstoneMetric.value());
-
- // Stop node with tombstones.
- stopGrid(1);
-
- // Stop coordinator.
- stopGrid(0);
-
- // Startup node with tombstones in inactive state.
- node = startGrid(1);
-
- final int grpId = groupIdForCache(node, DEFAULT_CACHE_NAME);
-
- // Tombstone metrics are unavailable before join to topology, using internal api.
- long tombstonesBeforeActivation = node.context().cache().cacheGroup(grpId).topology().localPartitions()
- .stream().map(part -> part.dataStore().tombstonesCount()).reduce(Long::sum).orElse(0L);
-
- Assert.assertEquals(keysWithTombstone.size(), tombstonesBeforeActivation);
-
- crd = startGrid(0);
-
- crd.cluster().active(true);
-
- awaitPartitionMapExchange();
-
- final LongMetric tombstoneMetric1 = node.context().metric().registry(
- MetricUtils.cacheGroupMetricsRegistryName(DEFAULT_CACHE_NAME)).findMetric("Tombstones");
-
- // Tombstones should be removed after join to topology.
- GridTestUtils.waitForCondition(() -> tombstoneMetric1.value() == 0, 30_000);
-
- assertEquals(0, tombstoneMetric1.value());
- }
-
- /**
- *
- */
- private static void blockRebalance(IgniteEx node) {
- final int grpId = groupIdForCache(node, DEFAULT_CACHE_NAME);
-
- TestRecordingCommunicationSpi.spi(node).blockMessages((node0, msg) ->
- (msg instanceof GridDhtPartitionSupplyMessage)
- && ((GridCacheGroupIdMessage)msg).groupId() == grpId
- );
- }
-
- /**
- * @return Cache configuration.
- */
- private CacheConfiguration<Object, Object> cacheConfiguration() {
- return new CacheConfiguration<>(DEFAULT_CACHE_NAME)
- .setAtomicityMode(TRANSACTIONAL)
- .setCacheMode(PARTITIONED)
- .setBackups(1)
- .setRebalanceMode(ASYNC)
- .setReadFromBackup(true)
- .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
- .setAffinity(new RendezvousAffinityFunction(false, 64));
- }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DropCacheContextDuringEvictionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DropCacheContextDuringEvictionTest.java
index 72a2853..4a7de04 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DropCacheContextDuringEvictionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DropCacheContextDuringEvictionTest.java
@@ -39,15 +39,11 @@ public class DropCacheContextDuringEvictionTest extends PartitionsEvictManagerAb
public void testDeactivation() throws Exception {
T2<IgniteEx, CountDownLatch> nodeAndEvictLatch = makeNodeWithEvictLatch();
- nodeAndEvictLatch.get1().createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ IgniteCache<Object, Object> cache = nodeAndEvictLatch.get1().createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
.setGroupName("test-grp"));
- try (IgniteDataStreamer<Object, Object> streamer = nodeAndEvictLatch.get1().dataStreamer(DEFAULT_CACHE_NAME)) {
- streamer.allowOverwrite(true);
-
- for (int k = 0; k < 100_000; k++)
- streamer.addData(k, k);
- }
+ for (int i = 0; i < 100_000; i++)
+ cache.put(i, i);
doActionDuringEviction(nodeAndEvictLatch, () -> nodeAndEvictLatch.get1().cluster().active(false));
@@ -64,19 +60,17 @@ public class DropCacheContextDuringEvictionTest extends PartitionsEvictManagerAb
List<String> caches = new ArrayList<>();
for (int idx = 0; idx < 10; idx++) {
- String cacheName = DEFAULT_CACHE_NAME + idx;
-
- nodeAndEvictLatch.get1().createCache(new CacheConfiguration<>(cacheName)
+ IgniteCache<Object, Object> cache = nodeAndEvictLatch.get1().createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME + idx)
.setGroupName("test-grp"));
- try (IgniteDataStreamer<Object, Object> streamer = nodeAndEvictLatch.get1().dataStreamer(cacheName)) {
+ caches.add(cache.getName());
+
+ try (IgniteDataStreamer streamer = nodeAndEvictLatch.get1().dataStreamer(cache.getName())) {
streamer.allowOverwrite(true);
- for (int k = 0; k < 100_000; k++)
- streamer.addData(k, k);
+ for (int i = 0; i < 200_000; i++)
+ streamer.addData(i, i);
}
-
- caches.add(cacheName);
}
doActionDuringEviction(nodeAndEvictLatch, () -> nodeAndEvictLatch.get1().destroyCaches(caches));
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManagerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManagerAbstractTest.java
index 233b25e..e49e07f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManagerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManagerAbstractTest.java
@@ -17,22 +17,26 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.AbstractFailureHandler;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.NotNull;
/**
*
@@ -45,7 +49,8 @@ public abstract class PartitionsEvictManagerAbstractTest extends GridCommonAbstr
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- cfg.setActiveOnStart(false);
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)));
cfg.setFailureHandler(new AbstractFailureHandler() {
/** {@inheritDoc} */
@@ -62,16 +67,12 @@ public abstract class PartitionsEvictManagerAbstractTest extends GridCommonAbstr
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
- stopAllGrids();
-
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
-
- cleanPersistenceDir();
}
/**
@@ -91,43 +92,40 @@ public abstract class PartitionsEvictManagerAbstractTest extends GridCommonAbstr
protected void awaitEvictionQueueForFilling(IgniteEx node, int ms) throws IgniteInterruptedCheckedException {
PartitionsEvictManager.BucketQueue evictionQueue = node.context().cache().context().evict().evictionQueue;
- assertTrue(GridTestUtils.waitForCondition(() -> {
- for (Queue queue : evictionQueue.buckets)
- return ((InstrumentedEvictionQueue) queue).itemOffered;
-
- return false;
- }, ms));
+ assertTrue(GridTestUtils.waitForCondition(() -> !evictionQueue.isEmpty(), ms));
}
/**
* @param node Node.
- * @param interceptor Interceptor that will be invoked after task from eviction has polled.
+ * @param latch Latch.
+ * @param completeWithError Inner future throws exception.
*/
- protected void instrumentEvictionQueue(
- IgniteEx node,
- IgniteClosure<PartitionsEvictManager.AbstractEvictionTask,
- PartitionsEvictManager.AbstractEvictionTask> interceptor
- ) {
+ protected void subscribeEvictionQueueAtLatch(IgniteEx node, CountDownLatch latch, boolean completeWithError) {
PartitionsEvictManager.BucketQueue evictionQueue = node.context().cache().context().evict().evictionQueue;
Queue[] buckets = evictionQueue.buckets;
for (int i = 0; i < buckets.length; i++)
- buckets[i] = new InstrumentedEvictionQueue(interceptor);
+ buckets[i] = new WaitingQueue(latch, completeWithError);
}
/**
*
*/
protected T2<IgniteEx, CountDownLatch> makeNodeWithEvictLatch() throws Exception {
+ return makeNodeWithEvictLatch(false);
+ }
+
+ /**
+ *
+ */
+ protected T2<IgniteEx, CountDownLatch> makeNodeWithEvictLatch(boolean completeWithError) throws Exception {
IgniteEx node1 = startGrid(0);
- CountDownLatch latch = new CountDownLatch(1);
+ node1.cluster().baselineAutoAdjustEnabled(false);
- instrumentEvictionQueue(node1, task -> {
- U.awaitQuiet(latch);
+ CountDownLatch latch = new CountDownLatch(1);
- return task;
- });
+ subscribeEvictionQueueAtLatch(node1, latch, completeWithError);
node1.cluster().active(true);
@@ -139,7 +137,11 @@ public abstract class PartitionsEvictManagerAbstractTest extends GridCommonAbstr
* @param r R.
*/
protected void doActionDuringEviction(T2<IgniteEx, CountDownLatch> nodeAndEvictLatch, Runnable r) throws Exception {
- startGrid(1);
+ IgniteEx node2 = startGrid(1);
+
+ awaitPartitionMapExchange();
+
+ nodeAndEvictLatch.get1().cluster().setBaselineTopology(node2.cluster().topologyVersion());
awaitEvictionQueueForFilling(nodeAndEvictLatch.get1(), 100_000);
@@ -151,38 +153,55 @@ public abstract class PartitionsEvictManagerAbstractTest extends GridCommonAbstr
}
/**
- * Queue that executes an interceptor during eviction task poll.
+ * Queue witch waits on the poll or breaks a PartitionEvictionTask.
*/
- private static class InstrumentedEvictionQueue extends LinkedBlockingQueue {
- /** Interceptor. */
- private final IgniteClosure<PartitionsEvictManager.AbstractEvictionTask,
- PartitionsEvictManager.AbstractEvictionTask> interceptor;
+ private class WaitingQueue extends LinkedBlockingQueue {
+ /** Latch. */
+ private final CountDownLatch latch;
- /** Empty indicator. */
- private volatile boolean itemOffered;
+ /** Complete with error. */
+ private final boolean completeWithError;
/**
- * @param interceptor Interceptor.
+ * @param latch Latch.
+ * @param completeWithError flag.
*/
- private InstrumentedEvictionQueue(IgniteClosure<PartitionsEvictManager.AbstractEvictionTask,
- PartitionsEvictManager.AbstractEvictionTask> interceptor
- ) {
- this.interceptor = interceptor;
- }
-
- /** {@inheritDoc} */
- @Override public boolean offer(@NotNull Object o) {
- itemOffered = true;
-
- return super.offer(o);
+ public WaitingQueue(CountDownLatch latch, boolean completeWithError) {
+ this.latch = latch;
+ this.completeWithError = completeWithError;
}
/** {@inheritDoc} */
@Override public Object poll() {
+ U.awaitQuiet(latch);
+
Object obj = super.poll();
- if (obj instanceof PartitionsEvictManager.AbstractEvictionTask)
- return interceptor.apply((PartitionsEvictManager.AbstractEvictionTask) obj);
+ // This code uses for failure handler testing into PartitionEvictionTask.
+ if(obj != null && completeWithError) {
+ try {
+ Field field = U.findField(PartitionsEvictManager.PartitionEvictionTask.class, "finishFut");
+
+ field.setAccessible(true);
+
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+
+ field.set(obj, new GridFutureAdapter<Object>() {
+ @Override
+ protected boolean onDone(@Nullable Object res, @Nullable Throwable err, boolean cancel) {
+ if (err == null)
+ throw new RuntimeException("TEST");
+
+ return super.onDone(res, err, cancel);
+ }
+ });
+ }
+ catch (Exception e) {
+ fail();
+ }
+ }
return obj;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictionTaskFailureHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictionTaskFailureHandlerTest.java
index e78c61b..58c2460 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictionTaskFailureHandlerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictionTaskFailureHandlerTest.java
@@ -17,16 +17,11 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
-import org.apache.ignite.IgniteDataStreamer;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
@@ -34,69 +29,20 @@ import org.junit.Test;
*
*/
public class PartitionsEvictionTaskFailureHandlerTest extends PartitionsEvictManagerAbstractTest {
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
- cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
-
- return cfg;
- }
-
/**
*
*/
@Test
public void testEvictionTaskShouldCallFailureHandler() throws Exception {
- IgniteEx node = startGrid(0);
-
- AtomicBoolean once = new AtomicBoolean();
-
- // Partition eviction task should throw exception after completion.
- instrumentEvictionQueue(node, task -> {
- if (!(task instanceof PartitionsEvictManager.PartitionEvictionTask))
- return task;
-
- // Fail once.
- if (!once.compareAndSet(false, true))
- return task;
-
- try {
- Field field = U.findField(PartitionsEvictManager.PartitionEvictionTask.class, "finishFut");
-
- field.setAccessible(true);
-
- Field modifiersField = Field.class.getDeclaredField("modifiers");
- modifiersField.setAccessible(true);
- modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-
- field.set(task, new GridFutureAdapter<Object>() {
- @Override protected boolean onDone(@Nullable Object res, @Nullable Throwable err, boolean cancel) {
- if (err == null)
- throw new RuntimeException("TEST");
-
- return super.onDone(res, err, cancel);
- }
- });
- }
- catch (Exception e) {
- fail();
- }
-
- return task;
- });
-
- node.cluster().active(true);
+ T2<IgniteEx, CountDownLatch> nodeAndEvictLatch = makeNodeWithEvictLatch(true);
- try (IgniteDataStreamer<Object, Object> streamer = node.dataStreamer(DEFAULT_CACHE_NAME)) {
- streamer.allowOverwrite(true);
+ IgniteCache<Object, Object> cache = nodeAndEvictLatch.get1().createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setGroupName("test-grp"));
- for (int k = 0; k < 1024; k++)
- node.cache(DEFAULT_CACHE_NAME).put(k, k);
- }
+ for (int i = 0; i < 100_000; i++)
+ cache.put(i, i);
- // Some partitions from node 0 should be evicted.
- startGrid(1);
+ doActionDuringEviction(nodeAndEvictLatch, () -> {});
assertTrue(GridTestUtils.waitForCondition(() -> failure.get(), 10_000));
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
index 3ee9e95..e4a8c9b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
@@ -624,11 +624,6 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public void cacheId(int cacheId) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
@Override public long newMvccCoordinatorVersion() {
return 0;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite9.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite9.java
index 008d588..c1decfe 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite9.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite9.java
@@ -22,31 +22,28 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.metric.IoStatisticsCachePersistenceSelfTest;
+import org.apache.ignite.internal.metric.IoStatisticsCacheSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectionsSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest;
-import org.apache.ignite.internal.processors.cache.distributed.CacheRemoveWithTombstonesLoadTest;
-import org.apache.ignite.internal.processors.cache.distributed.CacheRemoveWithTombstonesTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCachePrimarySyncTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteTxConcurrentRemoveObjectsTest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.CacheRemoveWithTombstonesFailoverTest;
import org.apache.ignite.internal.processors.cache.transactions.TxCrossCachePartitionConsistencyTest;
import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyHistoryRebalanceTest;
-import org.apache.ignite.internal.metric.IoStatisticsCachePersistenceSelfTest;
-import org.apache.ignite.internal.metric.IoStatisticsCacheSelfTest;
-import org.apache.ignite.testframework.junits.DynamicSuite;
+import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyTest;
import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyVolatileRebalanceTest;
import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryOneBackupHistoryRebalanceTest;
import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryOneBackupTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryTwoBackupsFailAllHistoryRebalanceTest;
import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest;
import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest;
-import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryTwoBackupsFailAllHistoryRebalanceTest;
import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryTwoBackupsTest;
import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStatePutTest;
import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateTwoPrimaryTwoBackupsTest;
-import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyTest;
import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateWithFilterTest;
+import org.apache.ignite.testframework.junits.DynamicSuite;
import org.junit.runner.RunWith;
/**
@@ -95,11 +92,6 @@ public class IgniteCacheMvccTestSuite9 {
ignoredTests.add(IoStatisticsCacheSelfTest.class);
ignoredTests.add(IoStatisticsCachePersistenceSelfTest.class);
- // Tombstones are not created with mvcc.
- ignoredTests.add(CacheRemoveWithTombstonesTest.class);
- ignoredTests.add(CacheRemoveWithTombstonesLoadTest.class);
- ignoredTests.add(CacheRemoveWithTombstonesFailoverTest.class);
-
return new ArrayList<>(IgniteCacheTestSuite9.suite(ignoredTests));
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
index 4b669ca..d4dcf48 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
@@ -36,14 +36,11 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectio
import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheOperationsInterruptTest;
-import org.apache.ignite.internal.processors.cache.distributed.CacheRemoveWithTombstonesLoadTest;
-import org.apache.ignite.internal.processors.cache.distributed.CacheRemoveWithTombstonesTest;
import org.apache.ignite.internal.processors.cache.distributed.FailBackupOnAtomicOperationTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCachePrimarySyncTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCacheWriteSynchronizationModesMultithreadedTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteTxConcurrentRemoveObjectsTest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.CacheRemoveWithTombstonesFailoverTest;
import org.apache.ignite.internal.processors.cache.transactions.PartitionUpdateCounterTest;
import org.apache.ignite.internal.processors.cache.transactions.TxCrossCachePartitionConsistencyTest;
import org.apache.ignite.internal.processors.cache.transactions.TxDataConsistencyOnCommitFailureTest;
@@ -129,10 +126,6 @@ public class IgniteCacheTestSuite9 {
GridTestUtils.addTestIfNeeded(suite, FailBackupOnAtomicOperationTest.class, ignoredTests);
- GridTestUtils.addTestIfNeeded(suite, CacheRemoveWithTombstonesTest.class, ignoredTests);
- GridTestUtils.addTestIfNeeded(suite, CacheRemoveWithTombstonesLoadTest.class, ignoredTests);
- GridTestUtils.addTestIfNeeded(suite, CacheRemoveWithTombstonesFailoverTest.class, ignoredTests);
-
return suite;
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
index db85a7b..f9ab6a4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
@@ -129,7 +129,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
continue;
if (filter == null || filter.applyPartition(part))
- cursors.add(store.cursor(cctx.cacheId(), lowerObj, upperObj, null, mvccSnapshot, false));
+ cursors.add(store.cursor(cctx.cacheId(), lowerObj, upperObj, null, mvccSnapshot));
}
return new H2PkHashIndexCursor(cursors.iterator());
@@ -209,7 +209,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
int part = store.partId();
if (partsFilter == null || partsFilter.applyPartition(part))
- cursors.add(store.cursor(cctx.cacheId(), false));
+ cursors.add(store.cursor(cctx.cacheId()));
}
Cursor pkHashCursor = new H2PkHashIndexCursor(cursors.iterator());
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/H2CacheRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/H2CacheRow.java
index 4ac6303a..4bd584f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/H2CacheRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/H2CacheRow.java
@@ -231,11 +231,6 @@ public class H2CacheRow extends H2Row implements CacheDataRow {
}
/** {@inheritDoc} */
- @Override public void cacheId(int cacheId) {
- row.cacheId(cacheId);
- }
-
- /** {@inheritDoc} */
@Override public long mvccCoordinatorVersion() {
return row.mvccCoordinatorVersion();
}