You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/11 10:43:11 UTC
[13/17] ignite git commit: ignite-5937
ignite-5937
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bb969db0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bb969db0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bb969db0
Branch: refs/heads/ignite-5932
Commit: bb969db0457e46fc2db4322927bd3536fdd9fb7b
Parents: de3ed0d
Author: sboikov <sb...@gridgain.com>
Authored: Tue Oct 10 15:36:01 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Oct 10 15:41:21 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheEntryInfo.java | 5 -
.../processors/cache/GridCacheMapEntry.java | 18 ++-
.../cache/IgniteCacheOffheapManager.java | 39 +++++
.../cache/IgniteCacheOffheapManagerImpl.java | 160 +++++++++++++------
.../cache/mvcc/MvccCoordinatorVersion.java | 5 -
.../mvcc/MvccCoordinatorVersionResponse.java | 5 -
.../persistence/GridCacheOffheapManager.java | 14 ++
.../processors/cache/tree/MvccRemoveRow.java | 11 +-
.../processors/cache/tree/MvccUpdateRow.java | 25 ++-
.../datastreamer/DataStreamerImpl.java | 6 +-
.../cache/mvcc/CacheMvccTransactionsTest.java | 34 +++-
11 files changed, 235 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
index 8a5f0df..e09d33c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
@@ -101,11 +101,6 @@ public class GridCacheEntryInfo implements Message, MvccCoordinatorVersion {
return 0;
}
- /** {@inheritDoc} */
- @Override public boolean initialLoad() {
- return true;
- }
-
/**
* @return Cache ID.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index ded9513..a1535e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2581,6 +2581,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean walEnabled = !cctx.isNear() && cctx.shared().wal() != null;
+ // TODO IGNITE-3478: move these checks into a dedicated initialValue method.
if (cctx.shared().database().persistenceEnabled()) {
unswap(false);
@@ -2603,14 +2604,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
- if (val != null) {
- if (cctx.mvccEnabled())
- cctx.offheap().mvccUpdate(false, this, val, ver, mvccVer);
- else
- storeValue(val, expTime, ver, null);
+ if (cctx.mvccEnabled()) {
+ cctx.offheap().mvccInitialValue(this, val, ver, mvccVer);
+
+ if (val != null)
+ update(val, expTime, ttl, ver, true);
}
+ else {
+ if (val != null) {
+ storeValue(val, expTime, ver, null);
- update(val, expTime, ttl, ver, true);
+ update(val, expTime, ttl, ver, true);
+ }
+ }
boolean skipQryNtf = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index bee2108..9e3d0fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -189,6 +189,7 @@ public interface IgniteCacheOffheapManager {
throws IgniteCheckedException;
/**
+ * @param primary {@code True} if on primary node.
* @param entry Entry.
* @param val Value.
* @param ver Cache version.
@@ -204,6 +205,13 @@ public interface IgniteCacheOffheapManager {
MvccCoordinatorVersion mvccVer
) throws IgniteCheckedException;
+ /**
+ * @param primary {@code True} if on primary node.
+ * @param entry Entry.
+ * @param mvccVer Mvcc update version.
+ * @return Transactions to wait for before finishing current transaction.
+ * @throws IgniteCheckedException If failed.
+ */
@Nullable public GridLongList mvccRemove(
boolean primary,
GridCacheMapEntry entry,
@@ -211,6 +219,21 @@ public interface IgniteCacheOffheapManager {
) throws IgniteCheckedException;
/**
+ * @param entry Entry.
+ * @param val Value.
+ * @param ver Version.
+ * @param mvccVer Mvcc update version.
+ * @return {@code True} if value was inserted.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean mvccInitialValue(
+ GridCacheMapEntry entry,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer
+ ) throws IgniteCheckedException;
+
+ /**
* @param cctx Cache context.
* @param key Key.
* @param val Value.
@@ -507,6 +530,22 @@ public interface IgniteCacheOffheapManager {
* @param val Value.
* @param ver Version.
* @param mvccVer Mvcc version.
+ * @return {@code True} if new value was inserted.
+ * @throws IgniteCheckedException If failed.
+ */
+ boolean mvccInitialValue(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param mvccVer Mvcc version.
* @return List of transactions to wait for.
* @throws IgniteCheckedException If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 380ec94..4fb5bfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -383,6 +383,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
+ @Override public boolean mvccInitialValue(
+ GridCacheMapEntry entry,
+ CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
+ return dataStore(entry.localPartition()).mvccInitialValue(
+ entry.context(),
+ entry.key(),
+ val,
+ ver,
+ mvccVer);
+ }
+
+ /** {@inheritDoc} */
@Override public GridLongList mvccUpdate(
boolean primary,
GridCacheMapEntry entry,
@@ -1360,9 +1374,76 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public GridLongList mvccRemove(GridCacheContext cctx,
+ @Override public boolean mvccInitialValue(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer)
+ throws IgniteCheckedException
+ {
+ assert mvccVer != null;
+
+ if (!busyLock.enterBusy())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+ try {
+ assert val != null || CacheCoordinatorsProcessor.versionForRemovedValue(mvccVer.coordinatorVersion());
+
+ int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+
+ CacheObjectContext coCtx = cctx.cacheObjectContext();
+
+ // Make sure value bytes initialized.
+ key.valueBytes(coCtx);
+
+ MvccUpdateRow updateRow;
+
+ if (val != null) {
+ val.valueBytes(coCtx);
+
+ updateRow = new MvccUpdateRow(
+ key,
+ val,
+ ver,
+ mvccVer,
+ partId,
+ cacheId);
+ }
+ else {
+ updateRow = new MvccRemoveRow(
+ key,
+ mvccVer,
+ partId,
+ cacheId);
+ }
+
+ if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+ updateRow.cacheId(cctx.cacheId());
+
+ rowStore.addRow(updateRow);
+
+ boolean old = dataTree.putx(updateRow);
+
+ assert !old;
+
+ if (val != null)
+ incrementSize(cctx.cacheId());
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridLongList mvccUpdate(
+ GridCacheContext cctx,
boolean primary,
KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver,
MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
assert mvccVer != null;
@@ -1376,9 +1457,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
// Make sure value bytes initialized.
key.valueBytes(coCtx);
+ val.valueBytes(coCtx);
- MvccRemoveRow updateRow = new MvccRemoveRow(
+ MvccUpdateRow updateRow = new MvccUpdateRow(
key,
+ val,
+ ver,
mvccVer,
partId,
cacheId);
@@ -1392,27 +1476,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
assert !primary : updateRow;
-
- cleanup(updateRow.cleanupRows(), false);
}
else {
- if (res == MvccUpdateRow.UpdateResult.PREV_NOT_NULL)
- decrementSize(cacheId);
-
- CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), true);
-
- if (rmvRow == null)
- rowStore.addRow(updateRow);
- else
- updateRow.link(rmvRow.link());
-
- assert updateRow.link() != 0L;
+ rowStore.addRow(updateRow);
boolean old = dataTree.putx(updateRow);
assert !old;
+
+ if (res == MvccUpdateRow.UpdateResult.PREV_NULL)
+ incrementSize(cctx.cacheId());
}
+ cleanup(updateRow.cleanupRows(), false);
+
return updateRow.activeTransactions();
}
finally {
@@ -1421,12 +1498,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public GridLongList mvccUpdate(
- GridCacheContext cctx,
+ @Override public GridLongList mvccRemove(GridCacheContext cctx,
boolean primary,
KeyCacheObject key,
- CacheObject val,
- GridCacheVersion ver,
MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
assert mvccVer != null;
@@ -1440,12 +1514,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
// Make sure value bytes initialized.
key.valueBytes(coCtx);
- val.valueBytes(coCtx);
- MvccUpdateRow updateRow = new MvccUpdateRow(
+ MvccRemoveRow updateRow = new MvccRemoveRow(
key,
- val,
- ver,
mvccVer,
partId,
cacheId);
@@ -1453,42 +1524,34 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
updateRow.cacheId(cctx.cacheId());
- GridLongList waitTxs = null;
-
- if (mvccVer.initialLoad()) {
- rowStore.addRow(updateRow);
+ dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
- boolean old = dataTree.putx(updateRow);
+ MvccUpdateRow.UpdateResult res = updateRow.updateResult();
- assert !old;
+ if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
+ assert !primary : updateRow;
- incrementSize(cctx.cacheId());
+ cleanup(updateRow.cleanupRows(), false);
}
else {
- dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
+ if (res == MvccUpdateRow.UpdateResult.PREV_NOT_NULL)
+ decrementSize(cacheId);
- MvccUpdateRow.UpdateResult res = updateRow.updateResult();
+ CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), true);
- if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
- assert !primary : updateRow;
- }
- else {
+ if (rmvRow == null)
rowStore.addRow(updateRow);
+ else
+ updateRow.link(rmvRow.link());
- boolean old = dataTree.putx(updateRow);
-
- assert !old;
-
- if (res == MvccUpdateRow.UpdateResult.PREV_NULL)
- incrementSize(cctx.cacheId());
- }
+ assert updateRow.link() != 0L;
- cleanup(updateRow.cleanupRows(), false);
+ boolean old = dataTree.putx(updateRow);
- waitTxs = updateRow.activeTransactions();
+ assert !old;
}
- return waitTxs;
+ return updateRow.activeTransactions();
}
finally {
busyLock.leaveBusy();
@@ -1848,7 +1911,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
- if (rowCrdVer > ver.coordinatorVersion() || row.mvccCounter() > ver.counter())
+ if (rowCrdVer > ver.coordinatorVersion())
+ continue;
+
+ if (rowCrdVer == ver.coordinatorVersion() && row.mvccCounter() > ver.counter())
continue;
MvccLongList txs = ver.activeTransactions();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
index 4003b73..d80e43c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
@@ -42,9 +42,4 @@ public interface MvccCoordinatorVersion extends Message {
* @return Counter.
*/
public long counter();
-
- /**
- * @return {@code True} if version for initial load update.
- */
- public boolean initialLoad();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
index 20d23ed..c037226 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
@@ -159,11 +159,6 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
}
/** {@inheritDoc} */
- @Override public boolean initialLoad() {
- return false;
- }
-
- /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index ee651c2..45b78d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1256,6 +1256,20 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
+ @Override public boolean mvccInitialValue(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer)
+ throws IgniteCheckedException
+ {
+ CacheDataStore delegate = init0(false);
+
+ return delegate.mvccInitialValue(cctx, key, val, ver, mvccVer);
+ }
+
+ /** {@inheritDoc} */
@Override public GridLongList mvccUpdate(
GridCacheContext cctx,
boolean primary,
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
index 8fd8a6e..af11a9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
@@ -18,10 +18,12 @@
package org.apache.ignite.internal.processors.cache.tree;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.createVersionForRemovedValue;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
/**
*
*/
@@ -42,7 +44,12 @@ public class MvccRemoveRow extends MvccUpdateRow {
/** {@inheritDoc} */
@Override public long mvccCoordinatorVersion() {
- return CacheCoordinatorsProcessor.createVersionForRemovedValue(super.mvccCoordinatorVersion());
+ return createVersionForRemovedValue(super.mvccCoordinatorVersion());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long unmaskedCoordinatorVersion() {
+ return unmaskCoordinatorVersion(super.mvccCoordinatorVersion());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
index 794661d..137ca28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -105,13 +105,13 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
long rowCrdVer = unmaskCoordinatorVersion(io.getMvccCoordinatorVersion(pageAddr, idx));
long rowCntr = io.getMvccCounter(pageAddr, idx);
- int cmp = Long.compare(mvccVer.coordinatorVersion(), rowCrdVer);
+ int cmp = Long.compare(unmaskedCoordinatorVersion(), rowCrdVer);
if (cmp == 0)
cmp = Long.compare(mvccVer.counter(), rowCntr);
// Can be equals if backup rebalanced value updated on primary.
- assert cmp >= 0 : "[updCrd=" + mvccVer.coordinatorVersion() +
+ assert cmp >= 0 : "[updCrd=" + unmaskedCoordinatorVersion() +
", updCntr=" + mvccVer.counter() +
", rowCrd=" + rowCrdVer +
", rowCntr=" + rowCntr + ']';
@@ -138,11 +138,13 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
long rowCrdVerMasked = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
+ long crdVer = unmaskedCoordinatorVersion();
+
if (res == null) {
- int cmp = Long.compare(mvccVer.coordinatorVersion(), rowCrdVer);
+ int cmp = Long.compare(crdVer, rowCrdVer);
if (cmp == 0)
- cmp = Long.compare(mvccVer.coordinatorVersion(), rowIo.getMvccCounter(pageAddr, idx));
+ cmp = Long.compare(mvccVer.counter(), rowIo.getMvccCounter(pageAddr, idx));
if (cmp == 0)
res = UpdateResult.VERSION_FOUND;
@@ -152,7 +154,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
}
// Suppose transactions on previous coordinator versions are done.
- if (checkActive && mvccVer.coordinatorVersion() == rowCrdVer) {
+ if (checkActive && crdVer == rowCrdVer) {
long rowMvccCntr = rowIo.getMvccCounter(pageAddr, idx);
if (mvccVer.activeTransactions().contains(rowMvccCntr)) {
@@ -166,11 +168,11 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
}
if (!txActive) {
- assert Long.compare(mvccVer.coordinatorVersion(), rowCrdVer) >= 0;
+ assert Long.compare(crdVer, rowCrdVer) >= 0;
int cmp;
- if (mvccVer.coordinatorVersion() == rowCrdVer)
+ if (crdVer == rowCrdVer)
cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx));
else
cmp = 1;
@@ -183,7 +185,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
assert row.link() != 0 && row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
// Should not be possible to cleanup active tx.
- assert rowCrdVer != mvccVer.coordinatorVersion()
+ assert rowCrdVer != crdVer
|| !mvccVer.activeTransactions().contains(row.mvccCounter());
if (cleanupRows == null)
@@ -199,6 +201,13 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
return true;
}
+ /**
+ * @return Coordinator version without flags.
+ */
+ protected long unmaskedCoordinatorVersion() {
+ return mvccVer.coordinatorVersion();
+ }
+
/** {@inheritDoc} */
@Override public long mvccCoordinatorVersion() {
return mvccVer.coordinatorVersion();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 6ced2f9..30145ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -134,11 +134,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** Version which is less then any version generated on coordinator. */
private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER =
- new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L) {
- @Override public boolean initialLoad() {
- return true;
- }
- };
+ new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L);
/** Cache receiver. */
private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index d45afe7..1abc116 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -1786,13 +1786,33 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64));
- final int KEYS = 100;
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < 100; k++)
+ cache.remove(k);
- checkValues(new HashMap<>(), cache);
+ tx.commit();
+ }
+
+ Map<Object, Object> expVals = new HashMap<>();
try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
- for (int k = 0; k < KEYS; k++)
- cache.remove(k);
+ for (int k = 100; k < 200; k++) {
+ cache.put(k, k);
+
+ expVals.put(k, k);
+ }
+
+ tx.commit();
+ }
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 100; k < 200; k++) {
+ if (k % 2 == 0) {
+ cache.remove(k);
+
+ expVals.remove(k);
+ }
+ }
tx.commit();
}
@@ -1800,6 +1820,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
startGrid(1);
awaitPartitionMapExchange();
+
+ checkValues(expVals, jcache(1));
+
+ stopGrid(0);
+
+ checkValues(expVals, jcache(1));
}
/**