You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/11 08:24:49 UTC
[1/7] ignite git commit: IGNITE-5733: Fixed failures in
JerryRestProcessorAbstractSelfTest.
Repository: ignite
Updated Branches:
refs/heads/ignite-3478 61b46c47f -> 970cf47a5
IGNITE-5733: Fixed failures in JerryRestProcessorAbstractSelfTest.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f2a3e17
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f2a3e17
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f2a3e17
Branch: refs/heads/ignite-3478
Commit: 5f2a3e178d8ae992607b6418699a2e1e17d23592
Parents: bf99fd3
Author: Alexander Belyak <al...@xored.com>
Authored: Tue Oct 10 15:09:30 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Oct 10 15:09:30 2017 +0300
----------------------------------------------------------------------
.../processors/rest/JettyRestProcessorAbstractSelfTest.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f2a3e17/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index b2725b8..13613ef 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -711,12 +711,12 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
* @throws Exception If failed.
*/
public void testDeactivateActivate() throws Exception {
-
assertClusterState(true);
changeClusterState(false);
-
changeClusterState(true);
+
+ initCache();
}
/**
[3/7] ignite git commit: Fixed "IGNITE-6234 Initialize schemaIds to
empty set if schemas field is null during the deserialization".
Posted by sb...@apache.org.
Fixed "IGNITE-6234 Initialize schemaIds to empty set if schemas field is null during the deserialization".
Signed-off-by: nikolay_tikhonov <nt...@gridgain.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4385f12f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4385f12f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4385f12f
Branch: refs/heads/ignite-3478
Commit: 4385f12f8f35ed2afb6ad1f25ed99a642fb326e8
Parents: 17680ae
Author: Krzysztof Chmielewski <kr...@gmail.com>
Authored: Tue Oct 10 17:50:59 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Oct 10 17:50:59 2017 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/binary/BinaryMetadata.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4385f12f/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadata.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadata.java
index ead00b7..4c3448f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadata.java
@@ -186,6 +186,9 @@ public class BinaryMetadata implements Externalizable {
* @return {@code true} if <b>BinaryMetadata</b> instance has schema with ID specified, {@code false} otherwise.
*/
public boolean hasSchema(int schemaId) {
+ if (schemaIds == null)
+ return false;
+
return schemaIds.contains(schemaId);
}
@@ -304,8 +307,11 @@ public class BinaryMetadata implements Externalizable {
int schemasSize = in.readInt();
- if (schemasSize == -1)
+ if (schemasSize == -1) {
schemas = null;
+
+ schemaIds = Collections.emptySet();
+ }
else {
schemas = new ArrayList<>();
[7/7] ignite git commit: ignite-3478 Support for removes
Posted by sb...@apache.org.
ignite-3478 Support for removes
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/970cf47a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/970cf47a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/970cf47a
Branch: refs/heads/ignite-3478
Commit: 970cf47a51dc6e754677b00e85e60effc48083ba
Parents: 69fd367
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 11 11:24:32 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Oct 11 11:24:32 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheEntryInfo.java | 5 -
.../processors/cache/GridCacheMapEntry.java | 36 +-
.../cache/IgniteCacheOffheapManager.java | 82 +++-
.../cache/IgniteCacheOffheapManagerImpl.java | 401 ++++++++++-------
.../distributed/dht/GridDhtCacheEntry.java | 5 +-
.../dht/preloader/GridDhtPartitionSupplier.java | 24 +-
.../GridDhtPartitionSupplyMessage.java | 4 +-
.../GridDhtPartitionsExchangeFuture.java | 2 +-
.../cache/mvcc/CacheCoordinatorsProcessor.java | 41 +-
.../cache/mvcc/MvccCoordinatorVersion.java | 5 -
.../mvcc/MvccCoordinatorVersionResponse.java | 5 -
.../cache/persistence/CacheDataRow.java | 5 +
.../cache/persistence/CacheDataRowAdapter.java | 5 +
.../cache/persistence/CacheSearchRow.java | 6 +
.../persistence/GridCacheOffheapManager.java | 43 +-
.../processors/cache/persistence/RowStore.java | 2 +
.../persistence/freelist/FreeListImpl.java | 11 +-
.../cache/persistence/tree/io/DataPageIO.java | 22 +-
.../transactions/IgniteTxLocalAdapter.java | 26 +-
.../cache/tree/AbstractDataInnerIO.java | 10 +-
.../cache/tree/AbstractDataLeafIO.java | 10 +-
.../cache/tree/CacheDataRowStore.java | 18 +-
.../processors/cache/tree/CacheDataTree.java | 8 +-
.../internal/processors/cache/tree/DataRow.java | 7 +
.../processors/cache/tree/MvccDataRow.java | 32 +-
.../cache/tree/MvccKeyMaxVersionBound.java | 9 +-
.../processors/cache/tree/MvccRemoveRow.java | 64 +++
.../processors/cache/tree/MvccUpdateRow.java | 88 +++-
.../cache/tree/MvccVersionBasedSearchRow.java | 16 +-
.../datastreamer/DataStreamerImpl.java | 6 +-
.../cache/mvcc/CacheMvccClusterRestartTest.java | 173 ++++++++
.../cache/mvcc/CacheMvccTransactionsTest.java | 434 +++++++++++++++++--
.../database/FreeListImplSelfTest.java | 5 +
.../processors/query/h2/opt/GridH2Row.java | 5 +
34 files changed, 1313 insertions(+), 302 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
index 8a5f0df..e09d33c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
@@ -101,11 +101,6 @@ public class GridCacheEntryInfo implements Message, MvccCoordinatorVersion {
return 0;
}
- /** {@inheritDoc} */
- @Override public boolean initialLoad() {
- return true;
- }
-
/**
* @return Cache ID.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 8432a77..a1535e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1012,7 +1012,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (cctx.mvccEnabled() && !((IgniteCacheOffheapManagerImpl)cctx.offheap()).IGNITE_FAKE_MVCC_STORAGE) {
assert mvccVer != null;
- mvccWaitTxs = cctx.offheap().mvccUpdate(this, val, newVer, mvccVer);
+ mvccWaitTxs = cctx.offheap().mvccUpdate(tx.local(),
+ this,
+ val,
+ newVer,
+ mvccVer);
}
else
storeValue(val, expireTime, newVer, null);
@@ -1141,6 +1145,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean marked = false;
+ GridLongList mvccWaitTxs = null;
+
synchronized (this) {
checkObsolete();
@@ -1181,7 +1187,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
- removeValue();
+ if (cctx.mvccEnabled() && !((IgniteCacheOffheapManagerImpl)cctx.offheap()).IGNITE_FAKE_MVCC_STORAGE) {
+ assert mvccVer != null;
+
+ mvccWaitTxs = cctx.offheap().mvccRemove(tx.local(), this, mvccVer);
+ }
+ else
+ removeValue();
update(null, 0, 0, newVer, true);
@@ -1292,7 +1304,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
cctx.config().getInterceptor().onAfterRemove(entry0);
if (valid)
- return new GridCacheUpdateTxResult(true, updateCntr0, null);
+ return new GridCacheUpdateTxResult(true, updateCntr0, mvccWaitTxs);
else
return new GridCacheUpdateTxResult(false);
}
@@ -2569,6 +2581,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean walEnabled = !cctx.isNear() && cctx.shared().wal() != null;
+ // TODO IGNITE-3478: move checks in special initialValue method.
if (cctx.shared().database().persistenceEnabled()) {
unswap(false);
@@ -2591,14 +2604,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
- if (val != null) {
- if (cctx.mvccEnabled())
- cctx.offheap().mvccUpdate(this, val, ver, mvccVer);
- else
- storeValue(val, expTime, ver, null);
+ if (cctx.mvccEnabled()) {
+ cctx.offheap().mvccInitialValue(this, val, ver, mvccVer);
+
+ if (val != null)
+ update(val, expTime, ttl, ver, true);
}
+ else {
+ if (val != null) {
+ storeValue(val, expTime, ver, null);
- update(val, expTime, ttl, ver, true);
+ update(val, expTime, ttl, ver, true);
+ }
+ }
boolean skipQryNtf = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 8967ce8..2c070fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -191,15 +191,54 @@ public interface IgniteCacheOffheapManager {
/**
* @param entry Entry.
* @param val Value.
+ * @param ver Version.
+ * @param mvccVer Mvcc update version.
+ * @return {@code True} if value was inserted.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean mvccInitialValue(
+ GridCacheMapEntry entry,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer
+ ) throws IgniteCheckedException;
+
+ /**
+ * @param primary {@code True} if on primary node.
+ * @param entry Entry.
+ * @param val Value.
* @param ver Cache version.
* @param mvccVer Mvcc update version.
* @return Transactions to wait for before finishing current transaction.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public GridLongList mvccUpdate(GridCacheMapEntry entry,
+ @Nullable public GridLongList mvccUpdate(
+ boolean primary,
+ GridCacheMapEntry entry,
CacheObject val,
GridCacheVersion ver,
- MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
+ MvccCoordinatorVersion mvccVer
+ ) throws IgniteCheckedException;
+
+ /**
+ * @param primary {@code True} if on primary node.
+ * @param entry Entry.
+ * @param mvccVer Mvcc update version.
+ * @return Transactions to wait for before finishing current transaction.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public GridLongList mvccRemove(
+ boolean primary,
+ GridCacheMapEntry entry,
+ MvccCoordinatorVersion mvccVer
+ ) throws IgniteCheckedException;
+
+ /**
+ * @param entry Entry.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void mvccRemoveAll(GridCacheMapEntry entry)
+ throws IgniteCheckedException;
/**
* @param cctx Cache context.
@@ -498,11 +537,29 @@ public interface IgniteCacheOffheapManager {
* @param val Value.
* @param ver Version.
* @param mvccVer Mvcc version.
+ * @return {@code True} if new value was inserted.
+ * @throws IgniteCheckedException If failed.
+ */
+ boolean mvccInitialValue(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param primary {@code True} if update is executed on primary node.
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param mvccVer Mvcc version.
* @return List of transactions to wait for.
* @throws IgniteCheckedException If failed.
*/
@Nullable GridLongList mvccUpdate(
GridCacheContext cctx,
+ boolean primary,
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
@@ -510,6 +567,27 @@ public interface IgniteCacheOffheapManager {
/**
* @param cctx Cache context.
+ * @param primary {@code True} if update is executed on primary node.
+ * @param key Key.
+ * @param mvccVer Mvcc version.
+ * @return List of transactions to wait for.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable GridLongList mvccRemove(
+ GridCacheContext cctx,
+ boolean primary,
+ KeyCacheObject key,
+ MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @throws IgniteCheckedException If failed.
+ */
+ void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
* @param key Key.
* @throws IgniteCheckedException If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 25f36b2..2bff203 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
@@ -54,9 +55,9 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.cache.tree.DataRow;
-import org.apache.ignite.internal.processors.cache.tree.MvccDataRow;
import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound;
import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound;
+import org.apache.ignite.internal.processors.cache.tree.MvccRemoveRow;
import org.apache.ignite.internal.processors.cache.tree.MvccSearchRow;
import org.apache.ignite.internal.processors.cache.tree.MvccUpdateRow;
import org.apache.ignite.internal.processors.cache.tree.MvccVersionBasedSearchRow;
@@ -85,6 +86,8 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
/**
*
@@ -380,11 +383,28 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public GridLongList mvccUpdate(GridCacheMapEntry entry,
+ @Override public boolean mvccInitialValue(
+ GridCacheMapEntry entry,
+ CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
+ return dataStore(entry.localPartition()).mvccInitialValue(
+ entry.context(),
+ entry.key(),
+ val,
+ ver,
+ mvccVer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridLongList mvccUpdate(
+ boolean primary,
+ GridCacheMapEntry entry,
CacheObject val,
GridCacheVersion ver,
MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
return dataStore(entry.localPartition()).mvccUpdate(entry.context(),
+ primary,
entry.key(),
val,
ver,
@@ -392,6 +412,23 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
+ @Override public GridLongList mvccRemove(
+ boolean primary,
+ GridCacheMapEntry entry,
+ MvccCoordinatorVersion mvccVer
+ ) throws IgniteCheckedException {
+ return dataStore(entry.localPartition()).mvccRemove(entry.context(),
+ primary,
+ entry.key(),
+ mvccVer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void mvccRemoveAll(GridCacheMapEntry entry) throws IgniteCheckedException {
+ dataStore(entry.localPartition()).mvccRemoveAll(entry.context(), entry.key());
+ }
+
+ /** {@inheritDoc} */
@Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part)
throws IgniteCheckedException {
dataStore(part).updateIndexes(cctx, key);
@@ -1340,18 +1377,74 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return dataRow;
}
- private int compare(CacheDataRow row, long crdVer, long mvccCntr) {
- int cmp = Long.compare(row.mvccCoordinatorVersion(), crdVer);
+ /** {@inheritDoc} */
+ @Override public boolean mvccInitialValue(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer)
+ throws IgniteCheckedException
+ {
+ assert mvccVer != null;
+
+ if (!busyLock.enterBusy())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+ try {
+ assert val != null || CacheCoordinatorsProcessor.versionForRemovedValue(mvccVer.coordinatorVersion());
+
+ int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+
+ CacheObjectContext coCtx = cctx.cacheObjectContext();
+
+ // Make sure value bytes initialized.
+ key.valueBytes(coCtx);
+
+ MvccUpdateRow updateRow;
+
+ if (val != null) {
+ val.valueBytes(coCtx);
+
+ updateRow = new MvccUpdateRow(
+ key,
+ val,
+ ver,
+ mvccVer,
+ partId,
+ cacheId);
+ }
+ else {
+ updateRow = new MvccRemoveRow(
+ key,
+ mvccVer,
+ partId,
+ cacheId);
+ }
+
+ if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+ updateRow.cacheId(cctx.cacheId());
+
+ rowStore.addRow(updateRow);
- if (cmp != 0)
- return cmp;
+ boolean old = dataTree.putx(updateRow);
- return Long.compare(row.mvccCounter(), mvccCntr);
+ assert !old;
+
+ if (val != null)
+ incrementSize(cctx.cacheId());
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+
+ return true;
}
/** {@inheritDoc} */
@Override public GridLongList mvccUpdate(
GridCacheContext cctx,
+ boolean primary,
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
@@ -1370,139 +1463,160 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
key.valueBytes(coCtx);
val.valueBytes(coCtx);
- if (true) {
- MvccUpdateRow updateRow = new MvccUpdateRow(
- key,
- val,
- ver,
- mvccVer,
- partId,
- cacheId);
+ MvccUpdateRow updateRow = new MvccUpdateRow(
+ key,
+ val,
+ ver,
+ mvccVer,
+ partId,
+ cacheId);
- rowStore.addRow(updateRow);
+ if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+ updateRow.cacheId(cctx.cacheId());
- assert updateRow.link() != 0 : updateRow;
+ dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
- if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
- updateRow.cacheId(cctx.cacheId());
+ MvccUpdateRow.UpdateResult res = updateRow.updateResult();
- GridLongList waitTxs = null;
+ if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
+ assert !primary : updateRow;
+ }
+ else {
+ rowStore.addRow(updateRow);
- if (mvccVer.initialLoad()) {
- boolean old = dataTree.putx(updateRow);
+ boolean old = dataTree.putx(updateRow);
- assert !old;
+ assert !old;
+ if (res == MvccUpdateRow.UpdateResult.PREV_NULL)
incrementSize(cctx.cacheId());
- }
- else {
- dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
+ }
- boolean old = dataTree.putx(updateRow);
+ cleanup(updateRow.cleanupRows(), false);
+
+ return updateRow.activeTransactions();
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridLongList mvccRemove(GridCacheContext cctx,
+ boolean primary,
+ KeyCacheObject key,
+ MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
+ assert mvccVer != null;
- assert !old;
+ if (!busyLock.enterBusy())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
- if (!updateRow.previousNotNull())
- incrementSize(cctx.cacheId());
+ try {
+ int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
- waitTxs = updateRow.activeTransactions();
+ CacheObjectContext coCtx = cctx.cacheObjectContext();
- List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
+ // Make sure value bytes initialized.
+ key.valueBytes(coCtx);
- if (cleanupRows != null) {
- for (int i = 0; i < cleanupRows.size(); i++) {
- CacheSearchRow oldRow = cleanupRows.get(i);
+ MvccRemoveRow updateRow = new MvccRemoveRow(
+ key,
+ mvccVer,
+ partId,
+ cacheId);
- assert oldRow.link() != 0L : oldRow;
+ if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+ updateRow.cacheId(cctx.cacheId());
- boolean rmvd = dataTree.removex(oldRow);
+ dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
- assert rmvd;
+ MvccUpdateRow.UpdateResult res = updateRow.updateResult();
- rowStore.removeRow(oldRow.link());
- }
- }
- }
+ if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
+ assert !primary : updateRow;
- return waitTxs;
+ cleanup(updateRow.cleanupRows(), false);
}
else {
- MvccDataRow dataRow = new MvccDataRow(
- key,
- val,
- ver,
- partId,
- cacheId,
- mvccVer.coordinatorVersion(),
- mvccVer.counter());
+ if (res == MvccUpdateRow.UpdateResult.PREV_NOT_NULL)
+ decrementSize(cacheId);
- rowStore.addRow(dataRow);
+ CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), true);
- assert dataRow.link() != 0 : dataRow;
+ if (rmvRow == null)
+ rowStore.addRow(updateRow);
+ else
+ updateRow.link(rmvRow.link());
- if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID)
- dataRow.cacheId(cctx.cacheId());
+ assert updateRow.link() != 0L;
- boolean old = dataTree.putx(dataRow);
+ boolean old = dataTree.putx(updateRow);
assert !old;
+ }
- GridLongList waitTxs = null;
+ return updateRow.activeTransactions();
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
- if (!mvccVer.initialLoad()) {
- MvccLongList activeTxs = mvccVer.activeTransactions();
+ /** {@inheritDoc} */
+ @Override public void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
+ key.valueBytes(cctx.cacheObjectContext());
- // TODO IGNITE-3484: need special method.
- GridCursor<CacheDataRow> cur = dataTree.find(
- new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1),
- new MvccSearchRow(cacheId, key, 1, 1));
+ int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
- boolean first = true;
+ GridCursor<CacheDataRow> cur = dataTree.find(
+ new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
+ new MvccSearchRow(cacheId, key, 1, 1),
+ CacheDataRowAdapter.RowData.KEY_ONLY);
- boolean activeTx = false;
+ while (cur.next()) {
+ CacheDataRow row = cur.get();
- while (cur.next()) {
- CacheDataRow oldVal = cur.get();
+ assert row.link() != 0;
- assert oldVal.link() != 0 : oldVal;
+ boolean rmvd = dataTree.removex(row);
- if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() &&
- activeTxs.contains(oldVal.mvccCounter())) {
- if (waitTxs == null)
- waitTxs = new GridLongList();
+ assert rmvd;
- assert oldVal.mvccCounter() != mvccVer.counter();
+ rowStore.removeRow(row.link());
+ }
+ }
- waitTxs.add(oldVal.mvccCounter());
+ /**
+ * @param cleanupRows Rows to cleanup.
+ * @param findRmv {@code True} if need keep removed row entry.
+ * @return Removed row entry if found.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable private CacheSearchRow cleanup(@Nullable List<CacheSearchRow> cleanupRows, boolean findRmv)
+ throws IgniteCheckedException {
+ CacheSearchRow rmvRow = null;
- activeTx = true;
- }
+ if (cleanupRows != null) {
+ for (int i = 0; i < cleanupRows.size(); i++) {
+ CacheSearchRow oldRow = cleanupRows.get(i);
- if (!activeTx) {
- // Should not delete oldest version which is less than cleanup version.
- int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
+ assert oldRow.link() != 0L : oldRow;
- if (cmp <= 0) {
- if (first)
- first = false;
- else {
- boolean rmvd = dataTree.removex(oldVal);
+ boolean rmvd = dataTree.removex(oldRow);
- assert rmvd;
+ assert rmvd;
- rowStore.removeRow(oldVal.link());
- }
- }
- }
- }
+ if (findRmv &&
+ rmvRow == null &&
+ versionForRemovedValue(oldRow.mvccCoordinatorVersion())) {
+ rmvRow = oldRow;
}
-
- return waitTxs;
+ else
+ rowStore.removeRow(oldRow.link());
}
}
- finally {
- busyLock.leaveBusy();
- }
+
+ return rmvRow;
}
/** {@inheritDoc} */
@@ -1709,26 +1823,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
CacheDataRow row;
if (grp.mvccEnabled()) {
- if (true) {
- MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key);
-
- dataTree.iterate(
- searchRow,
- new MvccKeyMinVersionBound(cacheId, key),
- searchRow // Use the same instance as closure to do not create extra object.
- );
+ MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key);
- row = searchRow.row();
- }
- else {
- GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
- new MvccSearchRow(cacheId, key, 1, 1));
+ dataTree.iterate(
+ searchRow,
+ new MvccKeyMinVersionBound(cacheId, key),
+ searchRow // Use the same instance as closure to do not create extra object.
+ );
- if (cur.next())
- row = cur.get();
- else
- row = null;
- }
+ row = searchRow.row();
}
else
row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY);
@@ -1781,55 +1884,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
- if (true) {
- MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver);
-
- dataTree.iterate(
- lower,
- new MvccKeyMinVersionBound(cacheId, key),
- lower // Use the same instance as closure to do not create extra object.
- );
-
- CacheDataRow row = lower.row();
-
- afterRowFound(row, key);
-
- return row;
- }
- else {
- GridCursor<CacheDataRow> cur = dataTree.find(
- new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()),
- new MvccSearchRow(cacheId, key, 1, 1));
-
- CacheDataRow row = null;
-
- MvccLongList txs = ver.activeTransactions();
-
- while (cur.next()) {
- CacheDataRow row0 = cur.get();
+ MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver);
- assert row0.mvccCoordinatorVersion() > 0 : row0;
+ dataTree.iterate(
+ lower,
+ new MvccKeyMinVersionBound(cacheId, key),
+ lower // Use the same instance as closure to do not create extra object.
+ );
- boolean visible;
-
- if (txs != null) {
- visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion()
- || !txs.contains(row0.mvccCounter());
- }
- else
- visible = true;
-
- if (visible) {
- row = row0;
-
- break;
- }
- }
+ CacheDataRow row = lower.row();
- assert row == null || key.equals(row.key());
+ afterRowFound(row, key);
- return row;
- }
+ return row;
}
/**
@@ -1868,18 +1935,30 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
while (cur.next()) {
CacheDataRow row = cur.get();
- if (row.mvccCoordinatorVersion() > ver.coordinatorVersion()
- || row.mvccCounter() > ver.counter())
+ long rowCrdVerMasked = row.mvccCoordinatorVersion();
+
+ long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
+
+ if (rowCrdVer > ver.coordinatorVersion())
+ continue;
+
+ if (rowCrdVer == ver.coordinatorVersion() && row.mvccCounter() > ver.counter())
continue;
MvccLongList txs = ver.activeTransactions();
- if (txs != null && row.mvccCoordinatorVersion() == ver.coordinatorVersion() && txs.contains(row.mvccCounter()))
+ if (txs != null && rowCrdVer == ver.coordinatorVersion() && txs.contains(row.mvccCounter()))
continue;
if (curKey != null && row.key().equals(curKey))
continue;
+ if (CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked)) {
+ curKey = row.key();
+
+ continue;
+ }
+
curRow = row;
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 77cc642..a3309a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -585,7 +585,10 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
']');
}
- removeValue();
+ if (cctx.mvccEnabled())
+ cctx.offheap().mvccRemoveAll(this);
+ else
+ removeValue();
// Give to GC.
update(null, 0L, 0L, ver, true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 0905917..357fef8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -31,9 +31,9 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.T3;
@@ -43,6 +43,7 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.IgniteSpiException;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
/**
* Thread pool for supplying partitions to demanding nodes.
@@ -375,13 +376,24 @@ class GridDhtPartitionSupplier {
GridCacheEntryInfo info = grp.mvccEnabled() ?
new GridCacheMvccEntryInfo() : new GridCacheEntryInfo();
+
info.key(row.key());
- info.expireTime(row.expireTime());
- info.version(row.version());
- info.value(row.value());
info.cacheId(row.cacheId());
- info.mvccCoordinatorVersion(row.mvccCoordinatorVersion());
- info.mvccCounter(row.mvccCounter());
+
+ boolean rmvd = false;
+
+ if (grp.mvccEnabled()) {
+ info.mvccCoordinatorVersion(row.mvccCoordinatorVersion());
+ info.mvccCounter(row.mvccCounter());
+
+ rmvd = versionForRemovedValue(row.mvccCoordinatorVersion());
+ }
+
+ if (!rmvd) {
+ info.value(row.value());
+ info.version(row.version());
+ info.expireTime(row.expireTime());
+ }
if (preloadPred == null || preloadPred.apply(info))
s.addEntry0(part, info, grp.shared(), grp.cacheObjectContext());
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 90d11f5..6675f8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -42,6 +42,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
/**
* Partition supply message.
*/
@@ -217,7 +219,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
void addEntry0(int p, GridCacheEntryInfo info, GridCacheSharedContext ctx, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
assert info != null;
assert info.key() != null : info;
- assert info.value() != null : info;
+ assert info.value() != null || versionForRemovedValue(info.coordinatorVersion()): info;
// Need to call this method to initialize info properly.
marshalInfo(info, ctx, cacheObjCtx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 830d50b..88095ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -560,7 +560,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
MvccCoordinator mvccCrd = firstEvtDiscoCache.mvccCoordinator();
boolean mvccCrdChange = mvccCrd != null &&
- initialVersion().equals(mvccCrd.topologyVersion());
+ (initialVersion().equals(mvccCrd.topologyVersion()) || activateCluster());
cctx.kernalContext().coordinators().currentCoordinator(mvccCrd);
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index b9b8ea1..9f9a7a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -66,6 +66,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
/**
@@ -86,7 +87,13 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
/** */
private static final byte MSG_POLICY = SYSTEM_POOL;
-
+
+ /** */
+ private static final long CRD_VER_MASK = 0x3F_FF_FF_FF_FF_FF_FF_FFL;
+
+ /** */
+ private static final long RMVD_VAL_VER_MASK = 0x80_00_00_00_00_00_00_00L;
+
/** */
private volatile MvccCoordinator curCrd;
@@ -139,6 +146,30 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
super(ctx);
}
+ /**
+ * @param crdVer Coordinator version.
+ * @return Coordinator version with removed value flag.
+ */
+ public static long createVersionForRemovedValue(long crdVer) {
+ return crdVer | RMVD_VAL_VER_MASK;
+ }
+
+ /**
+ * @param crdVer Coordinator version with flags.
+ * @return {@code True} if removed value flag is set.
+ */
+ public static boolean versionForRemovedValue(long crdVer) {
+ return (crdVer & RMVD_VAL_VER_MASK) != 0;
+ }
+
+ /**
+ * @param crdVer Coordinator version with flags.
+ * @return Coordinator version.
+ */
+ public static long unmaskCoordinatorVersion(long crdVer) {
+ return crdVer & CRD_VER_MASK;
+ }
+
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
statCntrs = new StatCounter[7];
@@ -199,7 +230,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
* @param topVer Topology version.
*/
public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer) {
- if (evtType == EVT_NODE_METRICS_UPDATED)
+ if (evtType == EVT_NODE_METRICS_UPDATED || evtType == EVT_DISCOVERY_CUSTOM_EVT)
return;
MvccCoordinator crd;
@@ -778,7 +809,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) {
assert crdVer != 0;
- return activeQueries.assignQueryCounter(qryNodeId, futId);
+ MvccCoordinatorVersionResponse res = activeQueries.assignQueryCounter(qryNodeId, futId);
+
+ return res;
// MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
//
@@ -989,7 +1022,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() +
", topVer=" + topVer + ']');
- crdVer = topVer.topologyVersion();
+ crdVer = topVer.topologyVersion() + ctx.discovery().gridStartTime();
prevCrdQueries.init(activeQueries, discoCache, ctx.discovery());
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
index a0fd5ee..d80e43c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
@@ -42,9 +42,4 @@ public interface MvccCoordinatorVersion extends Message {
* @return Counter.
*/
public long counter();
-
- /**
- *
- */
- public boolean initialLoad();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
index 20d23ed..c037226 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
@@ -159,11 +159,6 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
}
/** {@inheritDoc} */
- @Override public boolean initialLoad() {
- return false;
- }
-
- /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
index 57aeaef..b76826f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
@@ -54,4 +54,9 @@ public interface CacheDataRow extends CacheSearchRow {
* @param key Key.
*/
public void key(KeyCacheObject key);
+
+ /**
+ * @return {@code True} if this is row for cache remove operation (used only with mvcc).
+ */
+ public boolean removed();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index 925431f..d0f2dab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -582,6 +582,11 @@ public class CacheDataRowAdapter implements CacheDataRow {
return 0;
}
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return false;
+ }
+
/**
*
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
index 5bf53d8..efdc08f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
@@ -43,7 +43,13 @@ public interface CacheSearchRow {
*/
public int cacheId();
+ /**
+ * @return Mvcc coordinator version.
+ */
public long mvccCoordinatorVersion();
+ /**
+ * @return Mvcc counter.
+ */
public long mvccCounter();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index cb01b6c..e5a9736 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -837,6 +837,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
@Override public long mvccCoordinatorVersion() {
return 0; // TODO IGNITE-3478.
}
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return false; // TODO IGNITE-3478.
+ }
}
/**
@@ -1251,14 +1256,48 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public GridLongList mvccUpdate(GridCacheContext cctx,
+ @Override public boolean mvccInitialValue(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer)
+ throws IgniteCheckedException
+ {
+ CacheDataStore delegate = init0(false);
+
+ return delegate.mvccInitialValue(cctx, key, val, ver, mvccVer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridLongList mvccUpdate(
+ GridCacheContext cctx,
+ boolean primary,
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
- return delegate.mvccUpdate(cctx, key, val, ver, mvccVer);
+ return delegate.mvccUpdate(cctx, primary, key, val, ver, mvccVer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridLongList mvccRemove(
+ GridCacheContext cctx,
+ boolean primary,
+ KeyCacheObject key,
+ MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
+ CacheDataStore delegate = init0(false);
+
+ return delegate.mvccRemove(cctx, primary, key, mvccVer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
+ CacheDataStore delegate = init0(false);
+
+ delegate.mvccRemoveAll(cctx, key);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
index 9cc5c62..41d2c4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
@@ -82,6 +82,8 @@ public class RowStore {
try {
freeList.insertDataRow(row);
+
+ assert row.link() != 0L;
}
finally {
ctx.database().checkpointReadUnlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
index 3eb62ae..9bd27b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
@@ -590,12 +590,19 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
*/
public static int getRowSize(CacheDataRow row, boolean withCacheId) throws IgniteCheckedException {
KeyCacheObject key = row.key();
- CacheObject val = row.value();
int keyLen = key.valueBytesLength(null);
+
+ int len = keyLen + (withCacheId ? 4 : 0);
+
+ if (row.removed())
+ return len;
+
+ CacheObject val = row.value();
+
int valLen = val.valueBytesLength(null);
- return keyLen + valLen + CacheVersionIO.size(row.version(), false) + 8 + (withCacheId ? 4 : 0);
+ return len + valLen + CacheVersionIO.size(row.version(), false) + 8;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
index 628ff38..da012e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
@@ -1040,13 +1040,19 @@ public class DataPageIO extends PageIO {
final int payloadSize
) throws IgniteCheckedException {
final int keySize = row.key().valueBytesLength(null);
- final int valSize = row.value().valueBytesLength(null);
+
+ boolean rmvd = row.removed();
+
+ final int valSize = rmvd ? 0 : row.value().valueBytesLength(null);
int written = writeFragment(row, buf, rowOff, payloadSize, EntryPart.CACHE_ID, keySize, valSize);
written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.KEY, keySize, valSize);
- written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.EXPIRE_TIME, keySize, valSize);
- written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VALUE, keySize, valSize);
- written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VERSION, keySize, valSize);
+
+ if (!rmvd) {
+ written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.EXPIRE_TIME, keySize, valSize);
+ written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VALUE, keySize, valSize);
+ written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VERSION, keySize, valSize);
+ }
assert written == payloadSize;
}
@@ -1414,9 +1420,15 @@ public class DataPageIO extends PageIO {
}
addr += row.key().putValue(addr);
+
+ if (row.removed())
+ return;
}
- else
+ else {
+ assert !row.removed() : row;
+
addr += (2 + cacheIdSize + row.key().valueBytesLength(null));
+ }
addr += row.value().putValue(addr);
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 92e6785..d8f911c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -707,12 +707,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
GridLongList waitTxs = updRes.mvccWaitTransactions();
- if (waitTxs != null) {
- if (this.mvccWaitTxs == null)
- this.mvccWaitTxs = waitTxs;
- else
- this.mvccWaitTxs.addAll(waitTxs);
- }
+ updateWaitTxs(waitTxs);
}
if (nearCached != null && updRes.success()) {
@@ -762,9 +757,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
null,
mvccInfo != null ? mvccInfo.version() : null);
- if (updRes.success())
+ if (updRes.success()) {
txEntry.updateCounter(updRes.updatePartitionCounter());
+ GridLongList waitTxs = updRes.mvccWaitTransactions();
+
+ updateWaitTxs(waitTxs);
+ }
+
if (nearCached != null && updRes.success()) {
nearCached.innerRemove(
null,
@@ -924,6 +924,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
/**
+ * @param waitTxs Tx ids to wait for.
+ */
+ private void updateWaitTxs(@Nullable GridLongList waitTxs) {
+ if (waitTxs != null) {
+ if (this.mvccWaitTxs == null)
+ this.mvccWaitTxs = waitTxs;
+ else
+ this.mvccWaitTxs.addAll(waitTxs);
+ }
+ }
+
+ /**
* Commits transaction to transaction manager. Used for one-phase commit transactions only.
*
* @param commit If {@code true} commits transaction, otherwise rollbacks.
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index a07d012..fc82cbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.tree;
import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -27,6 +26,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInne
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteInClosure;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
/**
*
*/
@@ -59,8 +61,8 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
}
if (storeMvccVersion()) {
- assert row.mvccCoordinatorVersion() > 0 : row;
- assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
+ assert unmaskCoordinatorVersion(row.mvccCoordinatorVersion()) > 0 : row;
+ assert row.mvccCounter() != COUNTER_NA : row;
PageUtils.putLong(pageAddr, off, row.mvccCoordinatorVersion());
off += 8;
@@ -123,7 +125,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
long mvcCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
assert mvccTopVer > 0 : mvccTopVer;
- assert mvcCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+ assert mvcCntr != COUNTER_NA;
PageUtils.putLong(dstPageAddr, off, mvccTopVer);
off += 8;
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index ef08bec..c956d22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.tree;
import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -27,6 +26,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeaf
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteInClosure;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
/**
*
*/
@@ -61,8 +63,8 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
long mvccCrdVer = row.mvccCoordinatorVersion();
long mvccUpdateCntr = row.mvccCounter();
- assert mvccCrdVer > 0 : mvccCrdVer;
- assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+ assert unmaskCoordinatorVersion(mvccCrdVer) > 0 : mvccCrdVer;
+ assert mvccUpdateCntr != COUNTER_NA;
PageUtils.putLong(pageAddr, off, mvccCrdVer);
off += 8;
@@ -98,7 +100,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx);
assert mvccUpdateTopVer >=0 : mvccUpdateCntr;
- assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+ assert mvccUpdateCntr != COUNTER_NA;
PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer);
off += 8;
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
index f9e1eb3..85624d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
@@ -25,6 +25,8 @@ import org.apache.ignite.internal.processors.cache.persistence.RowStore;
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
/**
*
*/
@@ -65,17 +67,25 @@ public class CacheDataRowStore extends RowStore {
* @param cacheId Cache ID.
* @param hash Hash code.
* @param link Link.
- * @param mvccTopVer
- * @param mvccCntr
+ * @param rowData Required row data.
+ * @param crdVer Mvcc coordinator version.
+ * @param mvccCntr Mvcc counter.
* @return Search row.
*/
- MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long mvccTopVer, long mvccCntr) {
+ MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long crdVer, long mvccCntr) {
+ if (rowData != CacheDataRowAdapter.RowData.KEY_ONLY && versionForRemovedValue(crdVer)) {
+ if (rowData == CacheDataRowAdapter.RowData.NO_KEY)
+ return MvccDataRow.removedRowNoKey(partId, cacheId, crdVer, mvccCntr);
+ else
+ rowData = CacheDataRowAdapter.RowData.KEY_ONLY;
+ }
+
MvccDataRow dataRow = new MvccDataRow(grp,
hash,
link,
partId,
rowData,
- mvccTopVer,
+ crdVer,
mvccCntr);
initDataRow(dataRow, cacheId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index eaeefee..6309153 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId;
import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
/**
*
@@ -160,7 +161,8 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
long mvccCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
- cmp = Long.compare(row.mvccCoordinatorVersion(), mvccCrdVer);
+ cmp = Long.compare(unmaskCoordinatorVersion(row.mvccCoordinatorVersion()),
+ unmaskCoordinatorVersion(mvccCrdVer));
if (cmp != 0)
return cmp;
@@ -188,10 +190,10 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
CacheDataRowAdapter.RowData.FULL;
if (grp.mvccEnabled()) {
- long mvccTopVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+ long mvccCrdVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
long mvccCntr = rowIo.getMvccCounter(pageAddr, idx);
- return rowStore.mvccRow(cacheId, hash, link, x, mvccTopVer, mvccCntr);
+ return rowStore.mvccRow(cacheId, hash, link, x, mvccCrdVer, mvccCntr);
}
else
return rowStore.dataRow(cacheId, hash, link, x);
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
index 29bbaaf..d1e90d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
@@ -81,6 +81,13 @@ public class DataRow extends CacheDataRowAdapter {
this.cacheId = cacheId;
}
+ /**
+ *
+ */
+ protected DataRow() {
+ super(0);
+ }
+
/** {@inheritDoc} */
@Override public int partition() {
return part;
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
index eb1ee10..916ea93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
@@ -18,12 +18,11 @@
package org.apache.ignite.internal.processors.cache.tree;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
/**
*
*/
@@ -46,7 +45,7 @@ public class MvccDataRow extends DataRow {
MvccDataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData, long crdVer, long mvccCntr) {
super(grp, hash, link, part, rowData);
- assert crdVer > 0 : crdVer;
+ assert unmaskCoordinatorVersion(crdVer) > 0 : crdVer;
assert mvccCntr != CacheCoordinatorsProcessor.COUNTER_NA;
this.crdVer = crdVer;
@@ -54,25 +53,32 @@ public class MvccDataRow extends DataRow {
}
/**
- * @param key Key.
- * @param val Value.
- * @param ver Version.
+ *
+ */
+ private MvccDataRow() {
+ // No-op.
+ }
+
+ /**
* @param part Partition.
* @param cacheId Cache ID.
* @param crdVer Mvcc coordinator version.
* @param mvccCntr Mvcc counter.
+ * @return Row.
*/
- public MvccDataRow(KeyCacheObject key,
- CacheObject val,
- GridCacheVersion ver,
+ static MvccDataRow removedRowNoKey(
int part,
int cacheId,
long crdVer,
long mvccCntr) {
- super(key, val, ver, part, 0L, cacheId);
+ MvccDataRow row = new MvccDataRow();
- this.mvccCntr = mvccCntr;
- this.crdVer = crdVer;
+ row.cacheId = cacheId;
+ row.part = part;
+ row.crdVer = crdVer;
+ row.mvccCntr = mvccCntr;
+
+ return row;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
index aa9422d..007ac09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
@@ -27,6 +27,8 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
/**
*
*/
@@ -55,7 +57,12 @@ public class MvccKeyMaxVersionBound extends SearchRow implements BPlusTree.TreeR
int idx)
throws IgniteCheckedException
{
- resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+ RowLinkIO rowIo = (RowLinkIO)io;
+
+ if (versionForRemovedValue(rowIo.getMvccCoordinatorVersion(pageAddr, idx)))
+ resRow = null;
+ else
+ resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
return false; // Stop search.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
new file mode 100644
index 0000000..af11a9d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.createVersionForRemovedValue;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
+/**
+ *
+ */
+public class MvccRemoveRow extends MvccUpdateRow {
+ /**
+ * @param key Key.
+ * @param mvccVer Mvcc version.
+ * @param part Partition.
+ * @param cacheId Cache ID.
+ */
+ public MvccRemoveRow(
+ KeyCacheObject key,
+ MvccCoordinatorVersion mvccVer,
+ int part,
+ int cacheId) {
+ super(key, null, null, mvccVer, part, cacheId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCoordinatorVersion() {
+ return createVersionForRemovedValue(super.mvccCoordinatorVersion());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long unmaskedCoordinatorVersion() {
+ return unmaskCoordinatorVersion(super.mvccCoordinatorVersion());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccRemoveRow.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
index 79544e6..137ca28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
@@ -32,12 +33,14 @@ import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
/**
*
*/
public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
/** */
- private Boolean hasPrev;
+ private UpdateResult res;
/** */
private boolean canCleanup;
@@ -74,8 +77,8 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
/**
* @return {@code True} if previous value was non-null.
*/
- public boolean previousNotNull() {
- return hasPrev != null && hasPrev;
+ public UpdateResult updateResult() {
+ return res == null ? UpdateResult.PREV_NULL : res;
}
/**
@@ -92,6 +95,30 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
return cleanupRows;
}
+ /**
+ * @param io IO.
+ * @param pageAddr Page address.
+ * @param idx Item index.
+ * @return Always {@code true}.
+ */
+ private boolean assertVersion(RowLinkIO io, long pageAddr, int idx) {
+ long rowCrdVer = unmaskCoordinatorVersion(io.getMvccCoordinatorVersion(pageAddr, idx));
+ long rowCntr = io.getMvccCounter(pageAddr, idx);
+
+ int cmp = Long.compare(unmaskedCoordinatorVersion(), rowCrdVer);
+
+ if (cmp == 0)
+ cmp = Long.compare(mvccVer.counter(), rowCntr);
+
+ // Can be equals if backup rebalanced value updated on primary.
+ assert cmp >= 0 : "[updCrd=" + unmaskedCoordinatorVersion() +
+ ", updCntr=" + mvccVer.counter() +
+ ", rowCrd=" + rowCrdVer +
+ ", rowCntr=" + rowCntr + ']';
+
+ return true;
+ }
+
/** {@inheritDoc} */
@Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree,
BPlusIO<CacheSearchRow> io,
@@ -101,16 +128,33 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
{
RowLinkIO rowIo = (RowLinkIO)io;
- // All previous version should be less then new one.
- assert mvccVer.coordinatorVersion() >= rowIo.getMvccCoordinatorVersion(pageAddr, idx);
- assert mvccVer.coordinatorVersion() > rowIo.getMvccCoordinatorVersion(pageAddr, idx) || mvccVer.counter() > rowIo.getMvccCounter(pageAddr, idx);
+ // Assert version grows.
+ assert assertVersion(rowIo, pageAddr, idx);
boolean checkActive = mvccVer.activeTransactions().size() > 0;
boolean txActive = false;
+ long rowCrdVerMasked = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+ long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
+
+ long crdVer = unmaskedCoordinatorVersion();
+
+ if (res == null) {
+ int cmp = Long.compare(crdVer, rowCrdVer);
+
+ if (cmp == 0)
+ cmp = Long.compare(mvccVer.counter(), rowIo.getMvccCounter(pageAddr, idx));
+
+ if (cmp == 0)
+ res = UpdateResult.VERSION_FOUND;
+ else
+ res = CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked) ?
+ UpdateResult.PREV_NULL : UpdateResult.PREV_NOT_NULL;
+ }
+
// Suppose transactions on previous coordinator versions are done.
- if (checkActive && mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx)) {
+ if (checkActive && crdVer == rowCrdVer) {
long rowMvccCntr = rowIo.getMvccCounter(pageAddr, idx);
if (mvccVer.activeTransactions().contains(rowMvccCntr)) {
@@ -123,15 +167,12 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
}
}
- if (hasPrev == null)
- hasPrev = Boolean.TRUE; // TODO IGNITE-3478 support removes.
-
if (!txActive) {
- assert Long.compare(mvccVer.coordinatorVersion(), rowIo.getMvccCoordinatorVersion(pageAddr, idx)) >= 0;
+ assert Long.compare(crdVer, rowCrdVer) >= 0;
int cmp;
- if (mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx))
+ if (crdVer == rowCrdVer)
cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx));
else
cmp = 1;
@@ -141,10 +182,10 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
if (canCleanup) {
CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx);
- assert row.link() != 0 && row.mvccCoordinatorVersion() > 0 : row;
+ assert row.link() != 0 && row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
// Should not be possible to cleanup active tx.
- assert row.mvccCoordinatorVersion() != mvccVer.coordinatorVersion()
+ assert rowCrdVer != crdVer
|| !mvccVer.activeTransactions().contains(row.mvccCounter());
if (cleanupRows == null)
@@ -160,6 +201,13 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
return true;
}
+ /**
+ * @return Coordinator version without flags.
+ */
+ protected long unmaskedCoordinatorVersion() {
+ return mvccVer.coordinatorVersion();
+ }
+
/** {@inheritDoc} */
@Override public long mvccCoordinatorVersion() {
return mvccVer.coordinatorVersion();
@@ -174,4 +222,16 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
@Override public String toString() {
return S.toString(MvccUpdateRow.class, this, "super", super.toString());
}
+
+ /**
+ *
+ */
+ public enum UpdateResult {
+ /** */
+ VERSION_FOUND,
+ /** */
+ PREV_NULL,
+ /** */
+ PREV_NOT_NULL
+ }
}
[5/7] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-3478
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-3478
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/69fd367d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/69fd367d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/69fd367d
Branch: refs/heads/ignite-3478
Commit: 69fd367dc2eb9e76d89f344063d9788171cb359f
Parents: 61b46c4 0f3546a
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 11 11:18:10 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Oct 11 11:18:10 2017 +0300
----------------------------------------------------------------------
.../JettyRestProcessorAbstractSelfTest.java | 4 ++--
.../ignite/internal/binary/BinaryMetadata.java | 8 ++++++-
.../query/h2/opt/GridH2IndexBase.java | 25 +++++++++-----------
3 files changed, 20 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
[2/7] ignite git commit: Merge remote-tracking branch 'origin/master'
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/17680ae3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/17680ae3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/17680ae3
Branch: refs/heads/ignite-3478
Commit: 17680ae3cd65805f7762623cf40eda736d5a4bfa
Parents: 5f2a3e1 74c4849
Author: devozerov <vo...@gridgain.com>
Authored: Tue Oct 10 15:09:54 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Oct 10 15:09:54 2017 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtLocalPartition.java | 59 --------------------
.../dht/preloader/GridDhtPartitionDemander.java | 9 ---
2 files changed, 68 deletions(-)
----------------------------------------------------------------------
[6/7] ignite git commit: ignite-3478 Support for removes
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
index c829afb..a1d0127 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
@@ -28,6 +28,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
/**
*
*/
@@ -66,16 +69,23 @@ public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.Tr
{
boolean visible = true;
+ RowLinkIO rowIo = (RowLinkIO)io;
+
+ long crdVerMasked = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+
if (ver.activeTransactions().size() > 0) {
- RowLinkIO rowIo = (RowLinkIO)io;
+ long rowCrdVer = unmaskCoordinatorVersion(crdVerMasked);
// TODO IGNITE-3478 sort active transactions?
- if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) == ver.coordinatorVersion())
+ if (rowCrdVer == ver.coordinatorVersion())
visible = !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx));
}
if (visible) {
- resRow = ((CacheDataTree) tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+ if (versionForRemovedValue(crdVerMasked))
+ resRow = null;
+ else
+ resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
return false; // Stop search.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 6ced2f9..30145ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -134,11 +134,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** Version which is less then any version generated on coordinator. */
private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER =
- new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L) {
- @Override public boolean initialLoad() {
- return true;
- }
- };
+ new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L);
/** Cache receiver. */
private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java
new file mode 100644
index 0000000..ed7b62d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class CacheMvccClusterRestartTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setConsistentId(gridName);
+
+ cfg.setMvccEnabled(true);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ MemoryConfiguration memCfg = new MemoryConfiguration();
+ memCfg.setPageSize(1024);
+ memCfg.setDefaultMemoryPolicySize(100 * 1024 * 1024);
+
+ cfg.setMemoryConfiguration(memCfg);
+
+ cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration().setWalMode(WALMode.LOG_ONLY));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ GridTestUtils.deleteDbFiles();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ GridTestUtils.deleteDbFiles();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ GridTestUtils.deleteDbFiles();
+
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestart1() throws Exception {
+ restart1(3, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestart2() throws Exception {
+ restart1(1, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestart3() throws Exception {
+ restart1(3, 1);
+ }
+
+ /**
+ * @param srvBefore Number of servers before restart.
+ * @param srvAfter Number of servers after restart.
+ * @throws Exception If failed.
+ */
+ private void restart1(int srvBefore, int srvAfter) throws Exception {
+ Ignite srv0 = startGridsMultiThreaded(srvBefore);
+
+ srv0.active(true);
+
+ IgniteCache<Object, Object> cache = srv0.createCache(cacheConfiguration());
+
+ Set<Integer> keys = new HashSet<>(primaryKeys(cache, 1, 0));
+
+ try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (Integer k : keys)
+ cache.put(k, k);
+
+ tx.commit();
+ }
+
+ stopAllGrids();
+
+ srv0 = startGridsMultiThreaded(srvAfter);
+
+ srv0.active(true);
+
+ cache = srv0.cache(DEFAULT_CACHE_NAME);
+
+ Map<Object, Object> res = cache.getAll(keys);
+
+ assertEquals(keys.size(), res.size());
+
+ for (Integer k : keys)
+ assertEquals(k, cache.get(k));
+
+ try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (Integer k : keys)
+ cache.put(k, k + 1);
+
+ tx.commit();
+ }
+
+ for (Integer k : keys)
+ assertEquals(k + 1, cache.get(k));
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Object, Object> cacheConfiguration() {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setBackups(2);
+
+ return ccfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 115e8a2..8bf9e39 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.processors.cache.mvcc;
+import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -45,6 +47,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
@@ -119,6 +122,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/** */
private String nodeAttr;
+ /** */
+ private static final int PAGE_SIZE = MemoryConfiguration.DFLT_PAGE_SIZE;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -137,6 +143,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
if (nodeAttr != null)
cfg.setUserAttributes(F.asMap(nodeAttr, true));
+ MemoryConfiguration memCfg = new MemoryConfiguration();
+
+ memCfg.setPageSize(PAGE_SIZE);
+
+ cfg.setMemoryConfiguration(memCfg);
+
return cfg;
}
@@ -376,6 +388,147 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testSimplePutRemove() throws Exception {
+ simplePutRemove(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSimplePutRemove_LargeKeys() throws Exception {
+ simplePutRemove(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ * @param largeKeys {@code True} to use large keys (not fitting in single page).
+ */
+ private void simplePutRemove(boolean largeKeys) throws Exception {
+ Ignite node = startGrid(0);
+
+ IgniteTransactions txs = node.transactions();
+
+ final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1));
+
+ final int KEYS = 100;
+
+ checkValues(new HashMap<>(), cache);
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < KEYS; k++)
+ cache.remove(testKey(largeKeys, k));
+
+ tx.commit();
+ }
+
+ checkValues(new HashMap<>(), cache);
+
+ Map<Object, Object> expVals = new HashMap<>();
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < KEYS; k++) {
+ Object key = testKey(largeKeys, k);
+
+ expVals.put(key, k);
+
+ cache.put(key, k);
+ }
+
+ tx.commit();
+ }
+
+ checkValues(expVals, cache);
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < KEYS; k++) {
+ if (k % 2 == 0) {
+ Object key = testKey(largeKeys, k);
+
+ cache.remove(key);
+
+ expVals.remove(key);
+ }
+ }
+
+ tx.commit();
+ }
+
+ checkValues(expVals, cache);
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ Object key = testKey(largeKeys, 0);
+
+ for (int i = 0; i < 500; i++) {
+ boolean rmvd;
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ if (rnd.nextBoolean()) {
+ cache.remove(key);
+
+ rmvd = true;
+ }
+ else {
+ cache.put(key, i);
+
+ rmvd = false;
+ }
+
+ tx.commit();
+ }
+
+ if (rmvd) {
+ assertNull(cache.get(key));
+ assertTrue(cache.getAll(F.asSet(key)).isEmpty());
+ }
+ else {
+ assertEquals(i, cache.get(key));
+
+ Map<Object, Object> res = cache.getAll(F.asSet(key));
+
+ assertEquals(i, res.get(key));
+ }
+ }
+ }
+
+ /**
+ * @param largeKeys {@code True} to use large keys (not fitting in single page).
+ * @param idx Index.
+ * @return Key instance.
+ */
+ private static Object testKey(boolean largeKeys, int idx) {
+ if (largeKeys) {
+ int payloadSize = PAGE_SIZE + ThreadLocalRandom.current().nextInt(PAGE_SIZE * 10);
+
+ return new TestKey(idx, payloadSize);
+ }
+ else
+ return idx;
+ }
+
+ /**
+ * @param expVals Expected values.
+ * @param cache Cache.
+ */
+ private void checkValues(Map<Object, Object> expVals, IgniteCache<Object, Object> cache) {
+ for (Map.Entry<Object, Object> e : expVals.entrySet())
+ assertEquals(e.getValue(), cache.get(e.getKey()));
+
+ Map<Object, Object> res = cache.getAll(expVals.keySet());
+
+ assertEquals(expVals, res);
+
+ res = new HashMap<>();
+
+ for (IgniteCache.Entry<Object, Object> e : cache)
+ res.put(e.getKey(), e.getValue());
+
+ assertEquals(expVals, res);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testThreadUpdatesAreVisibleForThisThread() throws Exception {
final Ignite ignite = startGrid(0);
@@ -1181,42 +1334,49 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testAccountsTxGetAll_SingleNode() throws Exception {
- accountsTxGetAll(1, 0, 0, 64, ReadMode.GET_ALL);
+ accountsTxGetAll(1, 0, 0, 64, false, ReadMode.GET_ALL);
}
/**
* @throws Exception If failed.
*/
public void testAccountsTxGetAll_SingleNode_SinglePartition() throws Exception {
- accountsTxGetAll(1, 0, 0, 1, ReadMode.GET_ALL);
+ accountsTxGetAll(1, 0, 0, 1, false, ReadMode.GET_ALL);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAccountsTxGetAll_WithRemoves_SingleNode_SinglePartition() throws Exception {
+ accountsTxGetAll(1, 0, 0, 1, true, ReadMode.GET_ALL);
}
/**
* @throws Exception If failed.
*/
public void testAccountsTxGetAll_ClientServer_Backups0() throws Exception {
- accountsTxGetAll(4, 2, 0, 64, ReadMode.GET_ALL);
+ accountsTxGetAll(4, 2, 0, 64, false, ReadMode.GET_ALL);
}
/**
* @throws Exception If failed.
*/
public void testAccountsTxGetAll_ClientServer_Backups1() throws Exception {
- accountsTxGetAll(4, 2, 1, 64, ReadMode.GET_ALL);
+ accountsTxGetAll(4, 2, 1, 64, false, ReadMode.GET_ALL);
}
/**
* @throws Exception If failed.
*/
public void testAccountsTxGetAll_ClientServer_Backups2() throws Exception {
- accountsTxGetAll(4, 2, 2, 64, ReadMode.GET_ALL);
+ accountsTxGetAll(4, 2, 2, 64, false, ReadMode.GET_ALL);
}
/**
* @throws Exception If failed.
*/
public void testAccountsTxScan_SingleNode_SinglePartition() throws Exception {
- accountsTxGetAll(1, 0, 0, 1, ReadMode.SCAN);
+ accountsTxGetAll(1, 0, 0, 1, false, ReadMode.SCAN);
}
/**
@@ -1224,6 +1384,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
* @param clients Number of client nodes.
* @param cacheBackups Number of cache backups.
* @param cacheParts Number of cache partitions.
+ * @param withRmvs If {@code true} then in addition to puts tests also executes removes.
* @param readMode Read mode.
* @throws Exception If failed.
*/
@@ -1232,6 +1393,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
final int clients,
int cacheBackups,
int cacheParts,
+ final boolean withRmvs,
final ReadMode readMode
)
throws Exception
@@ -1261,6 +1423,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
}
};
+ final Set<Integer> rmvdIds = new HashSet<>();
+
GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer =
new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
@Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) {
@@ -1285,8 +1449,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
keys.add(id1);
keys.add(id2);
- Integer cntr1;
- Integer cntr2;
+ Integer cntr1 = null;
+ Integer cntr2 = null;
try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
MvccTestAccount a1;
@@ -1297,28 +1461,74 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
a1 = accounts.get(id1);
a2 = accounts.get(id2);
- assertNotNull(a1);
- assertNotNull(a2);
+ if (!withRmvs) {
+ assertNotNull(a1);
+ assertNotNull(a2);
- cntr1 = a1.updateCnt + 1;
- cntr2 = a2.updateCnt + 1;
+ cntr1 = a1.updateCnt + 1;
+ cntr2 = a2.updateCnt + 1;
- cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1));
- cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2));
+ cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1));
+ cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2));
+ }
+ else {
+ if (a1 != null || a2 != null) {
+ if (a1 != null && a2 != null) {
+ Integer rmvd = null;
+
+ if (rnd.nextInt(10) == 0) {
+ synchronized (rmvdIds) {
+ if (rmvdIds.size() < ACCOUNTS / 2) {
+ rmvd = rnd.nextBoolean() ? id1 : id2;
+
+ assertTrue(rmvdIds.add(rmvd));
+ }
+ }
+ }
+
+ if (rmvd != null) {
+ cache.remove(rmvd);
+
+ cache.put(rmvd.equals(id1) ? id2 : id1,
+ new MvccTestAccount(a1.val + a2.val, 1));
+ }
+ else {
+ cache.put(id1, new MvccTestAccount(a1.val + 1, 1));
+ cache.put(id2, new MvccTestAccount(a2.val - 1, 1));
+ }
+ }
+ else {
+ if (a1 == null) {
+ cache.put(id1, new MvccTestAccount(100, 1));
+ cache.put(id2, new MvccTestAccount(a2.val - 100, 1));
+
+ assertTrue(rmvdIds.remove(id1));
+ }
+ else {
+ cache.put(id1, new MvccTestAccount(a1.val - 100, 1));
+ cache.put(id2, new MvccTestAccount(100, 1));
+
+ assertTrue(rmvdIds.remove(id2));
+ }
+ }
+ }
+ }
tx.commit();
}
- Map<Integer, MvccTestAccount> accounts = cache.getAll(keys);
+ if (!withRmvs) {
+ Map<Integer, MvccTestAccount> accounts = cache.getAll(keys);
- MvccTestAccount a1 = accounts.get(id1);
- MvccTestAccount a2 = accounts.get(id2);
+ MvccTestAccount a1 = accounts.get(id1);
+ MvccTestAccount a2 = accounts.get(id2);
- assertNotNull(a1);
- assertNotNull(a2);
+ assertNotNull(a1);
+ assertNotNull(a2);
- assertTrue(a1.updateCnt >= cntr1);
- assertTrue(a2.updateCnt >= cntr2);
+ assertTrue(a1.updateCnt >= cntr1);
+ assertTrue(a2.updateCnt >= cntr2);
+ }
}
info("Writer finished, updates: " + cnt);
@@ -1354,23 +1564,26 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
else
accounts = cache.getAll(keys);
- assertEquals(ACCOUNTS, accounts.size());
+ if (!withRmvs)
+ assertEquals(ACCOUNTS, accounts.size());
int sum = 0;
for (int i = 0; i < ACCOUNTS; i++) {
MvccTestAccount account = accounts.get(i);
- assertNotNull(account);
+ if (account != null) {
+ sum += account.val;
- sum += account.val;
+ Integer cntr = lastUpdateCntrs.get(i);
- Integer cntr = lastUpdateCntrs.get(i);
+ if (cntr != null)
+ assertTrue(cntr <= account.updateCnt);
- if (cntr != null)
- assertTrue(cntr <= account.updateCnt);
-
- lastUpdateCntrs.put(i, cntr);
+ lastUpdateCntrs.put(i, cntr);
+ }
+ else
+ assertTrue(withRmvs);
}
assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
@@ -1386,9 +1599,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
for (int i = 0; i < ACCOUNTS; i++) {
MvccTestAccount account = accounts.get(i);
- info("Account [id=" + i + ", val=" + account.val + ']');
+ assertTrue(account != null || withRmvs);
- sum += account.val;
+ info("Account [id=" + i + ", val=" + (account != null ? account.val : null) + ']');
+
+ if (account != null)
+ sum += account.val;
}
info("Sum: " + sum);
@@ -1601,7 +1817,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testRebalance1() throws Exception {
+ public void testSimpleRebalance() throws Exception {
Ignite srv0 = startGrid(0);
IgniteCache<Integer, Integer> cache = (IgniteCache)srv0.createCache(
@@ -1664,6 +1880,58 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testSimpleRebalanceWithRemovedValues() throws Exception {
+ Ignite node = startGrid(0);
+
+ IgniteTransactions txs = node.transactions();
+
+ final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64));
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < 100; k++)
+ cache.remove(k);
+
+ tx.commit();
+ }
+
+ Map<Object, Object> expVals = new HashMap<>();
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 100; k < 200; k++) {
+ cache.put(k, k);
+
+ expVals.put(k, k);
+ }
+
+ tx.commit();
+ }
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 100; k < 200; k++) {
+ if (k % 2 == 0) {
+ cache.remove(k);
+
+ expVals.remove(k);
+ }
+ }
+
+ tx.commit();
+ }
+
+ startGrid(1);
+
+ awaitPartitionMapExchange();
+
+ checkValues(expVals, jcache(1));
+
+ stopGrid(0);
+
+ checkValues(expVals, jcache(1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCoordinatorFailurePessimisticTx() throws Exception {
testSpi = true;
@@ -2722,9 +2990,55 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
assertEquals(KEYS, cache.size());
}
- // TODO IGNITE-3478: test removes.
- }
+ int size = KEYS;
+
+ for (int i = 0; i < KEYS; i++) {
+ if (i % 2 == 0) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.remove(key);
+
+ tx.commit();
+ }
+
+ size--;
+
+ assertEquals(size, cache.size());
+ }
+ }
+
+ // Check size does not change if remove already removed keys.
+ for (int i = 0; i < KEYS; i++) {
+ if (i % 2 == 0) {
+ final Integer key = i;
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.remove(key);
+
+ tx.commit();
+ }
+
+ assertEquals(size, cache.size());
+ }
+ }
+
+ for (int i = 0; i < KEYS; i++) {
+ if (i % 2 == 0) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, i);
+
+ tx.commit();
+ }
+
+ size++;
+
+ assertEquals(size, cache.size());
+ }
+ }
+ }
/**
* @throws IgniteCheckedException If failed.
@@ -2792,7 +3106,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
key0,
vers.get(0).get1());
- MvccCoordinatorVersionResponse ver = version(crd.currentCoordinator().coordinatorVersion(), 100000);
+ MvccCoordinatorVersionResponse ver = version(vers.get(0).get2().coordinatorVersion(), 100000);
for (int v = 0; v < vers.size(); v++) {
MvccCounter cntr = vers.get(v).get2();
@@ -3074,4 +3388,54 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
return null;
}
}
+
+ /**
+ *
+ */
+ static class TestKey implements Serializable {
+ /** */
+ private final int key;
+
+ /** */
+ private final byte[] payload;
+
+ /**
+ * @param key Key.
+ * @param payloadSize Payload size.
+ */
+ public TestKey(int key, int payloadSize) {
+ this.key = key;
+ this.payload = new byte[payloadSize];
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestKey testKey = (TestKey)o;
+
+ if (key != testKey.key)
+ return false;
+
+ return Arrays.equals(payload, testKey.payload);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = key;
+
+ res = 31 * res + Arrays.hashCode(payload);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "TestKey [k=" + key + ", payloadLen=" + payload.length + ']';
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
index d0897e1..600c8df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
@@ -435,6 +435,11 @@ public class FreeListImplSelfTest extends GridCommonAbstractTest {
@Override public long mvccCounter() {
return 0;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return false;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
index 392301c..1819cfe 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
@@ -174,4 +174,9 @@ public abstract class GridH2Row implements SearchRow, CacheDataRow, Row {
@Override public long mvccCounter() {
throw new UnsupportedOperationException();
}
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ throw new UnsupportedOperationException();
+ }
}
\ No newline at end of file
[4/7] ignite git commit: IGNITE-6588: SQL: optimized index segment
resolution. This closes #2825.
Posted by sb...@apache.org.
IGNITE-6588: SQL: optimized index segment resolution. This closes #2825.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0f3546a9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0f3546a9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0f3546a9
Branch: refs/heads/ignite-3478
Commit: 0f3546a9b45e55181f0c16b11f9378452357b2ec
Parents: 4385f12
Author: devozerov <vo...@gridgain.com>
Authored: Tue Oct 10 17:54:50 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Oct 10 17:54:50 2017 +0300
----------------------------------------------------------------------
.../query/h2/opt/GridH2IndexBase.java | 25 +++++++++-----------
1 file changed, 11 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f3546a9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 6568f13..048192a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -804,26 +804,23 @@ public abstract class GridH2IndexBase extends BaseIndex {
protected int segmentForRow(SearchRow row) {
assert row != null;
- CacheObject key;
-
- if (ctx != null) {
- final Value keyColValue = row.getValue(KEY_COL);
+ if (segmentsCount() == 1 || ctx == null)
+ return 0;
- assert keyColValue != null;
+ CacheObject key;
- final Object o = keyColValue.getObject();
+ final Value keyColValue = row.getValue(KEY_COL);
- if (o instanceof CacheObject)
- key = (CacheObject)o;
- else
- key = ctx.toCacheKeyObject(o);
+ assert keyColValue != null;
- return segmentForPartition(ctx.affinity().partition(key));
- }
+ final Object o = keyColValue.getObject();
- assert segmentsCount() == 1;
+ if (o instanceof CacheObject)
+ key = (CacheObject)o;
+ else
+ key = ctx.toCacheKeyObject(o);
- return 0;
+ return segmentForPartition(ctx.affinity().partition(key));
}
/**