You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/08/30 11:53:58 UTC
[36/38] ignite git commit: IGNITE-4191: MVCC and transactional SQL
support. Joint multi-man-years efforts of Semen Boikov, Igor Seliverstov,
Alexander Paschenko, Igor Sapego, Sergey Kalashnikov, Roman Kondakov,
Pavel Kuznetsov, Ivan Pavlukhin, Andrey Mas
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java
index 6c128a4..e24ecbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.odbc.jdbc.JdbcTableMeta;
import org.apache.ignite.internal.util.typedef.F;
import static java.sql.Connection.TRANSACTION_NONE;
+import static java.sql.Connection.TRANSACTION_REPEATABLE_READ;
import static java.sql.ResultSet.CONCUR_READ_ONLY;
import static java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT;
import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
@@ -630,17 +631,19 @@ public class JdbcThinDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public int getDefaultTransactionIsolation() throws SQLException {
- return TRANSACTION_NONE;
+ return conn.igniteVersion().greaterThanEqual(2, 5, 0) ? TRANSACTION_REPEATABLE_READ :
+ TRANSACTION_NONE;
}
/** {@inheritDoc} */
@Override public boolean supportsTransactions() throws SQLException {
- return false;
+ return conn.igniteVersion().greaterThanEqual(2, 5, 0);
}
/** {@inheritDoc} */
@Override public boolean supportsTransactionIsolationLevel(int level) throws SQLException {
- return false;
+ return conn.igniteVersion().greaterThanEqual(2, 5, 0) &&
+ TRANSACTION_REPEATABLE_READ == level;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
index 30e446f..f0f7337 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
@@ -46,6 +46,8 @@ import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResultInfo;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.sql.SqlKeyword;
import org.apache.ignite.internal.sql.SqlParseException;
@@ -208,7 +210,7 @@ public class JdbcThinStatement implements Statement {
}
JdbcResult res0 = conn.sendRequest(new JdbcQueryExecuteRequest(stmtType, schema, pageSize,
- maxRows, sql, args == null ? null : args.toArray(new Object[args.size()])));
+ maxRows, conn.getAutoCommit(), sql, args == null ? null : args.toArray(new Object[args.size()])));
assert res0 != null;
@@ -646,7 +648,8 @@ public class JdbcThinStatement implements Statement {
throw new SQLException("Batch is empty.");
try {
- JdbcBatchExecuteResult res = conn.sendRequest(new JdbcBatchExecuteRequest(conn.getSchema(), batch, false));
+ JdbcBatchExecuteResult res = conn.sendRequest(new JdbcBatchExecuteRequest(conn.getSchema(), batch,
+ conn.getAutoCommit(), false));
if (res.errorCode() != ClientListenerResponse.STATUS_SUCCESS) {
throw new BatchUpdateException(res.errorMessage(), IgniteQueryErrorCode.codeToSqlState(res.errorCode()),
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index 6128d07..2c3f321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -326,6 +326,7 @@ public class JdbcThinTcpIo {
writer.writeBoolean(connProps.isAutoCloseServerCursor());
writer.writeBoolean(connProps.isLazy());
writer.writeBoolean(connProps.isSkipReducerOnUpdate());
+ writer.writeString(connProps.nestedTxMode());
if (!F.isEmpty(connProps.getUsername())) {
assert ver.compareTo(VER_2_5_0) >= 0 : "Authentication is supported since 2.5";
@@ -374,8 +375,9 @@ public class JdbcThinTcpIo {
+ ", url=" + connProps.getUrl() + ']', SqlStateCode.CONNECTION_REJECTED);
}
- if (VER_2_4_0.equals(srvProtocolVer) || VER_2_3_0.equals(srvProtocolVer) ||
- VER_2_1_5.equals(srvProtocolVer))
+ if (VER_2_4_0.equals(srvProtocolVer)
+ || VER_2_3_0.equals(srvProtocolVer)
+ || VER_2_1_5.equals(srvProtocolVer))
handshake(srvProtocolVer);
else if (VER_2_1_0.equals(srvProtocolVer))
handshake_2_1_0();
@@ -541,8 +543,8 @@ public class JdbcThinTcpIo {
int cnt = !F.isEmpty(qrys) ? Math.min(MAX_BATCH_QRY_CNT, qrys.size()) : 0;
- // One additional byte for last batch flag.
- cap = cnt * DYNAMIC_SIZE_MSG_CAP + 1;
+ // Two additional bytes: one for the autocommit flag and one for the last batch flag.
+ cap = cnt * DYNAMIC_SIZE_MSG_CAP + 2;
}
else if (req instanceof JdbcQueryCloseRequest)
cap = QUERY_CLOSE_MSG_SIZE;
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 8d9a700..f515d57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccMessage;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -101,6 +102,7 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
import static org.apache.ignite.internal.GridTopic.TOPIC_IO_TEST;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
@@ -1112,6 +1114,17 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return;
}
+ if (msg.topicOrdinal() == TOPIC_CACHE_COORDINATOR.ordinal()) {
+ MvccMessage msg0 = (MvccMessage)msg.message();
+
+ // see IGNITE-8609
+ /*if (msg0.processedFromNioThread())
+ c.run();
+ else*/
+ ctx.getStripedExecutorService().execute(-1, c);
+
+ return;
+ }
if (plc == GridIoPolicy.SYSTEM_POOL && msg.partition() != GridIoMessage.STRIPE_DISABLED_PART) {
ctx.getStripedExecutorService().execute(msg.partition(), c);
@@ -1648,6 +1661,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (e.getCause() instanceof ClusterTopologyCheckedException)
throw (ClusterTopologyCheckedException)e.getCause();
+ if (!ctx.discovery().alive(node))
+ throw new ClusterTopologyCheckedException("Failed to send message, node left: " + node.id());
+
throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
"TCP connection cannot be established due to firewall issues) " +
"[node=" + node + ", topic=" + topic +
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 581c32e..8dddd8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -47,13 +47,13 @@ import org.apache.ignite.internal.processors.cache.CacheInvokeDirectResult;
import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.WalStateAckMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
@@ -68,11 +68,15 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsUpdateCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryFirstEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
@@ -97,6 +101,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
@@ -108,7 +113,24 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFi
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionImpl;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryCntr;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryId;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTxAndQueryCntr;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTxAndQueryId;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccActiveQueriesMessage;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccFutureResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccWaitTxsRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -927,6 +949,116 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case 136:
+ msg = new MvccTxSnapshotRequest();
+
+ break;
+
+ case 137:
+ msg = new MvccAckRequestTx();
+
+ break;
+
+ case 138:
+ msg = new MvccFutureResponse();
+
+ break;
+
+ case 139:
+ msg = new MvccQuerySnapshotRequest();
+
+ break;
+
+ case 140:
+ msg = new MvccAckRequestQueryCntr();
+
+ break;
+
+ case 141:
+ msg = new MvccSnapshotResponse();
+
+ break;
+
+ case 142:
+ msg = new MvccWaitTxsRequest();
+
+ break;
+
+ case 143:
+ msg = new GridCacheMvccEntryInfo();
+
+ break;
+
+ case 144:
+ msg = new GridDhtTxQueryEnlistResponse();
+
+ break;
+
+ case 145:
+ msg = new MvccAckRequestQueryId();
+
+ break;
+
+ case 146:
+ msg = new MvccAckRequestTxAndQueryCntr();
+
+ break;
+
+ case 147:
+ msg = new MvccAckRequestTxAndQueryId();
+
+ break;
+
+ case 148:
+ msg = new MvccVersionImpl();
+
+ break;
+
+ case 149:
+ msg = new MvccActiveQueriesMessage();
+
+ break;
+
+ case 150:
+ msg = new MvccSnapshotWithoutTxs();
+
+ break;
+
+ case 151:
+ msg = new GridNearTxQueryEnlistRequest();
+
+ break;
+
+ case 152:
+ msg = new GridNearTxQueryEnlistResponse();
+
+ break;
+
+ case 153:
+ msg = new GridNearTxQueryResultsEnlistRequest();
+
+ break;
+
+ case 154:
+ msg = new GridNearTxQueryResultsEnlistResponse();
+
+ break;
+
+ case 155:
+ msg = new GridDhtTxQueryEnlistRequest();
+
+ break;
+
+ case 156:
+ msg = new GridDhtTxQueryFirstEnlistRequest();
+
+ break;
+
+ case 157:
+ msg = new GridDhtPartitionsUpdateCountersMap();
+
+ break;
+
// [-3..119] [124..129] [-23..-27] [-36..-55]- this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 8cdcbf3..84bcab1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -108,10 +109,14 @@ public class DiscoCache {
/** */
private final P1<ClusterNode> aliveNodePred;
+ /** MVCC coordinator. */
+ private final MvccCoordinator mvccCrd;
+
/**
* @param topVer Topology version.
* @param state Current cluster state.
* @param loc Local node.
+ * @param mvccCrd MVCC coordinator node.
* @param rmtNodes Remote nodes.
* @param allNodes All nodes.
* @param srvNodes Server nodes.
@@ -130,6 +135,7 @@ public class DiscoCache {
AffinityTopologyVersion topVer,
DiscoveryDataClusterState state,
ClusterNode loc,
+ MvccCoordinator mvccCrd,
List<ClusterNode> rmtNodes,
List<ClusterNode> allNodes,
List<ClusterNode> srvNodes,
@@ -148,6 +154,7 @@ public class DiscoCache {
this.topVer = topVer;
this.state = state;
this.loc = loc;
+ this.mvccCrd = mvccCrd;
this.rmtNodes = rmtNodes;
this.allNodes = allNodes;
this.srvNodes = srvNodes;
@@ -157,7 +164,7 @@ public class DiscoCache {
this.allCacheNodes = allCacheNodes;
this.cacheGrpAffNodes = cacheGrpAffNodes;
this.nodeMap = nodeMap;
- alives.addAll(alives0);
+ this.alives.addAll(alives0);
this.minNodeVer = minNodeVer;
this.minSrvNodeVer = minSrvNodeVer;
this.nodeIdToConsIdx = nodeIdToConsIdx;
@@ -177,6 +184,13 @@ public class DiscoCache {
}
/**
+ * @return MVCC coordinator (may be {@code null}).
+ */
+ @Nullable public MvccCoordinator mvccCoordinator() {
+ return mvccCrd;
+ }
+
+ /**
* @return Topology version.
*/
public AffinityTopologyVersion version() {
@@ -461,6 +475,7 @@ public class DiscoCache {
ver,
state == null ? this.state : state,
loc,
+ mvccCrd,
rmtNodes,
allNodes,
srvNodes,
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index a13f31e..d19e08b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -84,6 +84,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
@@ -159,6 +160,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MVCC_ENABLED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_OFFHEAP_SIZE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PEER_CLASSLOADING;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PHY_RAM;
@@ -644,6 +646,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
updateClientNodes(node.id());
}
+ ctx.coordinators().onDiscoveryEvent(type, topSnapshot, topVer);
+
boolean locJoinEvt = type == EVT_NODE_JOINED && node.id().equals(locNode.id());
ChangeGlobalStateFinishMessage stateFinishMsg = null;
@@ -1204,6 +1208,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
Boolean locSrvcCompatibilityEnabled = locNode.attribute(ATTR_SERVICES_COMPATIBILITY_MODE);
Boolean locSecurityCompatibilityEnabled = locNode.attribute(ATTR_SECURITY_COMPATIBILITY_MODE);
+ Boolean locMvccEnabled = locNode.attribute(ATTR_MVCC_ENABLED);
+
for (ClusterNode n : nodes) {
int rmtJvmMajVer = nodeJavaMajorVersion(n);
@@ -1301,6 +1307,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
", locNodeId=" + locNode.id() + ", rmtNode=" + U.toShortString(n) + "]");
}
+ Boolean rmtMvccEnabled = n.attribute(ATTR_MVCC_ENABLED);
+
+ if (!F.eq(locMvccEnabled, rmtMvccEnabled)) {
+ throw new IgniteCheckedException("Remote node has MVCC mode different from local " +
+ "[locId8=" + U.id8(locNode.id()) +
+ ", locMvccMode=" + (Boolean.TRUE.equals(locMvccEnabled) ? "ENABLED" : "DISABLED") +
+ ", rmtId8=" + U.id8(n.id()) +
+ ", rmtMvccMode=" + (Boolean.TRUE.equals(rmtMvccEnabled) ? "ENABLED" : "DISABLED") +
+ ", rmtAddrs=" + U.addressesAsString(n) + ", rmtNode=" + U.toShortString(n) + "]");
+ }
+
if (n.version().compareToIgnoreTimestamp(SERVICE_PERMISSIONS_SINCE) >= 0
&& ctx.security().enabled() // Matters only if security enabled.
) {
@@ -2353,6 +2370,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
Collection<ClusterNode> topSnapshot) {
assert topSnapshot.contains(loc);
+ MvccCoordinator mvccCrd = ctx.coordinators().coordinatorFromDiscoveryEvent();
+
HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size());
@@ -2454,6 +2473,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
topVer,
state,
loc,
+ mvccCrd,
Collections.unmodifiableList(rmtNodes),
Collections.unmodifiableList(allNodes),
Collections.unmodifiableList(srvNodes),
@@ -3362,6 +3382,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
topVer,
discoCache.state(),
discoCache.localNode(),
+ discoCache.mvccCoordinator(),
discoCache.remoteNodes(),
allNodes,
discoCache.serverNodes(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
index 5475bef..ff0c66a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManager;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.persistence.AllocatedPageTracker;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
/**
@@ -43,6 +44,18 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh
public void finishRecover() throws IgniteCheckedException;
/**
+ * Initializes disk store structures.
+ *
+ * @param cacheId Cache id.
+ * @param partitions Partitions count.
+ * @param workingDir Working directory.
+ * @param tracker Allocation tracker.
+ * @throws IgniteCheckedException If failed.
+ */
+ void initialize(int cacheId, int partitions, String workingDir, AllocatedPageTracker tracker)
+ throws IgniteCheckedException;
+
+ /**
* Callback called when a cache is starting.
*
* @param grpDesc Cache group descriptor.
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 585336a..a555aae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -184,7 +184,16 @@ public abstract class WALRecord {
RESERVED,
/** Rotated id part record. */
- ROTATED_ID_PART_RECORD;
+ ROTATED_ID_PART_RECORD,
+
+ /** Record type reported by {@code DataPageMvccMarkUpdatedRecord}. */
+ MVCC_DATA_PAGE_MARK_UPDATED_RECORD,
+
+ /** */
+ MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD,
+
+ /** */
+ MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD;
/** */
private static final RecordType[] VALS = RecordType.values();
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccMarkUpdatedRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccMarkUpdatedRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccMarkUpdatedRecord.java
new file mode 100644
index 0000000..5e89f8e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccMarkUpdatedRecord.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * MVCC mark updated record: a page delta that passes an item's new MVCC coordinator, counter and operation counter to {@code DataPageIO.updateNewVersion}.
+ */
+public class DataPageMvccMarkUpdatedRecord extends PageDeltaRecord {
+ /** Item id. */
+ private int itemId;
+
+ /** New MVCC coordinator version. */
+ private long newMvccCrd;
+
+ /** New MVCC counter version. */
+ private long newMvccCntr;
+
+ /** New MVCC operation counter. */
+ private int newMvccOpCntr;
+
+ /**
+ * @param grpId Cache group ID.
+ * @param pageId Page ID.
+ * @param itemId Item id.
+ * @param newMvccCrd New MVCC coordinator version.
+ * @param newMvccCntr New MVCC counter version.
+ * @param newMvccOpCntr New MVCC operation counter.
+ */
+ public DataPageMvccMarkUpdatedRecord(int grpId, long pageId, int itemId, long newMvccCrd, long newMvccCntr, int newMvccOpCntr) {
+ super(grpId, pageId);
+
+ this.itemId = itemId;
+ this.newMvccCrd = newMvccCrd;
+ this.newMvccCntr = newMvccCntr;
+ this.newMvccOpCntr = newMvccOpCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
+ DataPageIO io = PageIO.getPageIO(pageAddr);
+
+ io.updateNewVersion(pageAddr, itemId, pageMem.pageSize(), newMvccCrd, newMvccCntr, newMvccOpCntr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return RecordType.MVCC_DATA_PAGE_MARK_UPDATED_RECORD;
+ }
+
+ /**
+ * @return Item id.
+ */
+ public int itemId() {
+ return itemId;
+ }
+
+ /**
+ * @return New MVCC coordinator version.
+ */
+ public long newMvccCrd() {
+ return newMvccCrd;
+ }
+
+ /**
+ * @return New MVCC counter version.
+ */
+ public long newMvccCntr() {
+ return newMvccCntr;
+ }
+
+ /**
+ * @return New MVCC operation counter.
+ */
+ public int newMvccOpCntr() {
+ return newMvccOpCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DataPageMvccMarkUpdatedRecord.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateNewTxStateHintRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateNewTxStateHintRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateNewTxStateHintRecord.java
new file mode 100644
index 0000000..4a244a1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateNewTxStateHintRecord.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Page delta record that updates the new tx state hint of a data page item (applied via {@link DataPageIO#updateNewTxState}).
+ */
+public class DataPageMvccUpdateNewTxStateHintRecord extends PageDeltaRecord {
+ /** Item id. */
+ private int itemId;
+
+ /** Tx state hint. */
+ private byte txState;
+
+ /**
+ * @param grpId Cache group ID.
+ * @param pageId Page ID.
+ * @param itemId Item id.
+ * @param txState Tx state hint.
+ */
+ public DataPageMvccUpdateNewTxStateHintRecord(int grpId, long pageId, int itemId, byte txState) {
+ super(grpId, pageId);
+
+ this.itemId = itemId;
+ this.txState = txState;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
+ DataPageIO io = PageIO.getPageIO(pageAddr);
+
+ io.updateNewTxState(pageAddr, itemId, pageMem.pageSize(), txState);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return RecordType.MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD;
+ }
+
+ /**
+ * @return Item id.
+ */
+ public int itemId() {
+ return itemId;
+ }
+
+ /**
+ * @return Tx state hint.
+ */
+ public byte txState() {
+ return txState;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DataPageMvccUpdateNewTxStateHintRecord.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateTxStateHintRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateTxStateHintRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateTxStateHintRecord.java
new file mode 100644
index 0000000..7e53609
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateTxStateHintRecord.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Page delta record that updates the tx state hint of a data page item (applied via {@link DataPageIO#updateTxState}).
+ */
+public class DataPageMvccUpdateTxStateHintRecord extends PageDeltaRecord {
+ /** Item id. */
+ private int itemId;
+
+ /** Tx state hint. */
+ private byte txState;
+
+ /**
+ * @param grpId Cache group ID.
+ * @param pageId Page ID.
+ * @param itemId Item id.
+ * @param txState Tx state hint.
+ */
+ public DataPageMvccUpdateTxStateHintRecord(int grpId, long pageId, int itemId, byte txState) {
+ super(grpId, pageId);
+
+ this.itemId = itemId;
+ this.txState = txState;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
+ DataPageIO io = PageIO.getPageIO(pageAddr);
+
+ io.updateTxState(pageAddr, itemId, pageMem.pageSize(), txState);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return RecordType.MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD;
+ }
+
+ /**
+ * @return Item id.
+ */
+ public int itemId() {
+ return itemId;
+ }
+
+ /**
+ * @return Tx state hint.
+ */
+ public byte txState() {
+ return txState;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DataPageMvccUpdateTxStateHintRecord.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
index f78ab60..1c321d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.processors.affinity;
-import org.apache.ignite.cluster.ClusterNode;
-
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
/**
* Cached affinity calculations.
@@ -90,4 +90,9 @@ public interface AffinityAssignment {
* @return Backup partitions for specified node ID.
*/
public Set<Integer> backupPartitions(UUID nodeId);
+
+ /**
+ * @return Mvcc coordinator.
+ */
+ public MvccCoordinator mvccCoordinator();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index cbec1a1..f96bc9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
@@ -40,6 +41,9 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
/** Topology version. */
private final AffinityTopologyVersion topVer;
+ /** Mvcc coordinator. */
+ private final MvccCoordinator mvccCrd;
+
/** Collection of calculated affinity nodes. */
private List<List<ClusterNode>> assignment;
@@ -73,6 +77,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
this.topVer = topVer;
primary = new HashMap<>();
backup = new HashMap<>();
+ mvccCrd = null;
clientEvtChange = false;
}
@@ -83,7 +88,8 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
*/
GridAffinityAssignment(AffinityTopologyVersion topVer,
List<List<ClusterNode>> assignment,
- List<List<ClusterNode>> idealAssignment) {
+ List<List<ClusterNode>> idealAssignment,
+ MvccCoordinator mvccCrd) {
assert topVer != null;
assert assignment != null;
assert idealAssignment != null;
@@ -91,6 +97,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
this.topVer = topVer;
this.assignment = assignment;
this.idealAssignment = idealAssignment.equals(assignment) ? assignment : idealAssignment;
+ this.mvccCrd = mvccCrd;
primary = new HashMap<>();
backup = new HashMap<>();
@@ -110,6 +117,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
idealAssignment = aff.idealAssignment;
primary = aff.primary;
backup = aff.backup;
+ mvccCrd = aff.mvccCrd;
clientEvtChange = true;
}
@@ -283,6 +291,11 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
}
/** {@inheritDoc} */
+ @Override public MvccCoordinator mvccCoordinator() {
+ return mvccCrd;
+ }
+
+ /** {@inheritDoc} */
@Override public int hashCode() {
return topVer.hashCode();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 34e2b0a..cc2c17c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.NodeOrderComparator;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -202,11 +203,25 @@ public class GridAffinityAssignmentCache {
* @param affAssignment Affinity assignment for topology version.
*/
public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) {
+ MvccCoordinator mvccCrd = ctx.cache().context().coordinators().currentCoordinator(topVer);
+
+ initialize(topVer, affAssignment, mvccCrd);
+ }
+
+ /**
+ * Initializes affinity with given topology version and assignment.
+ *
+ * @param topVer Topology version.
+ * @param affAssignment Affinity assignment for topology version.
+ * @param mvccCrd Mvcc coordinator.
+ */
+ public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment, MvccCoordinator mvccCrd) {
assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + ", last=" + lastVersion() + ']';
assert idealAssignment != null;
+ assert mvccCrd == null || topVer.compareTo(mvccCrd.topologyVersion()) >= 0 : "[mvccCrd=" + mvccCrd + ", topVer=" + topVer + ']';
- GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment);
+ GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment, mvccCrd);
HistoryAffinityAssignment hAff = affCache.put(topVer, new HistoryAffinityAssignment(assignment));
@@ -745,7 +760,9 @@ public class GridAffinityAssignmentCache {
idealAssignment(aff.idealAssignment());
- initialize(aff.lastVersion(), aff.assignments(aff.lastVersion()));
+ AffinityAssignment assign = aff.cachedAffinity(aff.lastVersion());
+
+ initialize(aff.lastVersion(), assign.assignment(), assign.mvccCoordinator());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 08333c3..4a0908c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -423,7 +423,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
try {
GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ?
(GridAffinityAssignment)assign0 :
- new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment());
+ new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment(), assign0.mvccCoordinator());
AffinityInfo info = new AffinityInfo(
cctx.config().getAffinity(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
index abd5292..15d7e4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
@@ -184,7 +184,7 @@ class GridAffinityUtils {
GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ?
(GridAffinityAssignment)assign0 :
- new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment());
+ new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment(), assign0.mvccCoordinator());
return F.t(
affinityMessage(ctx, cctx.config().getAffinity()),
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
index 94eaab4..1a6b2ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.affinity;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -44,17 +45,26 @@ public class HistoryAffinityAssignment implements AffinityAssignment {
/** */
private final boolean clientEvtChange;
+ /** Mvcc coordinator. */
+ private final MvccCoordinator mvccCrd;
+
/**
* @param assign Assignment.
*/
- public HistoryAffinityAssignment(GridAffinityAssignment assign) {
+ HistoryAffinityAssignment(GridAffinityAssignment assign) {
this.topVer = assign.topologyVersion();
this.assignment = assign.assignment();
this.idealAssignment = assign.idealAssignment();
+ this.mvccCrd = assign.mvccCoordinator();
this.clientEvtChange = assign.clientEventChange();
}
/** {@inheritDoc} */
+ @Override public MvccCoordinator mvccCoordinator() {
+ return mvccCrd;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean clientEventChange() {
return clientEvtChange;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 0b448e1..5859452 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -58,6 +58,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.GridPartitionStateMap;
@@ -458,7 +460,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (grpHolder.client()) {
ClientCacheDhtTopologyFuture topFut = new ClientCacheDhtTopologyFuture(topVer);
- grp.topology().updateTopologyVersion(topFut, discoCache, -1, false);
+ grp.topology().updateTopologyVersion(topFut,
+ discoCache,
+ cctx.coordinators().currentCoordinator(),
+ -1,
+ false);
grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity());
@@ -506,6 +512,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert grp != null;
GridDhtAffinityAssignmentResponse res = fetchAffinity(topVer,
+ cctx.coordinators().currentCoordinator(),
null,
discoCache,
grp.affinity(),
@@ -528,7 +535,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
new ClusterTopologyServerNotFoundException("All server nodes left grid."));
}
- grp.topology().updateTopologyVersion(topFut, discoCache, -1, false);
+ grp.topology().updateTopologyVersion(topFut,
+ discoCache,
+ cctx.coordinators().currentCoordinator(),
+ -1,
+ false);
grp.topology().update(topVer, partMap, null, Collections.<Integer>emptySet(), null, null);
@@ -1284,7 +1295,12 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
fetchFut.init(false);
- fetchAffinity(evts.topologyVersion(), evts, evts.discoveryCache(), aff, fetchFut);
+ fetchAffinity(evts.topologyVersion(),
+ cctx.coordinators().currentCoordinator(),
+ evts,
+ evts.discoveryCache(),
+ aff,
+ fetchFut);
}
}
@@ -1682,6 +1698,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
int grpId = fetchFut.groupId();
fetchAffinity(topVer,
+ cctx.coordinators().currentCoordinator(),
fut.events(),
fut.events().discoveryCache(),
cctx.cache().cacheGroup(grpId).affinity(),
@@ -1691,6 +1708,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
/**
* @param topVer Topology version.
+ * @param mvccCrd Mvcc coordinator to set in affinity.
* @param events Discovery events.
* @param discoCache Discovery data cache.
* @param affCache Affinity.
@@ -1698,7 +1716,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @throws IgniteCheckedException If failed.
* @return Affinity assignment response.
*/
- private GridDhtAffinityAssignmentResponse fetchAffinity(AffinityTopologyVersion topVer,
+ private GridDhtAffinityAssignmentResponse fetchAffinity(
+ AffinityTopologyVersion topVer,
+ MvccCoordinator mvccCrd,
@Nullable ExchangeDiscoveryEvents events,
DiscoCache discoCache,
GridAffinityAssignmentCache affCache,
@@ -1711,7 +1731,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (res == null) {
List<List<ClusterNode>> aff = affCache.calculate(topVer, events, discoCache);
- affCache.initialize(topVer, aff);
+ affCache.initialize(topVer, aff, mvccCrd);
}
else {
List<List<ClusterNode>> idealAff = res.idealAffinityAssignment(discoCache);
@@ -1728,7 +1748,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert aff != null : res;
- affCache.initialize(topVer, aff);
+ affCache.initialize(topVer, aff, mvccCrd);
}
return res;
@@ -1847,6 +1867,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
@Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
throws IgniteCheckedException {
fetchAffinity(prev.topologyVersion(),
+ null, // Pass null mvcc coordinator, this affinity version should be used for queries.
prev.events(),
prev.events().discoveryCache(),
aff,
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInfoCollection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInfoCollection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInfoCollection.java
index 49f77fa..614d7c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInfoCollection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInfoCollection.java
@@ -37,6 +37,18 @@ public class CacheEntryInfoCollection implements Message {
@GridDirectCollection(GridCacheEntryInfo.class)
private List<GridCacheEntryInfo> infos;
+ /** */
+ public CacheEntryInfoCollection() {
+ // No-op
+ }
+
+ /**
+ * @param infos List of cache entry info.
+ */
+ public CacheEntryInfoCollection(List<GridCacheEntryInfo> infos) {
+ this.infos = infos;
+ }
+
/**
*
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index c8fe283..c100d16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -30,6 +30,7 @@ import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TopologyValidator;
import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -60,6 +61,7 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
@@ -147,6 +149,9 @@ public class CacheGroupContext {
/** */
private boolean qryEnabled;
+ /** Mvcc flag. */
+ private boolean mvccEnabled;
+
/** MXBean. */
private CacheGroupMetricsMXBean mxBean;
@@ -218,6 +223,28 @@ public class CacheGroupContext {
caches = new ArrayList<>();
mxBean = new CacheGroupMetricsMXBeanImpl(this);
+
+ mvccEnabled = mvccEnabled(ctx.gridConfig(), ccfg, cacheType);
+ }
+
+ /**
+ * @param cfg Ignite configuration.
+ * @param ccfg Cache configuration.
+ * @param cacheType Cache type.
+ * @return {@code True} if mvcc is enabled for given cache.
+ */
+ public static boolean mvccEnabled(IgniteConfiguration cfg, CacheConfiguration ccfg, CacheType cacheType) {
+ return cfg.isMvccEnabled() &&
+ cacheType == CacheType.USER &&
+ ccfg.getCacheMode() != LOCAL &&
+ ccfg.getAtomicityMode() == TRANSACTIONAL;
+ }
+
+ /**
+ * @return Mvcc flag.
+ */
+ public boolean mvccEnabled() {
+ return mvccEnabled;
}
/**
@@ -396,6 +423,13 @@ public class CacheGroupContext {
}
/**
+ * @return {@code True} if cache created by user.
+ */
+ public boolean userCache() {
+ return cacheType.userCache();
+ }
+
+ /**
* Adds rebalancing event.
*
* @param part Partition.
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
index 8a7afe7..53f9e22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
@@ -163,7 +163,7 @@ public class CacheOperationContext implements Serializable {
/**
* Gets data center ID.
*
- * @return Client ID.
+ * @return Datacenter ID.
*/
@Nullable public Byte dataCenterId() {
return dataCenterId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 3aaf7f3..8eab9c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -147,7 +147,7 @@ class ClusterCachesInfo {
if (ccfg == null)
grpCfgs.put(info.cacheData().config().getGroupName(), info.cacheData().config());
else
- validateCacheGroupConfiguration(ccfg, info.cacheData().config());
+ validateCacheGroupConfiguration(ccfg, info.cacheData().config(), info.cacheType());
}
String conflictErr = processJoiningNode(joinDiscoData, ctx.localNodeId(), true);
@@ -220,7 +220,7 @@ class ClusterCachesInfo {
}
if (checkConsistency)
- validateStartCacheConfiguration(locCfg);
+ validateStartCacheConfiguration(locCfg, cacheData.cacheType());
}
}
@@ -1864,16 +1864,17 @@ class ClusterCachesInfo {
/**
* @param ccfg Cache configuration to start.
+ * @param cacheType Cache type.
* @throws IgniteCheckedException If failed.
*/
- public void validateStartCacheConfiguration(CacheConfiguration ccfg) throws IgniteCheckedException {
+ void validateStartCacheConfiguration(CacheConfiguration ccfg, CacheType cacheType) throws IgniteCheckedException {
if (ccfg.getGroupName() != null) {
CacheGroupDescriptor grpDesc = cacheGroupByName(ccfg.getGroupName());
if (grpDesc != null) {
assert ccfg.getGroupName().equals(grpDesc.groupName());
- validateCacheGroupConfiguration(grpDesc.config(), ccfg);
+ validateCacheGroupConfiguration(grpDesc.config(), ccfg, cacheType);
}
}
}
@@ -1881,9 +1882,10 @@ class ClusterCachesInfo {
/**
* @param cfg Existing configuration.
* @param startCfg Cache configuration to start.
+ * @param cacheType Cache type.
* @throws IgniteCheckedException If validation failed.
*/
- private void validateCacheGroupConfiguration(CacheConfiguration cfg, CacheConfiguration startCfg)
+ private void validateCacheGroupConfiguration(CacheConfiguration cfg, CacheConfiguration startCfg, CacheType cacheType)
throws IgniteCheckedException {
GridCacheAttributes attr1 = new GridCacheAttributes(cfg);
GridCacheAttributes attr2 = new GridCacheAttributes(startCfg);
@@ -1891,6 +1893,11 @@ class ClusterCachesInfo {
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "cacheMode", "Cache mode",
cfg.getCacheMode(), startCfg.getCacheMode(), true);
+ CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "mvccEnabled", "MVCC mode",
+ CacheGroupContext.mvccEnabled(ctx.config(), cfg, cacheType),
+ CacheGroupContext.mvccEnabled(ctx.config(), startCfg, cacheType),
+ true);
+
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "affinity", "Affinity function",
attr1.cacheAffinityClassName(), attr2.cacheAffinityClassName(), true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index 4046c98..34ed048 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -17,11 +17,15 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
@@ -51,11 +55,20 @@ public class ExchangeContext {
/** */
private final boolean compatibilityNode = getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, false);
+ /** {@code True} if new mvcc coordinator assigned during this exchange. */
+ private final boolean newMvccCrd;
+
+ /** Currently running mvcc queries, initialized when mvcc coordinator is changed. */
+ private Map<UUID, GridLongList> activeQueries;
+
/**
* @param crd Coordinator flag.
+ * @param newMvccCrd {@code True} if new coordinator assigned during this exchange.
* @param fut Exchange future.
*/
- public ExchangeContext(boolean crd, GridDhtPartitionsExchangeFuture fut) {
+ public ExchangeContext(boolean crd, boolean newMvccCrd, GridDhtPartitionsExchangeFuture fut) {
+ this.newMvccCrd = newMvccCrd;
+
int protocolVer = exchangeProtocolVersion(fut.firstEventCache().minimumNodeVersion());
if (compatibilityNode || (crd && fut.localJoinExchange())) {
@@ -124,6 +137,34 @@ public class ExchangeContext {
return merge;
}
+ /**
+ * @return {@code True} if a new mvcc coordinator was assigned during this exchange (flag passed to constructor).
+ */
+ public boolean newMvccCoordinator() {
+ return newMvccCrd;
+ }
+
+ /**
+ * @return Active queries keyed by node ID; may be {@code null}, the map is created lazily on first add.
+ */
+ public Map<UUID, GridLongList> activeQueries() {
+ return activeQueries;
+ }
+
+ /**
+ * @param nodeId Node ID, used as the key under which the node's queries are stored.
+ * @param nodeQueries Node queries; if {@code null} the call is a no-op and nothing is stored.
+ */
+ public void addActiveQueries(UUID nodeId, @Nullable GridLongList nodeQueries) {
+ if (nodeQueries == null)
+ return;
+
+ if (activeQueries == null)
+ activeQueries = new HashMap<>();
+
+ activeQueries.put(nodeId, nodeQueries);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ExchangeContext.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
index 96c5e29..c99eb00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
@@ -49,6 +49,7 @@ import org.apache.ignite.cache.query.QueryMetrics;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.internal.AsyncSupportAdapter;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.GridKernalState;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
@@ -145,6 +146,8 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
CacheOperationGate opGate = onEnter();
try {
+ MvccUtils.verifyMvccOperationSupport(delegate.context(), "withExpiryPolicy");
+
return new GatewayProtectedCacheProxy<>(delegate, opCtx.withExpiryPolicy(plc), lock);
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index bc14c54..159d681 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -92,6 +92,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
@@ -138,7 +140,6 @@ import org.apache.ignite.mxbean.CacheMetricsMXBean;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.JobContextResource;
-import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -816,6 +817,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.checkSecurity(SecurityPermission.CACHE_READ);
+ //TODO IGNITE-7955
+ MvccUtils.verifyMvccOperationSupport(ctx, "Peek");
+
PeekModes modes = parsePeekModes(peekModes, false);
KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
@@ -1121,6 +1125,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
@Override public void clearLocally(boolean srv, boolean near, boolean readers) {
ctx.checkSecurity(SecurityPermission.CACHE_REMOVE);
+ //TODO IGNITE-7952
+ MvccUtils.verifyMvccOperationSupport(ctx, "Clear");
+
List<GridCacheClearAllRunnable<K, V>> jobs = splitClearLocally(srv, near, readers);
if (!F.isEmpty(jobs)) {
@@ -1190,6 +1197,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @throws IgniteCheckedException In case of error.
*/
private void clear(@Nullable Set<? extends K> keys) throws IgniteCheckedException {
+ //TODO IGNITE-7952
+ MvccUtils.verifyMvccOperationSupport(ctx, "Clear");
+
if (isLocal()) {
if (keys == null)
clearLocally(true, false, false);
@@ -1207,6 +1217,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Future.
*/
private IgniteInternalFuture<?> clearAsync(@Nullable final Set<? extends K> keys) {
+ //TODO IGNITE-7952
+ MvccUtils.verifyMvccOperationSupport(ctx, "Clear");
+
if (isLocal())
return clearLocallyAsync(keys);
else
@@ -1262,6 +1275,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (F.isEmpty(keys))
return;
+ //TODO IGNITE-7952
+ MvccUtils.verifyMvccOperationSupport(ctx, "Clear");
+
GridCacheVersion obsoleteVer = ctx.versions().next();
for (KeyCacheObject key : keys) {
@@ -1881,7 +1897,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
skipVals,
/*keep cache objects*/false,
recovery,
- needVer);
+ needVer,
+ null); // TODO IGNITE-7371
}
/**
@@ -1896,6 +1913,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param skipVals Skip values flag.
* @param keepCacheObjects Keep cache objects.
* @param needVer If {@code true} returns values as tuples containing value and version.
+ * @param mvccSnapshot MVCC snapshot.
* @return Future.
*/
protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
@@ -1910,7 +1928,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final boolean skipVals,
final boolean keepCacheObjects,
final boolean recovery,
- final boolean needVer
+ final boolean needVer,
+ MvccSnapshot mvccSnapshot
) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap());
@@ -1967,7 +1986,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
boolean skipEntry = readNoEntry;
if (readNoEntry) {
- CacheDataRow row = ctx.offheap().read(ctx, key);
+ CacheDataRow row = mvccSnapshot != null ? ctx.offheap().mvccRead(ctx, key, mvccSnapshot) :
+ ctx.offheap().read(ctx, key);
if (row != null) {
long expireTime = row.expireTime();
@@ -2030,6 +2050,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
expiry,
!deserializeBinary,
+ mvccSnapshot,
readerArgs);
assert res != null;
@@ -2054,6 +2075,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
expiry,
!deserializeBinary,
+ mvccSnapshot,
readerArgs);
if (res == null)
@@ -3072,7 +3094,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
List<K> keys = new ArrayList<>(Math.min(REMOVE_ALL_KEYS_BATCH, size()));
do {
- for (Iterator<CacheDataRow> it = ctx.offheap().cacheIterator(ctx.cacheId(), true, true, null);
+ for (Iterator<CacheDataRow> it = ctx.offheap().cacheIterator(ctx.cacheId(), true, true, null, null);
it.hasNext() && keys.size() < REMOVE_ALL_KEYS_BATCH; )
keys.add((K)it.next().key());
@@ -3397,6 +3419,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (keyCheck)
validateCacheKeys(keys);
+ //TODO IGNITE-7764
+ MvccUtils.verifyMvccOperationSupport(ctx, "Lock");
+
IgniteInternalFuture<Boolean> fut = lockAllAsync(keys, timeout);
boolean isInterrupted = false;
@@ -3425,6 +3450,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (keyCheck)
validateCacheKey(key);
+ //TODO IGNITE-7764
+ MvccUtils.verifyMvccOperationSupport(ctx, "Lock");
+
return lockAllAsync(Collections.singletonList(key), timeout);
}
@@ -3534,6 +3562,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public void localLoadCache(final IgniteBiPredicate<K, V> p, Object[] args)
throws IgniteCheckedException {
+ //TODO IGNITE-7954
+ MvccUtils.verifyMvccOperationSupport(ctx, "Load");
+
final boolean replicate = ctx.isDrEnabled();
final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
@@ -3671,6 +3702,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (!ctx.store().configured())
return new GridFinishedFuture<>();
+ //TODO IGNITE-7954
+ MvccUtils.verifyMvccOperationSupport(ctx, "Load");
+
CacheOperationContext opCtx = ctx.operationContextPerCall();
ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null;
@@ -3755,6 +3789,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
public void localLoad(Collection<? extends K> keys, @Nullable ExpiryPolicy plc, final boolean keepBinary)
throws IgniteCheckedException {
+ //TODO IGNITE-7954
+ MvccUtils.verifyMvccOperationSupport(ctx, "Load");
+
final boolean replicate = ctx.isDrEnabled();
final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
@@ -3827,6 +3864,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
assert !F.isEmpty(nodes) : "There are not datanodes fo cache: " + ctx.name();
+ //TODO IGNITE-7954
+ MvccUtils.verifyMvccOperationSupport(ctx, "Load");
+
final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
ComputeTaskInternalFuture fut = ctx.kernalContext().closure().callAsync(BROADCAST,
@@ -4185,6 +4225,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
READ_COMMITTED,
tCfg.getDefaultTxTimeout(),
!ctx.skipStore(),
+ false,
0,
null
);
@@ -4286,6 +4327,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
READ_COMMITTED,
txCfg.getDefaultTxTimeout(),
!skipStore,
+ false,
0,
null);
@@ -4556,6 +4598,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param readers Whether to clear readers.
*/
private boolean clearLocally0(K key, boolean readers) {
+ //TODO IGNITE-7952
+ MvccUtils.verifyMvccOperationSupport(ctx, "Clear");
+
ctx.checkSecurity(SecurityPermission.CACHE_REMOVE);
if (keyCheck)
@@ -4593,6 +4638,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (keyCheck)
validateCacheKey(key);
+ //TODO IGNITE-7956
+ MvccUtils.verifyMvccOperationSupport(ctx, "Evict");
+
return evictx(key, ctx.versions().next(), CU.empty0());
}
@@ -4606,6 +4654,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (keyCheck)
validateCacheKey(keys);
+ //TODO IGNITE-7956
+ MvccUtils.verifyMvccOperationSupport(ctx, "Evict");
+
GridCacheVersion obsoleteVer = ctx.versions().next();
try {
@@ -4890,7 +4941,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/*transformClo*/null,
/*taskName*/null,
/*expiryPlc*/null,
- !deserializeBinary);
+ !deserializeBinary,
+ null); // TODO IGNITE-7371
if (val == null)
return null;
@@ -4950,6 +5002,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
READ_COMMITTED,
CU.transactionConfiguration(ctx, ctx.kernalContext().config()).getDefaultTxTimeout(),
opCtx == null || !opCtx.skipStore(),
+ false,
0,
null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index c9ee38c..cf4344d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
@@ -243,6 +244,10 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
return aff0.cachedAffinity(topVer);
}
+ public MvccCoordinator mvccCoordinator(AffinityTopologyVersion topVer) { // NOTE(review): public method has no Javadoc.
+ return assignment(topVer).mvccCoordinator(); // Coordinator is read from the result of assignment(topVer).
+ }
+
/**
* @param key Key to check.
* @param topVer Topology version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 61b1878..ac9de7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2114,6 +2114,13 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * @return {@code True} if mvcc is enabled for cache, as reported by {@code grp.mvccEnabled()}.
+ */
+ public boolean mvccEnabled() {
+ return grp.mvccEnabled();
+ }
+
+ /**
* @param part Partition.
* @param topVer Topology version.
* @return {@code True} if partition is available locally.