You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/11 10:43:09 UTC
[11/17] ignite git commit: ignite-5937
ignite-5937
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/21bc6338
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/21bc6338
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/21bc6338
Branch: refs/heads/ignite-5932
Commit: 21bc6338da4e9d794e624b0c84ae5a5615f86d53
Parents: c1b2c03
Author: sboikov <sb...@gridgain.com>
Authored: Tue Oct 10 13:31:40 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Oct 10 14:41:34 2017 +0300
----------------------------------------------------------------------
.../cache/IgniteCacheOffheapManagerImpl.java | 141 ++++++-----
.../dht/preloader/GridDhtPartitionSupplier.java | 24 +-
.../GridDhtPartitionSupplyMessage.java | 4 +-
.../cache/mvcc/CacheCoordinatorsProcessor.java | 27 +-
.../cache/mvcc/MvccCoordinatorVersion.java | 2 +-
.../cache/persistence/CacheDataRow.java | 5 +
.../cache/persistence/CacheDataRowAdapter.java | 5 +
.../cache/persistence/CacheSearchRow.java | 6 +
.../persistence/GridCacheOffheapManager.java | 5 +
.../processors/cache/persistence/RowStore.java | 2 +
.../persistence/freelist/FreeListImpl.java | 11 +-
.../cache/persistence/tree/io/DataPageIO.java | 22 +-
.../cache/tree/AbstractDataInnerIO.java | 10 +-
.../cache/tree/AbstractDataLeafIO.java | 10 +-
.../cache/tree/CacheDataRowStore.java | 18 +-
.../processors/cache/tree/CacheDataTree.java | 8 +-
.../internal/processors/cache/tree/DataRow.java | 7 +
.../processors/cache/tree/MvccDataRow.java | 32 ++-
.../cache/tree/MvccKeyMaxVersionBound.java | 9 +-
.../processors/cache/tree/MvccRemoveRow.java | 57 +++++
.../processors/cache/tree/MvccUpdateRow.java | 67 +++--
.../cache/tree/MvccVersionBasedSearchRow.java | 16 +-
.../cache/mvcc/CacheMvccTransactionsTest.java | 245 ++++++++++++++++++-
.../database/FreeListImplSelfTest.java | 5 +
.../processors/query/h2/opt/GridH2Row.java | 5 +
25 files changed, 604 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 80d36c1..380ec94 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
@@ -54,9 +55,9 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.cache.tree.DataRow;
-import org.apache.ignite.internal.processors.cache.tree.MvccDataRow;
import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound;
import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound;
+import org.apache.ignite.internal.processors.cache.tree.MvccRemoveRow;
import org.apache.ignite.internal.processors.cache.tree.MvccSearchRow;
import org.apache.ignite.internal.processors.cache.tree.MvccUpdateRow;
import org.apache.ignite.internal.processors.cache.tree.MvccVersionBasedSearchRow;
@@ -85,6 +86,8 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
/**
*
@@ -1374,60 +1377,43 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
// Make sure value bytes initialized.
key.valueBytes(coCtx);
- MvccUpdateRow updateRow = new MvccUpdateRow(
+ MvccRemoveRow updateRow = new MvccRemoveRow(
key,
- null,
- null,
mvccVer,
partId,
cacheId);
- rowStore.addRow(updateRow);
-
- assert updateRow.link() != 0 : updateRow;
-
if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
updateRow.cacheId(cctx.cacheId());
- GridLongList waitTxs = null;
+ dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
- if (mvccVer.initialLoad()) {
- boolean old = dataTree.putx(updateRow);
+ MvccUpdateRow.UpdateResult res = updateRow.updateResult();
- assert !old;
+ if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
+ assert !primary : updateRow;
- incrementSize(cctx.cacheId());
+ cleanup(updateRow.cleanupRows(), false);
}
else {
- dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
-
- boolean old = dataTree.putx(updateRow);
-
- assert !old;
+ if (res == MvccUpdateRow.UpdateResult.PREV_NOT_NULL)
+ decrementSize(cacheId);
- if (!updateRow.previousNotNull())
- incrementSize(cctx.cacheId());
+ CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), true);
- waitTxs = updateRow.activeTransactions();
-
- List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
-
- if (cleanupRows != null) {
- for (int i = 0; i < cleanupRows.size(); i++) {
- CacheSearchRow oldRow = cleanupRows.get(i);
-
- assert oldRow.link() != 0L : oldRow;
+ if (rmvRow == null)
+ rowStore.addRow(updateRow);
+ else
+ updateRow.link(rmvRow.link());
- boolean rmvd = dataTree.removex(oldRow);
+ assert updateRow.link() != 0L;
- assert rmvd;
+ boolean old = dataTree.putx(updateRow);
- rowStore.removeRow(oldRow.link());
- }
- }
+ assert !old;
}
- return waitTxs;
+ return updateRow.activeTransactions();
}
finally {
busyLock.leaveBusy();
@@ -1464,16 +1450,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
partId,
cacheId);
- rowStore.addRow(updateRow);
-
- assert updateRow.link() != 0 : updateRow;
-
if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
updateRow.cacheId(cctx.cacheId());
GridLongList waitTxs = null;
if (mvccVer.initialLoad()) {
+ rowStore.addRow(updateRow);
+
boolean old = dataTree.putx(updateRow);
assert !old;
@@ -1483,30 +1467,25 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
else {
dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
- boolean old = dataTree.putx(updateRow);
-
- assert !old;
-
- if (!updateRow.previousNotNull())
- incrementSize(cctx.cacheId());
-
- waitTxs = updateRow.activeTransactions();
+ MvccUpdateRow.UpdateResult res = updateRow.updateResult();
- List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
+ if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
+ assert !primary : updateRow;
+ }
+ else {
+ rowStore.addRow(updateRow);
- if (cleanupRows != null) {
- for (int i = 0; i < cleanupRows.size(); i++) {
- CacheSearchRow oldRow = cleanupRows.get(i);
+ boolean old = dataTree.putx(updateRow);
- assert oldRow.link() != 0L : oldRow;
+ assert !old;
- boolean rmvd = dataTree.removex(oldRow);
+ if (res == MvccUpdateRow.UpdateResult.PREV_NULL)
+ incrementSize(cctx.cacheId());
+ }
- assert rmvd;
+ cleanup(updateRow.cleanupRows(), false);
- rowStore.removeRow(oldRow.link());
- }
- }
+ waitTxs = updateRow.activeTransactions();
}
return waitTxs;
@@ -1516,6 +1495,39 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
}
+ /**
+ * @param cleanupRows Rows to cleanup.
+ * @param findRmv {@code True} if the removed row entry needs to be kept.
+ * @return Removed row entry if found.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable private CacheSearchRow cleanup(@Nullable List<CacheSearchRow> cleanupRows, boolean findRmv)
+ throws IgniteCheckedException {
+ CacheSearchRow rmvRow = null;
+
+ if (cleanupRows != null) {
+ for (int i = 0; i < cleanupRows.size(); i++) {
+ CacheSearchRow oldRow = cleanupRows.get(i);
+
+ assert oldRow.link() != 0L : oldRow;
+
+ boolean rmvd = dataTree.removex(oldRow);
+
+ assert rmvd;
+
+ if (findRmv &&
+ rmvRow == null &&
+ versionForRemovedValue(oldRow.mvccCoordinatorVersion())) {
+ rmvRow = oldRow;
+ }
+ else
+ rowStore.removeRow(oldRow.link());
+ }
+ }
+
+ return rmvRow;
+ }
+
/** {@inheritDoc} */
@Override public void update(GridCacheContext cctx,
KeyCacheObject key,
@@ -1832,18 +1844,27 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
while (cur.next()) {
CacheDataRow row = cur.get();
- if (row.mvccCoordinatorVersion() > ver.coordinatorVersion()
- || row.mvccCounter() > ver.counter())
+ long rowCrdVerMasked = row.mvccCoordinatorVersion();
+
+ long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
+
+ if (rowCrdVer > ver.coordinatorVersion() || row.mvccCounter() > ver.counter())
continue;
MvccLongList txs = ver.activeTransactions();
- if (txs != null && row.mvccCoordinatorVersion() == ver.coordinatorVersion() && txs.contains(row.mvccCounter()))
+ if (txs != null && rowCrdVer == ver.coordinatorVersion() && txs.contains(row.mvccCounter()))
continue;
if (curKey != null && row.key().equals(curKey))
continue;
+ if (CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked)) {
+ curKey = row.key();
+
+ continue;
+ }
+
curRow = row;
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 0905917..357fef8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -31,9 +31,9 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.T3;
@@ -43,6 +43,7 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.IgniteSpiException;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
/**
* Thread pool for supplying partitions to demanding nodes.
@@ -375,13 +376,24 @@ class GridDhtPartitionSupplier {
GridCacheEntryInfo info = grp.mvccEnabled() ?
new GridCacheMvccEntryInfo() : new GridCacheEntryInfo();
+
info.key(row.key());
- info.expireTime(row.expireTime());
- info.version(row.version());
- info.value(row.value());
info.cacheId(row.cacheId());
- info.mvccCoordinatorVersion(row.mvccCoordinatorVersion());
- info.mvccCounter(row.mvccCounter());
+
+ boolean rmvd = false;
+
+ if (grp.mvccEnabled()) {
+ info.mvccCoordinatorVersion(row.mvccCoordinatorVersion());
+ info.mvccCounter(row.mvccCounter());
+
+ rmvd = versionForRemovedValue(row.mvccCoordinatorVersion());
+ }
+
+ if (!rmvd) {
+ info.value(row.value());
+ info.version(row.version());
+ info.expireTime(row.expireTime());
+ }
if (preloadPred == null || preloadPred.apply(info))
s.addEntry0(part, info, grp.shared(), grp.cacheObjectContext());
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 90d11f5..6675f8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -42,6 +42,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
/**
* Partition supply message.
*/
@@ -217,7 +219,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
void addEntry0(int p, GridCacheEntryInfo info, GridCacheSharedContext ctx, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
assert info != null;
assert info.key() != null : info;
- assert info.value() != null : info;
+ assert info.value() != null || versionForRemovedValue(info.coordinatorVersion()): info;
// Need to call this method to initialize info properly.
marshalInfo(info, ctx, cacheObjCtx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 54fb3c8..9f9a7a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -146,21 +146,30 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
super(ctx);
}
- public static int compareCoordinatorVersions(long crdVer1, long crdVer2) {
- crdVer1 = CRD_VER_MASK & crdVer1;
- crdVer2 = CRD_VER_MASK & crdVer2;
-
- return Long.compare(crdVer1, crdVer2);
- }
-
- public long createVersionForRemovedValue(long crdVer) {
+ /**
+ * @param crdVer Coordinator version.
+ * @return Coordinator version with removed value flag.
+ */
+ public static long createVersionForRemovedValue(long crdVer) {
return crdVer | RMVD_VAL_VER_MASK;
}
- public boolean versionForRemovedValue(long crdVer) {
+ /**
+ * @param crdVer Coordinator version with flags.
+ * @return {@code True} if removed value flag is set.
+ */
+ public static boolean versionForRemovedValue(long crdVer) {
return (crdVer & RMVD_VAL_VER_MASK) != 0;
}
+ /**
+ * @param crdVer Coordinator version with flags.
+ * @return Coordinator version.
+ */
+ public static long unmaskCoordinatorVersion(long crdVer) {
+ return crdVer & CRD_VER_MASK;
+ }
+
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
statCntrs = new StatCounter[7];
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
index a0fd5ee..4003b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
@@ -44,7 +44,7 @@ public interface MvccCoordinatorVersion extends Message {
public long counter();
/**
- *
+ * @return {@code True} if this version is for an initial load update.
*/
public boolean initialLoad();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
index 57aeaef..b76826f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
@@ -54,4 +54,9 @@ public interface CacheDataRow extends CacheSearchRow {
* @param key Key.
*/
public void key(KeyCacheObject key);
+
+ /**
+ * @return {@code True} if this is a row for a cache remove operation (used only with mvcc).
+ */
+ public boolean removed();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index 925431f..d0f2dab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -582,6 +582,11 @@ public class CacheDataRowAdapter implements CacheDataRow {
return 0;
}
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return false;
+ }
+
/**
*
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
index 5bf53d8..efdc08f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
@@ -43,7 +43,13 @@ public interface CacheSearchRow {
*/
public int cacheId();
+ /**
+ * @return Mvcc coordinator version.
+ */
public long mvccCoordinatorVersion();
+ /**
+ * @return Mvcc counter.
+ */
public long mvccCounter();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 1f52309..ee651c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -837,6 +837,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
@Override public long mvccCoordinatorVersion() {
return 0; // TODO IGNITE-3478.
}
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return false; // TODO IGNITE-3478.
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
index 9cc5c62..41d2c4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
@@ -82,6 +82,8 @@ public class RowStore {
try {
freeList.insertDataRow(row);
+
+ assert row.link() != 0L;
}
finally {
ctx.database().checkpointReadUnlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
index 3eb62ae..9bd27b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
@@ -590,12 +590,19 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
*/
public static int getRowSize(CacheDataRow row, boolean withCacheId) throws IgniteCheckedException {
KeyCacheObject key = row.key();
- CacheObject val = row.value();
int keyLen = key.valueBytesLength(null);
+
+ int len = keyLen + (withCacheId ? 4 : 0);
+
+ if (row.removed())
+ return len;
+
+ CacheObject val = row.value();
+
int valLen = val.valueBytesLength(null);
- return keyLen + valLen + CacheVersionIO.size(row.version(), false) + 8 + (withCacheId ? 4 : 0);
+ return len + valLen + CacheVersionIO.size(row.version(), false) + 8;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
index 628ff38..da012e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
@@ -1040,13 +1040,19 @@ public class DataPageIO extends PageIO {
final int payloadSize
) throws IgniteCheckedException {
final int keySize = row.key().valueBytesLength(null);
- final int valSize = row.value().valueBytesLength(null);
+
+ boolean rmvd = row.removed();
+
+ final int valSize = rmvd ? 0 : row.value().valueBytesLength(null);
int written = writeFragment(row, buf, rowOff, payloadSize, EntryPart.CACHE_ID, keySize, valSize);
written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.KEY, keySize, valSize);
- written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.EXPIRE_TIME, keySize, valSize);
- written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VALUE, keySize, valSize);
- written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VERSION, keySize, valSize);
+
+ if (!rmvd) {
+ written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.EXPIRE_TIME, keySize, valSize);
+ written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VALUE, keySize, valSize);
+ written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VERSION, keySize, valSize);
+ }
assert written == payloadSize;
}
@@ -1414,9 +1420,15 @@ public class DataPageIO extends PageIO {
}
addr += row.key().putValue(addr);
+
+ if (row.removed())
+ return;
}
- else
+ else {
+ assert !row.removed() : row;
+
addr += (2 + cacheIdSize + row.key().valueBytesLength(null));
+ }
addr += row.value().putValue(addr);
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index a07d012..fc82cbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.tree;
import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -27,6 +26,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInne
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteInClosure;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
/**
*
*/
@@ -59,8 +61,8 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
}
if (storeMvccVersion()) {
- assert row.mvccCoordinatorVersion() > 0 : row;
- assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
+ assert unmaskCoordinatorVersion(row.mvccCoordinatorVersion()) > 0 : row;
+ assert row.mvccCounter() != COUNTER_NA : row;
PageUtils.putLong(pageAddr, off, row.mvccCoordinatorVersion());
off += 8;
@@ -123,7 +125,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
long mvcCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
assert mvccTopVer > 0 : mvccTopVer;
- assert mvcCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+ assert mvcCntr != COUNTER_NA;
PageUtils.putLong(dstPageAddr, off, mvccTopVer);
off += 8;
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index ef08bec..c956d22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.tree;
import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -27,6 +26,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeaf
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteInClosure;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
/**
*
*/
@@ -61,8 +63,8 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
long mvccCrdVer = row.mvccCoordinatorVersion();
long mvccUpdateCntr = row.mvccCounter();
- assert mvccCrdVer > 0 : mvccCrdVer;
- assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+ assert unmaskCoordinatorVersion(mvccCrdVer) > 0 : mvccCrdVer;
+ assert mvccUpdateCntr != COUNTER_NA;
PageUtils.putLong(pageAddr, off, mvccCrdVer);
off += 8;
@@ -98,7 +100,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx);
assert mvccUpdateTopVer >=0 : mvccUpdateCntr;
- assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+ assert mvccUpdateCntr != COUNTER_NA;
PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer);
off += 8;
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
index f9e1eb3..85624d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
@@ -25,6 +25,8 @@ import org.apache.ignite.internal.processors.cache.persistence.RowStore;
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
/**
*
*/
@@ -65,17 +67,25 @@ public class CacheDataRowStore extends RowStore {
* @param cacheId Cache ID.
* @param hash Hash code.
* @param link Link.
- * @param mvccTopVer
- * @param mvccCntr
+ * @param rowData Required row data.
+ * @param crdVer Mvcc coordinator version.
+ * @param mvccCntr Mvcc counter.
* @return Search row.
*/
- MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long mvccTopVer, long mvccCntr) {
+ MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long crdVer, long mvccCntr) {
+ if (rowData != CacheDataRowAdapter.RowData.KEY_ONLY && versionForRemovedValue(crdVer)) {
+ if (rowData == CacheDataRowAdapter.RowData.NO_KEY)
+ return MvccDataRow.removedRowNoKey(partId, cacheId, crdVer, mvccCntr);
+ else
+ rowData = CacheDataRowAdapter.RowData.KEY_ONLY;
+ }
+
MvccDataRow dataRow = new MvccDataRow(grp,
hash,
link,
partId,
rowData,
- mvccTopVer,
+ crdVer,
mvccCntr);
initDataRow(dataRow, cacheId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index eaeefee..6309153 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId;
import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
/**
*
@@ -160,7 +161,8 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
long mvccCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
- cmp = Long.compare(row.mvccCoordinatorVersion(), mvccCrdVer);
+ cmp = Long.compare(unmaskCoordinatorVersion(row.mvccCoordinatorVersion()),
+ unmaskCoordinatorVersion(mvccCrdVer));
if (cmp != 0)
return cmp;
@@ -188,10 +190,10 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
CacheDataRowAdapter.RowData.FULL;
if (grp.mvccEnabled()) {
- long mvccTopVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+ long mvccCrdVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
long mvccCntr = rowIo.getMvccCounter(pageAddr, idx);
- return rowStore.mvccRow(cacheId, hash, link, x, mvccTopVer, mvccCntr);
+ return rowStore.mvccRow(cacheId, hash, link, x, mvccCrdVer, mvccCntr);
}
else
return rowStore.dataRow(cacheId, hash, link, x);
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
index 29bbaaf..d1e90d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
@@ -81,6 +81,13 @@ public class DataRow extends CacheDataRowAdapter {
this.cacheId = cacheId;
}
+ /**
+ * Creates an empty row for subclasses; passes {@code 0} to the superclass constructor (presumably the link - confirm).
+ */
+ protected DataRow() {
+ super(0);
+ }
+
/** {@inheritDoc} */
@Override public int partition() {
return part;
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
index eb1ee10..916ea93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
@@ -18,12 +18,11 @@
package org.apache.ignite.internal.processors.cache.tree;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
/**
*
*/
@@ -46,7 +45,7 @@ public class MvccDataRow extends DataRow {
MvccDataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData, long crdVer, long mvccCntr) {
super(grp, hash, link, part, rowData);
- assert crdVer > 0 : crdVer;
+ assert unmaskCoordinatorVersion(crdVer) > 0 : crdVer;
assert mvccCntr != CacheCoordinatorsProcessor.COUNTER_NA;
this.crdVer = crdVer;
@@ -54,25 +53,32 @@ public class MvccDataRow extends DataRow {
}
/**
- * @param key Key.
- * @param val Value.
- * @param ver Version.
+ * Creates an empty row; used by {@link #removedRowNoKey}, which assigns the row fields afterwards.
+ */
+ private MvccDataRow() {
+ // No-op.
+ }
+
+ /**
* @param part Partition.
* @param cacheId Cache ID.
* @param crdVer Mvcc coordinator version.
* @param mvccCntr Mvcc counter.
+ * @return Row holding only partition, cache ID, coordinator version and counter; no key or value is set.
*/
- public MvccDataRow(KeyCacheObject key,
- CacheObject val,
- GridCacheVersion ver,
+ static MvccDataRow removedRowNoKey(
int part,
int cacheId,
long crdVer,
long mvccCntr) {
- super(key, val, ver, part, 0L, cacheId);
+ MvccDataRow row = new MvccDataRow();
- this.mvccCntr = mvccCntr;
- this.crdVer = crdVer;
+ row.cacheId = cacheId;
+ row.part = part;
+ row.crdVer = crdVer;
+ row.mvccCntr = mvccCntr;
+
+ return row;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
index aa9422d..007ac09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
@@ -27,6 +27,8 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
/**
*
*/
@@ -55,7 +57,12 @@ public class MvccKeyMaxVersionBound extends SearchRow implements BPlusTree.TreeR
int idx)
throws IgniteCheckedException
{
- resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+ RowLinkIO rowIo = (RowLinkIO)io;
+
+ if (versionForRemovedValue(rowIo.getMvccCoordinatorVersion(pageAddr, idx)))
+ resRow = null;
+ else
+ resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
return false; // Stop search.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
new file mode 100644
index 0000000..8fd8a6e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Mvcc update row for a key removal: {@link #removed()} is {@code true}, coordinator version is remove-marked.
+ */
+public class MvccRemoveRow extends MvccUpdateRow {
+ /**
+ * @param key Key.
+ * @param mvccVer Mvcc version.
+ * @param part Partition.
+ * @param cacheId Cache ID.
+ */
+ public MvccRemoveRow(
+ KeyCacheObject key,
+ MvccCoordinatorVersion mvccVer,
+ int part,
+ int cacheId) {
+ super(key, null, null, mvccVer, part, cacheId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCoordinatorVersion() {
+ return CacheCoordinatorsProcessor.createVersionForRemovedValue(super.mvccCoordinatorVersion());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccRemoveRow.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
index d3303e8..794661d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
@@ -32,12 +33,14 @@ import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
/**
*
*/
public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
/** */
- private Boolean hasPrev;
+ private UpdateResult res;
/** */
private boolean canCleanup;
@@ -74,8 +77,8 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
/**
- * @return {@code True} if previous value was non-null.
+ * @return Update result, {@link UpdateResult#PREV_NULL} if no result was recorded.
*/
- public boolean previousNotNull() {
- return hasPrev != null && hasPrev;
+ public UpdateResult updateResult() {
+ return res == null ? UpdateResult.PREV_NULL : res;
}
/**
@@ -98,17 +101,18 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
* @param idx Item index.
* @return Always {@code true}.
*/
- private boolean assertVersionGreater(RowLinkIO io, long pageAddr, int idx) {
- long rowCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
+ private boolean assertVersion(RowLinkIO io, long pageAddr, int idx) {
+ long rowCrdVer = unmaskCoordinatorVersion(io.getMvccCoordinatorVersion(pageAddr, idx));
long rowCntr = io.getMvccCounter(pageAddr, idx);
- int cmp = Long.compare(mvccCoordinatorVersion(), rowCrdVer);
+ int cmp = Long.compare(mvccVer.coordinatorVersion(), rowCrdVer);
if (cmp == 0)
- cmp = Long.compare(mvccCounter(), rowCntr);
+ cmp = Long.compare(mvccVer.counter(), rowCntr);
- assert cmp > 0 : "[updCrd=" + mvccCoordinatorVersion() +
- ", updCntr=" + mvccCounter() +
+ // Can be equals if backup rebalanced value updated on primary.
+ assert cmp >= 0 : "[updCrd=" + mvccVer.coordinatorVersion() +
+ ", updCntr=" + mvccVer.counter() +
", rowCrd=" + rowCrdVer +
", rowCntr=" + rowCntr + ']';
@@ -124,15 +128,31 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
{
RowLinkIO rowIo = (RowLinkIO)io;
- // All previous versions should be less then new one.
- assert assertVersionGreater(rowIo, pageAddr, idx);
+ // Assert version grows.
+ assert assertVersion(rowIo, pageAddr, idx);
boolean checkActive = mvccVer.activeTransactions().size() > 0;
boolean txActive = false;
+ long rowCrdVerMasked = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+ long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
+
+ if (res == null) {
+ // First visited row decides the result: compare (coordinator version, counter) of the update and the row.
+ int cmp = Long.compare(mvccVer.coordinatorVersion(), rowCrdVer);
+ if (cmp == 0)
+ cmp = Long.compare(mvccVer.counter(), rowIo.getMvccCounter(pageAddr, idx));
+
+ if (cmp == 0)
+ res = UpdateResult.VERSION_FOUND;
+ else
+ res = CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked) ?
+ UpdateResult.PREV_NULL : UpdateResult.PREV_NOT_NULL;
+ }
+
// Suppose transactions on previous coordinator versions are done.
- if (checkActive && mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx)) {
+ if (checkActive && mvccVer.coordinatorVersion() == rowCrdVer) {
long rowMvccCntr = rowIo.getMvccCounter(pageAddr, idx);
if (mvccVer.activeTransactions().contains(rowMvccCntr)) {
@@ -145,15 +165,12 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
}
}
- if (hasPrev == null)
- hasPrev = Boolean.TRUE; // TODO IGNITE-3478 support removes.
-
if (!txActive) {
- assert Long.compare(mvccVer.coordinatorVersion(), rowIo.getMvccCoordinatorVersion(pageAddr, idx)) >= 0;
+ assert Long.compare(mvccVer.coordinatorVersion(), rowCrdVer) >= 0;
int cmp;
- if (mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx))
+ if (mvccVer.coordinatorVersion() == rowCrdVer)
cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx));
else
cmp = 1;
@@ -163,10 +180,10 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
if (canCleanup) {
CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx);
- assert row.link() != 0 && row.mvccCoordinatorVersion() > 0 : row;
+ assert row.link() != 0 && row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
// Should not be possible to cleanup active tx.
- assert row.mvccCoordinatorVersion() != mvccVer.coordinatorVersion()
+ assert rowCrdVer != mvccVer.coordinatorVersion()
|| !mvccVer.activeTransactions().contains(row.mvccCounter());
if (cleanupRows == null)
@@ -196,4 +213,16 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
@Override public String toString() {
return S.toString(MvccUpdateRow.class, this, "super", super.toString());
}
+
+ /**
+ * Outcome of an update, decided from the first existing row version visited for the key.
+ */
+ public enum UpdateResult {
+ /** A row whose version compares equal to the update version was found. */
+ VERSION_FOUND,
+ /** No previous value: the first visited row is remove-marked, or no row was visited. */
+ PREV_NULL,
+ /** The first visited row is not remove-marked, i.e. a previous value exists. */
+ PREV_NOT_NULL
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
index c829afb..a1d0127 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
@@ -28,6 +28,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
/**
*
*/
@@ -66,16 +69,23 @@ public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.Tr
{
boolean visible = true;
+ RowLinkIO rowIo = (RowLinkIO)io;
+
+ long crdVerMasked = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+
if (ver.activeTransactions().size() > 0) {
- RowLinkIO rowIo = (RowLinkIO)io;
+ long rowCrdVer = unmaskCoordinatorVersion(crdVerMasked);
// TODO IGNITE-3478 sort active transactions?
- if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) == ver.coordinatorVersion())
+ if (rowCrdVer == ver.coordinatorVersion())
visible = !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx));
}
if (visible) {
- resRow = ((CacheDataTree) tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+ if (versionForRemovedValue(crdVerMasked))
+ resRow = null;
+ else
+ resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
return false; // Stop search.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 115e8a2..d45afe7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.processors.cache.mvcc;
+import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -45,6 +47,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
@@ -119,6 +122,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/** */
private String nodeAttr;
+ /** */
+ private static final int PAGE_SIZE = MemoryConfiguration.DFLT_PAGE_SIZE;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -137,6 +143,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
if (nodeAttr != null)
cfg.setUserAttributes(F.asMap(nodeAttr, true));
+ MemoryConfiguration memCfg = new MemoryConfiguration();
+
+ memCfg.setPageSize(PAGE_SIZE);
+
+ cfg.setMemoryConfiguration(memCfg);
+
return cfg;
}
@@ -376,6 +388,109 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testSimplePutRemove() throws Exception {
+ simplePutRemove(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSimplePutRemove_LargeKeys() throws Exception {
+ simplePutRemove(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ * @param largeKeys {@code True} to use large keys (not fitting in single page).
+ */
+ private void simplePutRemove(boolean largeKeys) throws Exception {
+ Ignite node = startGrid(0);
+
+ IgniteTransactions txs = node.transactions();
+
+ final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1));
+
+ final int KEYS = 100;
+
+ checkValues(new HashMap<>(), cache);
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < KEYS; k++)
+ cache.remove(testKey(largeKeys, k));
+
+ tx.commit();
+ }
+
+ checkValues(new HashMap<>(), cache);
+
+ Map<Object, Object> expVals = new HashMap<>();
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < KEYS; k++) {
+ Object key = testKey(largeKeys, k);
+
+ expVals.put(key, k);
+
+ cache.put(key, k);
+ }
+
+ tx.commit();
+ }
+
+ checkValues(expVals, cache);
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < KEYS; k++) {
+ if (k % 2 == 0) {
+ Object key = testKey(largeKeys, k);
+
+ cache.remove(key);
+
+ expVals.remove(key);
+ }
+ }
+
+ tx.commit();
+ }
+
+ checkValues(expVals, cache);
+ }
+
+ /**
+ * @param largeKeys {@code True} to use large keys (not fitting in single page).
+ * @param idx Index.
+ * @return Key instance; calls with equal arguments return equal keys.
+ */
+ private static Object testKey(boolean largeKeys, int idx) {
+ if (!largeKeys)
+ return idx;
+
+ // Size depends on idx only: TestKey equality includes payload, so repeated calls must yield equal keys.
+ int payloadSize = PAGE_SIZE + (idx * 193) % (PAGE_SIZE * 10);
+
+ return new TestKey(idx, payloadSize);
+ }
+
+ /**
+ * @param expVals Expected values.
+ * @param cache Cache.
+ */
+ private void checkValues(Map<Object, Object> expVals, IgniteCache<Object, Object> cache) {
+ Map<Object, Object> res = cache.getAll(expVals.keySet());
+
+ assertEquals(expVals, res);
+
+ res = new HashMap<>();
+
+ for (IgniteCache.Entry<Object, Object> e : cache)
+ res.put(e.getKey(), e.getValue());
+
+ assertEquals(expVals, res);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testThreadUpdatesAreVisibleForThisThread() throws Exception {
final Ignite ignite = startGrid(0);
@@ -1601,7 +1716,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testRebalance1() throws Exception {
+ public void testSimpleRebalance() throws Exception {
Ignite srv0 = startGrid(0);
IgniteCache<Integer, Integer> cache = (IgniteCache)srv0.createCache(
@@ -1664,6 +1779,32 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testSimpleRebalanceWithRemovedValues() throws Exception {
+ Ignite node = startGrid(0);
+
+ IgniteTransactions txs = node.transactions();
+
+ final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64));
+
+ final int KEYS = 100;
+
+ checkValues(new HashMap<>(), cache);
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < KEYS; k++)
+ cache.remove(k);
+
+ tx.commit();
+ }
+
+ startGrid(1);
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCoordinatorFailurePessimisticTx() throws Exception {
testSpi = true;
@@ -2722,9 +2863,55 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
assertEquals(KEYS, cache.size());
}
- // TODO IGNITE-3478: test removes.
- }
+ int size = KEYS;
+
+ for (int i = 0; i < KEYS; i++) {
+ if (i % 2 == 0) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.remove(key);
+ tx.commit();
+ }
+
+ size--;
+
+ assertEquals(size, cache.size());
+ }
+ }
+
+ // Check size does not change if remove already removed keys.
+ for (int i = 0; i < KEYS; i++) {
+ if (i % 2 == 0) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.remove(key);
+
+ tx.commit();
+ }
+
+ assertEquals(size, cache.size());
+ }
+ }
+
+ for (int i = 0; i < KEYS; i++) {
+ if (i % 2 == 0) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, i);
+
+ tx.commit();
+ }
+
+ size++;
+
+ assertEquals(size, cache.size());
+ }
+ }
+ }
/**
* @throws IgniteCheckedException If failed.
@@ -2792,7 +2979,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
key0,
vers.get(0).get1());
- MvccCoordinatorVersionResponse ver = version(crd.currentCoordinator().coordinatorVersion(), 100000);
+ MvccCoordinatorVersionResponse ver = version(vers.get(0).get2().coordinatorVersion(), 100000);
for (int v = 0; v < vers.size(); v++) {
MvccCounter cntr = vers.get(v).get2();
@@ -3074,4 +3261,54 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
return null;
}
}
+
+ /**
+ * Test key made of an integer and a byte payload of the requested size; both take part in equals/hashCode.
+ */
+ static class TestKey implements Serializable {
+ /** Key value. */
+ private final int key;
+
+ /** Payload used to enlarge the key; allocated in the constructor and never written to by this class. */
+ private final byte[] payload;
+
+ /**
+ * @param key Key.
+ * @param payloadSize Payload size.
+ */
+ public TestKey(int key, int payloadSize) {
+ this.key = key;
+ this.payload = new byte[payloadSize];
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestKey testKey = (TestKey)o;
+
+ if (key != testKey.key)
+ return false;
+
+ return Arrays.equals(payload, testKey.payload);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = key;
+
+ res = 31 * res + Arrays.hashCode(payload);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "TestKey [k=" + key + ", payloadLen=" + payload.length + ']';
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
index d0897e1..600c8df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
@@ -435,6 +435,11 @@ public class FreeListImplSelfTest extends GridCommonAbstractTest {
@Override public long mvccCounter() {
return 0;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return false;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
index 392301c..1819cfe 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
@@ -174,4 +174,9 @@ public abstract class GridH2Row implements SearchRow, CacheDataRow, Row {
@Override public long mvccCounter() {
throw new UnsupportedOperationException();
}
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ throw new UnsupportedOperationException();
+ }
}
\ No newline at end of file