You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/08/30 11:53:56 UTC
[34/38] ignite git commit: IGNITE-4191: MVCC and transactional SQL
support. Joint multi-man-years efforts of Semen Boikov, Igor Seliverstov,
Alexander Paschenko, Igor Sapego, Sergey Kalashnikov, Roman Kondakov,
Pavel Kuznetsov, Ivan Pavlukhin, Andrey Mas
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 4b98060..bfe0001 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
-import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
@@ -48,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionsEvictManager;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
@@ -794,6 +794,13 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @return Cache mvcc coordinator manager.
+ */
+ public MvccProcessor coordinators() {
+ return kernalCtx.coordinators();
+ }
+
+ /**
* @return Partition evict manager.
*/
public PartitionsEvictManager evict() {
@@ -855,7 +862,7 @@ public class GridCacheSharedContext<K, V> {
/**
* Captures all ongoing operations that we need to wait before we can proceed to the next topology version.
* This method must be called only after
- * {@link GridDhtPartitionTopology#updateTopologyVersion(GridDhtTopologyFuture, DiscoCache, long, boolean)}
+ * {@link GridDhtPartitionTopology#updateTopologyVersion}
* method is called so that all new updates will wait to switch to the new version.
* This method will capture:
* <ul>
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
index 92af83b..b646cf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
@@ -17,8 +17,12 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.List;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
@@ -29,26 +33,62 @@ public class GridCacheUpdateTxResult {
/** Success flag.*/
private final boolean success;
- /** Old value. */
- @GridToStringInclude
- private final CacheObject oldVal;
-
- /** Partition idx. */
+ /** Partition update counter. */
private long updateCntr;
/** */
+ private GridLongList mvccWaitTxs;
+
+ /** */
+ private GridFutureAdapter<GridCacheUpdateTxResult> fut;
+
+ /** */
private WALPointer logPtr;
+ /** */
+ private List<MvccLinkAwareSearchRow> mvccHistory;
+
+ /**
+ * Constructor.
+ *
+ * @param success Success flag.
+ */
+ GridCacheUpdateTxResult(boolean success) {
+ this.success = success;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param success Success flag.
+ * @param logPtr Logger WAL pointer for the update.
+ */
+ GridCacheUpdateTxResult(boolean success, WALPointer logPtr) {
+ this.success = success;
+ this.logPtr = logPtr;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param success Success flag.
+ * @param fut Update future.
+ */
+ GridCacheUpdateTxResult(boolean success, GridFutureAdapter<GridCacheUpdateTxResult> fut) {
+ this.success = success;
+ this.fut = fut;
+ }
+
/**
* Constructor.
*
* @param success Success flag.
- * @param oldVal Old value (if any),
+ * @param updateCntr Update counter.
* @param logPtr Logger WAL pointer for the update.
*/
- GridCacheUpdateTxResult(boolean success, @Nullable CacheObject oldVal, WALPointer logPtr) {
+ GridCacheUpdateTxResult(boolean success, long updateCntr, WALPointer logPtr) {
this.success = success;
- this.oldVal = oldVal;
+ this.updateCntr = updateCntr;
this.logPtr = logPtr;
}
@@ -56,20 +96,21 @@ public class GridCacheUpdateTxResult {
* Constructor.
*
* @param success Success flag.
- * @param oldVal Old value (if any).
+ * @param updateCntr Update counter.
* @param logPtr Logger WAL pointer for the update.
+ * @param mvccWaitTxs List of transactions to wait for completion.
*/
- GridCacheUpdateTxResult(boolean success, @Nullable CacheObject oldVal, long updateCntr, WALPointer logPtr) {
+ GridCacheUpdateTxResult(boolean success, long updateCntr, WALPointer logPtr, GridLongList mvccWaitTxs) {
this.success = success;
- this.oldVal = oldVal;
this.updateCntr = updateCntr;
this.logPtr = logPtr;
+ this.mvccWaitTxs = mvccWaitTxs;
}
/**
- * @return Partition idx.
+ * @return Partition update counter.
*/
- public long updatePartitionCounter() {
+ public long updateCounter() {
return updateCntr;
}
@@ -88,10 +129,33 @@ public class GridCacheUpdateTxResult {
}
/**
- * @return Old value.
+ * @return Update future.
+ */
+ @Nullable public IgniteInternalFuture<GridCacheUpdateTxResult> updateFuture() {
+ return fut;
+ }
+
+ /**
+ * @return List of transactions to wait for completion.
+ */
+ @Nullable public GridLongList mvccWaitTransactions() {
+ return mvccWaitTxs;
+ }
+
+ /**
+ *
+ * @return Mvcc history rows.
+ */
+ @Nullable public List<MvccLinkAwareSearchRow> mvccHistory() {
+ return mvccHistory;
+ }
+
+ /**
+ *
+ * @param mvccHistory Mvcc history rows.
*/
- @Nullable public CacheObject oldValue() {
- return oldVal;
+ public void mvccHistory(List<MvccLinkAwareSearchRow> mvccHistory) {
+ this.mvccHistory = mvccHistory;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 213fb2e..9ef470c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.configuration.Factory;
@@ -67,10 +68,13 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
import org.apache.ignite.internal.managers.discovery.IgniteClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -99,7 +103,6 @@ import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.plugin.CachePluginConfiguration;
import org.apache.ignite.plugin.security.SecurityException;
-import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -173,6 +176,14 @@ public class GridCacheUtils {
/** System cache name. */
public static final String UTILITY_CACHE_NAME = "ignite-sys-cache";
+ /** Reserved cache names */
+ public static final String[] RESERVED_NAMES = new String[] {
+ SYS_CACHE_HADOOP_MR,
+ UTILITY_CACHE_NAME,
+ MetaStorage.METASTORAGE_CACHE_NAME,
+ TxLog.TX_LOG_CACHE_NAME,
+ };
+
/** */
public static final String CONTINUOUS_QRY_LOG_CATEGORY = "org.apache.ignite.continuous.query";
@@ -319,6 +330,18 @@ public class GridCacheUtils {
}
};
+ /** Query mapped filter. */
+ public static final IgnitePredicate<GridDistributedTxMapping> FILTER_QUERY_MAPPING = new P1<GridDistributedTxMapping>() {
+
+ @Override public boolean apply(GridDistributedTxMapping m) {
+ return m.queryUpdate();
+ }
+
+ @Override public String toString() {
+ return "FILTER_QUERY_MAPPING";
+ }
+ };
+
/** Transaction entry to key. */
private static final IgniteClosure tx2key = new C1<IgniteTxEntry, Object>() {
@Override public Object apply(IgniteTxEntry e) {
@@ -589,6 +612,30 @@ public class GridCacheUtils {
}
/**
+ * @return Long reducer.
+ */
+ public static IgniteReducer<Long, Long> longReducer() {
+ return new IgniteReducer<Long, Long>() {
+ private final LongAdder res = new LongAdder();
+
+ @Override public boolean collect(Long l) {
+ if(l != null)
+ res.add(l);
+
+ return true;
+ }
+
+ @Override public Long reduce() {
+ return res.sum();
+ }
+
+ @Override public String toString() {
+ return "Long reducer: " + res;
+ }
+ };
+ }
+
+ /**
* Gets reducer that aggregates maps into one.
*
* @param size Predicted size of the resulting map to avoid resizings.
@@ -1543,6 +1590,17 @@ public class GridCacheUtils {
}
/**
+ * @param name Cache name.
+ * @throws IllegalArgumentException In case the name is not valid.
+ */
+ public static void validateNewCacheName(String name) throws IllegalArgumentException {
+ validateCacheName(name);
+
+ A.ensure(!isReservedCacheName(name), "Cache name cannot be \"" + name +
+ "\" because it is reserved for internal purposes.");
+ }
+
+ /**
* @param cacheNames Cache names to validate.
* @throws IllegalArgumentException In case the name is not valid.
*/
@@ -1558,7 +1616,20 @@ public class GridCacheUtils {
public static void validateConfigurationCacheNames(Collection<CacheConfiguration> ccfgs)
throws IllegalArgumentException {
for (CacheConfiguration ccfg : ccfgs)
- validateCacheName(ccfg.getName());
+ validateNewCacheName(ccfg.getName());
+ }
+
+ /**
+ * @param name Cache name.
+ * @return {@code True} if it is a reserved cache name.
+ */
+ public static boolean isReservedCacheName(String name) {
+ for (String reserved : RESERVED_NAMES) {
+ if (reserved.equals(name))
+ return true;
+ }
+
+ return false;
}
/**
@@ -1830,6 +1901,44 @@ public class GridCacheUtils {
}
/**
+ * @param sctx Shared context.
+ * @param cacheIds Cache ids.
+ * @return First partitioned cache or {@code null} in case no partitioned cache ids are in list.
+ */
+ public static GridCacheContext<?, ?> firstPartitioned(GridCacheSharedContext<?, ?> sctx, int[] cacheIds) {
+ for (int i = 0; i < cacheIds.length; i++) {
+ GridCacheContext<?, ?> cctx = sctx.cacheContext(cacheIds[i]);
+
+ if (cctx == null)
+ throw new CacheException("Failed to find cache.");
+
+ if (!cctx.isLocal() && !cctx.isReplicated())
+ return cctx;
+ }
+
+ return null;
+ }
+
+ /**
+ * @param sctx Shared context.
+ * @param cacheIds Cache ids.
+ * @return First partitioned cache or {@code null} in case no partitioned cache ids are in list.
+ */
+ public static GridCacheContext<?, ?> firstPartitioned(GridCacheSharedContext<?, ?> sctx, Iterable<Integer> cacheIds) {
+ for (Integer i : cacheIds) {
+ GridCacheContext<?, ?> cctx = sctx.cacheContext(i);
+
+ if (cctx == null)
+ throw new CacheException("Failed to find cache.");
+
+ if (!cctx.isLocal() && !cctx.isReplicated())
+ return cctx;
+ }
+
+ return null;
+ }
+
+ /**
* @param cacheName Name of cache or cache template.
* @return {@code true} if cache name ends with asterisk (*), and therefire is a template name.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index fa25412..a021394 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -17,25 +17,33 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.List;
import java.util.Map;
import javax.cache.Cache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
import org.apache.ignite.internal.processors.cache.persistence.RowStore;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.IgniteTree;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
/**
@@ -171,6 +179,202 @@ public interface IgniteCacheOffheapManager {
/**
* @param cctx Cache context.
* @param key Key.
+ * @return Cached row, if available, null otherwise.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public CacheDataRow mvccRead(GridCacheContext cctx, KeyCacheObject key, MvccSnapshot ver)
+ throws IgniteCheckedException;
+
+ /**
+ * For testing only.
+ *
+ * @param cctx Cache context.
+ * @param key Key.
+ * @return All stored versions for given key.
+ * @throws IgniteCheckedException If failed.
+ */
+ public List<IgniteBiTuple<Object, MvccVersion>> mvccAllVersions(GridCacheContext cctx, KeyCacheObject key)
+ throws IgniteCheckedException;
+
+ /**
+ * Returns iterator over the all row versions for the given key.
+ *
+ * @param cctx Cache context.
+ * @param key Key.
+ * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+ * @return Iterator over all versions.
+ * @throws IgniteCheckedException If failed.
+ */
+ GridCursor<CacheDataRow> mvccAllVersionsCursor(GridCacheContext cctx, KeyCacheObject key, Object x)
+ throws IgniteCheckedException;
+
+ /**
+ * @param entry Entry.
+ * @param val Value.
+ * @param ver Version.
+ * @param expireTime Expire time.
+ * @return {@code True} if value was inserted.
+ * @throws IgniteCheckedException If failed.
+ */
+ default boolean mvccInitialValue(
+ GridCacheMapEntry entry,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ long expireTime
+ ) throws IgniteCheckedException {
+ return mvccInitialValue(entry, val, ver, expireTime, null, null);
+ }
+
+ /**
+ * @param entry Entry.
+ * @param val Value.
+ * @param ver Version.
+ * @param expireTime Expire time.
+ * @param mvccVer MVCC version.
+ * @param newMvccVer New MVCC version.
+ * @return {@code True} if value was inserted.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean mvccInitialValue(
+ GridCacheMapEntry entry,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ MvccVersion mvccVer,
+ MvccVersion newMvccVer
+ ) throws IgniteCheckedException;
+
+ /**
+ * @param entry Entry.
+ * @param val Value.
+ * @param ver Version.
+ * @param expireTime Expire time.
+ * @param mvccVer MVCC version.
+ * @param newMvccVer New MVCC version.
+ * @param txState Tx state hint for the mvcc version.
+ * @param newTxState Tx state hint for the new mvcc version.
+ * @return {@code True} if value was inserted.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean mvccInitialValueIfAbsent(
+ GridCacheMapEntry entry,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ MvccVersion mvccVer,
+ MvccVersion newMvccVer,
+ byte txState,
+ byte newTxState
+ ) throws IgniteCheckedException;
+
+ /**
+ * @param entry Entry.
+ * @param val Value.
+ * @param ver Cache version.
+ * @param expireTime Expire time.
+ * @param mvccSnapshot MVCC snapshot.
+ * @param primary {@code True} if on primary node.
+ * @param needHistory Flag to collect history.
+ * @param noCreate Flag indicating that row should not be created if absent.
+ * @return Update result.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public MvccUpdateResult mvccUpdate(
+ GridCacheMapEntry entry,
+ CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ MvccSnapshot mvccSnapshot,
+ boolean primary,
+ boolean needHistory,
+ boolean noCreate) throws IgniteCheckedException;
+
+ /**
+ * @param entry Entry.
+ * @param mvccSnapshot MVCC snapshot.
+ * @param primary {@code True} if on primary node.
+ * @param needHistory Flag to collect history.
+ * @return Update result.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public MvccUpdateResult mvccRemove(
+ GridCacheMapEntry entry,
+ MvccSnapshot mvccSnapshot,
+ boolean primary,
+ boolean needHistory) throws IgniteCheckedException;
+
+ /**
+ * @param entry Entry.
+ * @param mvccSnapshot MVCC snapshot.
+ * @return Update result.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public MvccUpdateResult mvccLock(
+ GridCacheMapEntry entry,
+ MvccSnapshot mvccSnapshot
+ ) throws IgniteCheckedException;
+
+ /**
+ * @param entry Entry.
+ * @param val Value.
+ * @param ver Version.
+ * @param mvccVer MVCC version.
+ * @param newMvccVer New MVCC version.
+ * @return {@code True} if value was inserted.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean mvccUpdateRowWithPreloadInfo(
+ GridCacheMapEntry entry,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ MvccVersion mvccVer,
+ MvccVersion newMvccVer,
+ byte mvccTxState,
+ byte newMvccTxState
+ ) throws IgniteCheckedException;
+
+ /**
+ * @param primary {@code True} if on primary node.
+ * @param entry Entry.
+ * @param val Value.
+ * @param ver Cache version.
+ * @param expireTime Expire time.
+ * @param mvccSnapshot MVCC snapshot.
+ * @return Transactions to wait for before finishing current transaction.
+ * @throws IgniteCheckedException If failed.
+ */
+ GridLongList mvccUpdateNative(
+ boolean primary,
+ GridCacheMapEntry entry,
+ CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ MvccSnapshot mvccSnapshot) throws IgniteCheckedException;
+
+ /**
+ * @param primary {@code True} if on primary node.
+ * @param entry Entry.
+ * @param mvccSnapshot MVCC snapshot.
+ * @return Transactions to wait for before finishing current transaction.
+ * @throws IgniteCheckedException If failed.
+ */
+ GridLongList mvccRemoveNative(
+ boolean primary,
+ GridCacheMapEntry entry,
+ MvccSnapshot mvccSnapshot
+ ) throws IgniteCheckedException;
+
+ /**
+ * @param entry Entry.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void mvccRemoveAll(GridCacheMapEntry entry)
+ throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
* @param val Value.
* @param ver Version.
* @param expireTime Expire time.
@@ -209,6 +413,7 @@ public interface IgniteCacheOffheapManager {
public int onUndeploy(ClassLoader ldr);
/**
+ *
* @param cacheId Cache ID.
* @param primary Primary entries flag.
* @param backup Backup entries flag.
@@ -219,7 +424,8 @@ public interface IgniteCacheOffheapManager {
public GridIterator<CacheDataRow> cacheIterator(int cacheId,
boolean primary,
boolean backup,
- final AffinityTopologyVersion topVer)
+ AffinityTopologyVersion topVer,
+ @Nullable MvccSnapshot mvccSnapshot)
throws IgniteCheckedException;
/**
@@ -228,7 +434,20 @@ public interface IgniteCacheOffheapManager {
* @return Partition data iterator.
* @throws IgniteCheckedException If failed.
*/
- public GridIterator<CacheDataRow> cachePartitionIterator(int cacheId, final int part) throws IgniteCheckedException;
+ public default GridIterator<CacheDataRow> cachePartitionIterator(int cacheId, final int part)
+ throws IgniteCheckedException {
+ return cachePartitionIterator(cacheId, part, null);
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param part Partition.
+ * @param mvccSnapshot MVCC snapshot.
+ * @return Partition data iterator.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridIterator<CacheDataRow> cachePartitionIterator(int cacheId, final int part,
+ @Nullable MvccSnapshot mvccSnapshot) throws IgniteCheckedException;
/**
* @param part Partition number.
@@ -251,6 +470,7 @@ public interface IgniteCacheOffheapManager {
* @return Partition data iterator.
* @throws IgniteCheckedException If failed.
*/
+ // TODO: MVCC>
public IgniteRebalanceIterator rebalanceIterator(IgniteDhtDemandedPartitionsMap parts, AffinityTopologyVersion topVer)
throws IgniteCheckedException;
@@ -263,6 +483,7 @@ public interface IgniteCacheOffheapManager {
* @return Entries iterator.
* @throws IgniteCheckedException If failed.
*/
+ // TODO: MVCC>
public <K, V> GridCloseableIterator<Cache.Entry<K, V>> cacheEntriesIterator(
GridCacheContext cctx,
final boolean primary,
@@ -276,6 +497,7 @@ public interface IgniteCacheOffheapManager {
* @return Iterator.
* @throws IgniteCheckedException If failed.
*/
+ // TODO: MVCC>
public GridCloseableIterator<KeyCacheObject> cacheKeysIterator(int cacheId, final int part)
throws IgniteCheckedException;
@@ -287,6 +509,7 @@ public interface IgniteCacheOffheapManager {
* @return Entries count.
* @throws IgniteCheckedException If failed.
*/
+ // TODO: MVCC>
public long cacheEntriesCount(int cacheId, boolean primary, boolean backup, AffinityTopologyVersion topVer)
throws IgniteCheckedException;
@@ -402,7 +625,7 @@ public interface IgniteCacheOffheapManager {
long updateCounter();
/**
- *
+ * @param val Update counter.
*/
void updateCounter(long val);
@@ -412,6 +635,16 @@ public interface IgniteCacheOffheapManager {
public long nextUpdateCounter();
/**
+ * @return Next mvcc update counter.
+ */
+ long nextMvccUpdateCounter();
+
+ /**
+ * @return Current mvcc update counter value.
+ */
+ long mvccUpdateCounter();
+
+ /**
* @return Initial update counter.
*/
public long initialUpdateCounter();
@@ -436,6 +669,24 @@ public interface IgniteCacheOffheapManager {
/**
* @param cctx Cache context.
+ * @param cleanupRows Rows to cleanup.
+ * @throws IgniteCheckedException If failed.
+ * @return Cleaned rows count.
+ */
+ public int cleanup(GridCacheContext cctx, @Nullable List<MvccLinkAwareSearchRow> cleanupRows)
+ throws IgniteCheckedException;
+
+ /**
+ *
+ * @param cctx Cache context.
+ * @param row Row.
+ * @throws IgniteCheckedException
+ */
+ public void updateTxState(GridCacheContext cctx, CacheSearchRow row)
+ throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
* @param key Key.
* @param val Value.
* @param ver Version.
@@ -454,6 +705,163 @@ public interface IgniteCacheOffheapManager {
/**
* @param cctx Cache context.
* @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param mvccVer MVCC version.
+ * @param newMvccVer New MVCC version.
+ * @return {@code True} if new value was inserted.
+ * @throws IgniteCheckedException If failed.
+ */
+ boolean mvccInitialValue(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ MvccVersion mvccVer,
+ MvccVersion newMvccVer) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param mvccVer MVCC version.
+ * @param newMvccVer New MVCC version.
+ * @param txState Tx state hint for the mvcc version.
+ * @param newTxState Tx state hint for the new mvcc version.
+ * @return {@code True} if new value was inserted.
+ * @throws IgniteCheckedException If failed.
+ */
+ boolean mvccInitialValueIfAbsent(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ MvccVersion mvccVer,
+ MvccVersion newMvccVer,
+ byte txState,
+ byte newTxState) throws IgniteCheckedException;
+
+ /**
+ *
+ * @param cctx Grid cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param expireTime Expiration time.
+ * @param mvccVer Mvcc version.
+ * @param newMvccVer New mvcc version.
+ * @return {@code true} on success.
+ * @throws IgniteCheckedException, if failed.
+ */
+ boolean mvccUpdateRowWithPreloadInfo(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ MvccVersion mvccVer,
+ MvccVersion newMvccVer,
+ byte mvccTxState,
+ byte newMvccTxState) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param expireTime Expire time.
+ * @param mvccSnapshot MVCC snapshot.
+ * @param primary {@code True} if update is executed on primary node.
+ * @param needHistory Flag to collect history.
+ * @param noCreate Flag indicating that row should not be created if absent.
+ * @return Update result.
+ * @throws IgniteCheckedException If failed.
+ */
+ MvccUpdateResult mvccUpdate(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ MvccSnapshot mvccSnapshot,
+ boolean primary,
+ boolean needHistory,
+ boolean noCreate) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @param mvccSnapshot MVCC snapshot.
+ * @param primary {@code True} if update is executed on primary node.
+ * @param needHistory Flag to collect history.
+ * @return List of transactions to wait for.
+ * @throws IgniteCheckedException If failed.
+ */
+ MvccUpdateResult mvccRemove(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ MvccSnapshot mvccSnapshot,
+ boolean primary,
+ boolean needHistory) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @param mvccSnapshot MVCC snapshot.
+ * @return List of transactions to wait for.
+ * @throws IgniteCheckedException If failed.
+ */
+ MvccUpdateResult mvccLock(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ MvccSnapshot mvccSnapshot) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param primary {@code True} if update is executed on primary node.
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param expireTime Expire time.
+ * @param mvccSnapshot MVCC snapshot.
+ * @return Update result.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable GridLongList mvccUpdateNative(
+ GridCacheContext cctx,
+ boolean primary,
+ KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ MvccSnapshot mvccSnapshot) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param primary {@code True} if update is executed on primary node.
+ * @param key Key.
+ * @param mvccSnapshot MVCC snapshot.
+ * @return List of transactions to wait for.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable GridLongList mvccRemoveNative(GridCacheContext cctx,
+ boolean primary,
+ KeyCacheObject key,
+ MvccSnapshot mvccSnapshot) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @throws IgniteCheckedException If failed.
+ */
+ void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
* @param c Closure.
* @throws IgniteCheckedException If failed.
*/
@@ -476,12 +884,58 @@ public interface IgniteCacheOffheapManager {
public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException;
/**
+ * Returns iterator over the all row versions for the given key.
+ *
+ * @param cctx Cache context.
+ * @param key Key.
+ * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+ * @return Iterator over all versions.
+ * @throws IgniteCheckedException If failed.
+ */
+ GridCursor<CacheDataRow> mvccAllVersionsCursor(GridCacheContext cctx, KeyCacheObject key, Object x)
+ throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @return Data row.
+ * @throws IgniteCheckedException If failed.
+ */
+ public CacheDataRow mvccFind(GridCacheContext cctx, KeyCacheObject key, MvccSnapshot snapshot)
+ throws IgniteCheckedException;
+
+ /**
+ * For testing only.
+ *
+ * @param cctx Cache context.
+ * @param key Key.
+ * @return All stored versions for given key.
+ * @throws IgniteCheckedException If failed.
+ */
+ List<IgniteBiTuple<Object, MvccVersion>> mvccFindAllVersions(GridCacheContext cctx, KeyCacheObject key)
+ throws IgniteCheckedException;
+
+ /**
* @return Data cursor.
* @throws IgniteCheckedException If failed.
*/
public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException;
/**
+ * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+ * @return Data cursor.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridCursor<? extends CacheDataRow> cursor(Object x) throws IgniteCheckedException;
+
+ /**
+ * @param mvccSnapshot MVCC snapshot.
+ * @return Data cursor.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridCursor<? extends CacheDataRow> cursor(MvccSnapshot mvccSnapshot) throws IgniteCheckedException;
+
+ /**
* @param cacheId Cache ID.
* @return Data cursor.
* @throws IgniteCheckedException If failed.
@@ -490,6 +944,15 @@ public interface IgniteCacheOffheapManager {
/**
* @param cacheId Cache ID.
+ * @param mvccSnapshot Mvcc snapshot.
+ * @return Data cursor.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridCursor<? extends CacheDataRow> cursor(int cacheId, MvccSnapshot mvccSnapshot)
+ throws IgniteCheckedException;
+
+ /**
+ * @param cacheId Cache ID.
* @param lower Lower bound.
* @param upper Upper bound.
* @return Data cursor.
@@ -510,6 +973,18 @@ public interface IgniteCacheOffheapManager {
KeyCacheObject upper, Object x) throws IgniteCheckedException;
/**
+ * @param cacheId Cache ID.
+ * @param lower Lower bound.
+ * @param upper Upper bound.
+ * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+ * @param snapshot Mvcc snapshot.
+ * @return Data cursor.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
+ KeyCacheObject upper, Object x, MvccSnapshot snapshot) throws IgniteCheckedException;
+
+ /**
* Destroys the tree associated with the store.
*
* @throws IgniteCheckedException If failed.