You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2024/02/08 16:57:05 UTC
(ignite) branch master updated: IGNITE-21436 Removal of MVCC *Enlist* classes (#11217)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6286f67735b IGNITE-21436 Removal of MVCC *Enlist* classes (#11217)
6286f67735b is described below
commit 6286f67735b2eb7312bc61d5e0549b22f424348f
Author: Ilya Shishkov <sh...@gmail.com>
AuthorDate: Thu Feb 8 19:56:58 2024 +0300
IGNITE-21436 Removal of MVCC *Enlist* classes (#11217)
---
.../ignite/codegen/MessageCodeGenerator.java | 2 -
.../communication/GridIoMessageFactory.java | 20 -
.../processors/cache/GridCacheIoManager.java | 68 --
.../cache/distributed/dht/ExceptionAware.java | 30 -
.../dht/GridDhtTransactionalCacheAdapter.java | 508 ---------
.../dht/GridDhtTxAbstractEnlistFuture.java | 1156 --------------------
.../distributed/dht/GridDhtTxEnlistFuture.java | 157 ---
.../distributed/dht/GridDhtTxLocalAdapter.java | 14 -
.../dht/GridDhtTxQueryAbstractEnlistFuture.java | 77 --
.../dht/GridDhtTxQueryEnlistFuture.java | 180 ---
.../dht/GridDhtTxQueryEnlistRequest.java | 408 -------
.../dht/GridDhtTxQueryEnlistResponse.java | 205 ----
.../dht/GridDhtTxQueryFirstEnlistRequest.java | 336 ------
.../dht/GridDhtTxQueryResultsEnlistFuture.java | 104 --
.../cache/distributed/dht/GridInvokeValue.java | 238 ----
.../dht/NearTxQueryEnlistResultHandler.java | 162 ---
.../cache/distributed/dht/NearTxResultHandler.java | 131 ---
.../near/GridNearTxAbstractEnlistFuture.java | 491 ---------
.../distributed/near/GridNearTxEnlistFuture.java | 691 ------------
.../distributed/near/GridNearTxEnlistRequest.java | 679 ------------
.../distributed/near/GridNearTxEnlistResponse.java | 379 -------
.../near/GridNearTxFinishAndAckFuture.java | 87 --
.../near/GridNearTxQueryAbstractEnlistFuture.java | 36 -
.../near/GridNearTxQueryEnlistFuture.java | 412 -------
.../near/GridNearTxQueryEnlistRequest.java | 599 ----------
.../near/GridNearTxQueryEnlistResponse.java | 332 ------
.../near/GridNearTxQueryResultsEnlistFuture.java | 644 -----------
.../near/GridNearTxQueryResultsEnlistRequest.java | 550 ----------
.../near/GridNearTxQueryResultsEnlistResponse.java | 181 ---
.../cache/transactions/IgniteTxHandler.java | 165 ---
.../cache/transactions/IgniteTxLocalAdapter.java | 5 -
.../cache/transactions/IgniteTxLocalEx.java | 8 -
.../cache/transactions/IgniteTxLocalState.java | 16 -
.../transactions/IgniteTxLocalStateAdapter.java | 27 -
.../processors/cache/transactions/TxCounters.java | 18 -
.../main/resources/META-INF/classnames.properties | 18 -
.../twostep/AbstractPartitionPruningBaseTest.java | 14 -
37 files changed, 9148 deletions(-)
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index b5e5dee1e36..78bddeb0d2e 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -240,8 +240,6 @@ public class MessageCodeGenerator {
// gen.generateAndWrite(GridCacheVersionEx.class);
// gen.generateAndWrite(GridH2DmlRequest.class);
// gen.generateAndWrite(GridH2DmlResponse.class);
-// gen.generateAndWrite(GridNearTxEnlistRequest.class);
-// gen.generateAndWrite(GridNearTxEnlistResponse.class);
// gen.generateAndWrite(GenerateEncryptionKeyRequest.class);
// gen.generateAndWrite(GenerateEncryptionKeyResponse.class);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 0a2eb4bda98..7d0b63aca42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -72,11 +72,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFini
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryFirstEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
@@ -110,16 +106,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionImpl;
@@ -329,20 +319,10 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register((short)134, ContinuousRoutineStartResultMessage::new);
factory.register((short)135, LatchAckMessage::new);
factory.register((short)143, GridCacheMvccEntryInfo::new);
- factory.register((short)144, GridDhtTxQueryEnlistResponse::new);
factory.register((short)148, MvccVersionImpl::new);
factory.register((short)150, MvccSnapshotWithoutTxs::new);
- factory.register((short)151, GridNearTxQueryEnlistRequest::new);
- factory.register((short)152, GridNearTxQueryEnlistResponse::new);
- factory.register((short)153, GridNearTxQueryResultsEnlistRequest::new);
- factory.register((short)154, GridNearTxQueryResultsEnlistResponse::new);
- factory.register((short)155, GridDhtTxQueryEnlistRequest::new);
- factory.register((short)156, GridDhtTxQueryFirstEnlistRequest::new);
factory.register((short)157, PartitionUpdateCountersMessage::new);
factory.register((short)158, GridDhtPartitionSupplyMessageV2::new);
- factory.register((short)159, GridNearTxEnlistRequest::new);
- factory.register((short)160, GridNearTxEnlistResponse::new);
- factory.register((short)161, GridInvokeValue::new);
factory.register((short)162, GenerateEncryptionKeyRequest::new);
factory.register((short)163, GenerateEncryptionKeyResponse::new);
factory.register((short)167, ServiceDeploymentProcessId::new);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index c9457994b16..b4f291977ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -52,8 +52,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFini
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
@@ -76,16 +74,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
@@ -1060,66 +1052,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
- case 151: {
- GridNearTxQueryEnlistRequest req = (GridNearTxQueryEnlistRequest)msg;
-
- GridNearTxQueryEnlistResponse res = new GridNearTxQueryEnlistResponse(
- req.cacheId(),
- req.futureId(),
- req.miniId(),
- req.version(),
- req.classError());
-
- sendResponseOnFailedMessage(nodeId, res, cctx, plc);
-
- break;
- }
-
- case 153: {
- GridNearTxQueryResultsEnlistRequest req = (GridNearTxQueryResultsEnlistRequest)msg;
-
- GridNearTxQueryEnlistResponse res = new GridNearTxQueryResultsEnlistResponse(
- req.cacheId(),
- req.futureId(),
- req.miniId(),
- req.version(),
- req.classError());
-
- sendResponseOnFailedMessage(nodeId, res, cctx, plc);
-
- break;
- }
-
- case 155: /* GridDhtTxQueryEnlistRequest */
- case 156: /* GridDhtTxQueryFirstEnlistRequest */ {
- GridDhtTxQueryEnlistRequest req = (GridDhtTxQueryEnlistRequest)msg;
-
- GridDhtTxQueryEnlistResponse res = new GridDhtTxQueryEnlistResponse(
- req.cacheId(),
- req.dhtFutureId(),
- req.batchId(),
- req.classError());
-
- sendResponseOnFailedMessage(nodeId, res, cctx, plc);
-
- break;
- }
-
- case 159: {
- GridNearTxEnlistRequest req = (GridNearTxEnlistRequest)msg;
-
- GridNearTxEnlistResponse res = new GridNearTxEnlistResponse(
- req.cacheId(),
- req.futureId(),
- req.miniId(),
- req.version(),
- req.classError());
-
- sendResponseOnFailedMessage(nodeId, res, cctx, plc);
-
- break;
- }
-
case -36: {
GridDhtAtomicSingleUpdateRequest req = (GridDhtAtomicSingleUpdateRequest)msg;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ExceptionAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ExceptionAware.java
deleted file mode 100644
index 036492fc7c2..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ExceptionAware.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public interface ExceptionAware {
- /**
- * @return Exception.
- */
- @Nullable Throwable error();
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index e25fff47960..9bca9f187a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -28,9 +28,7 @@ import java.util.ListIterator;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -62,15 +60,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistFuture;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistFuture;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistFuture;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxRemote;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -92,7 +81,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
@@ -102,7 +90,6 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOO
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
import static org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
/**
@@ -164,12 +151,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDhtUnlockRequest.class,
(CI2<UUID, GridDhtUnlockRequest>)this::processDhtUnlockRequest);
- ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearTxQueryEnlistRequest.class,
- (CI2<UUID, GridNearTxQueryEnlistRequest>)this::processNearTxQueryEnlistRequest);
-
- ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearTxQueryEnlistResponse.class,
- (CI2<UUID, GridNearTxQueryEnlistResponse>)this::processNearTxQueryEnlistResponse);
-
ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDhtForceKeysRequest.class,
new MessageHandler<GridDhtForceKeysRequest>() {
@Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) {
@@ -183,35 +164,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
processForceKeyResponse(node, msg);
}
});
-
- ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearTxQueryResultsEnlistRequest.class,
- (CI2<UUID, GridNearTxQueryResultsEnlistRequest>)this::processNearTxQueryResultsEnlistRequest);
-
- ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearTxQueryResultsEnlistResponse.class,
- (CI2<UUID, GridNearTxQueryResultsEnlistResponse>)this::processNearTxQueryResultsEnlistResponse);
-
- ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearTxEnlistRequest.class,
- (CI2<UUID, GridNearTxEnlistRequest>)this::processNearTxEnlistRequest);
-
- ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearTxEnlistResponse.class,
- (CI2<UUID, GridNearTxEnlistResponse>)this::processNearTxEnlistResponse);
-
- ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDhtTxQueryEnlistRequest.class,
- new CI2<UUID, GridDhtTxQueryEnlistRequest>() {
- @Override public void apply(UUID nodeId, GridDhtTxQueryEnlistRequest msg) {
- processDhtTxQueryEnlistRequest(nodeId, msg, false);
- }
- });
-
- ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDhtTxQueryFirstEnlistRequest.class,
- new CI2<UUID, GridDhtTxQueryEnlistRequest>() {
- @Override public void apply(UUID nodeId, GridDhtTxQueryEnlistRequest msg) {
- processDhtTxQueryEnlistRequest(nodeId, msg, true);
- }
- });
-
- ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDhtTxQueryEnlistResponse.class,
- (CI2<UUID, GridDhtTxQueryEnlistResponse>)this::processDhtTxQueryEnlistResponse);
}
/** {@inheritDoc} */
@@ -657,72 +609,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
near().clearLocks(nodeId, req);
}
- /**
- * @param nodeId Node ID.
- * @param req Request.
- */
- private void processNearTxQueryEnlistRequest(UUID nodeId, final GridNearTxQueryEnlistRequest req) {
- assert nodeId != null;
- assert req != null;
-
- ClusterNode nearNode = ctx.discovery().node(nodeId);
-
- GridDhtTxLocal tx;
-
- try {
- tx = initTxTopologyVersion(nodeId,
- nearNode,
- req.version(),
- req.futureId(),
- req.miniId(),
- req.firstClientRequest(),
- req.topologyVersion(),
- req.threadId(),
- req.txTimeout(),
- req.taskNameHash());
- }
- catch (IgniteCheckedException | IgniteException ex) {
- GridNearTxQueryEnlistResponse res = new GridNearTxQueryEnlistResponse(req.cacheId(),
- req.futureId(),
- req.miniId(),
- req.version(),
- ex);
-
- try {
- ctx.io().send(nearNode, res, ctx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send near enlist response [" +
- "txId=" + req.version() +
- ", node=" + nodeId +
- ", res=" + res + ']', e);
- }
-
- return;
- }
-
- GridDhtTxQueryEnlistFuture fut = new GridDhtTxQueryEnlistFuture(
- nodeId,
- req.version(),
- req.mvccSnapshot(),
- req.futureId(),
- req.miniId(),
- tx,
- req.cacheIds(),
- req.partitions(),
- req.schemaName(),
- req.query(),
- req.parameters(),
- req.flags(),
- req.pageSize(),
- req.timeout(),
- ctx);
-
- fut.listen(NearTxQueryEnlistResultHandler.instance());
-
- fut.init();
- }
-
/**
* @param nodeId Node ID.
* @param req Request.
@@ -1891,398 +1777,4 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (nearEntry != null)
nearEntry.markObsolete(nextVersion());
}
-
- /**
- * @param nodeId Node ID.
- * @param req Request.
- */
- private void processNearTxQueryResultsEnlistRequest(UUID nodeId, final GridNearTxQueryResultsEnlistRequest req) {
- assert nodeId != null;
- assert req != null;
-
- ClusterNode nearNode = ctx.discovery().node(nodeId);
-
- GridDhtTxLocal tx;
-
- try {
- tx = initTxTopologyVersion(nodeId,
- nearNode,
- req.version(),
- req.futureId(),
- req.miniId(),
- req.firstClientRequest(),
- req.topologyVersion(),
- req.threadId(),
- req.txTimeout(),
- req.taskNameHash());
- }
- catch (Throwable e) {
- GridNearTxQueryResultsEnlistResponse res = new GridNearTxQueryResultsEnlistResponse(req.cacheId(),
- req.futureId(),
- req.miniId(),
- req.version(),
- e);
-
- try {
- ctx.io().send(nearNode, res, ctx.ioPolicy());
- }
- catch (IgniteCheckedException ioEx) {
- U.error(log, "Failed to send near enlist response " +
- "[txId=" + req.version() + ", node=" + nodeId + ", res=" + res + ']', ioEx);
- }
-
- if (e instanceof Error)
- throw (Error)e;
-
- return;
- }
-
- GridDhtTxQueryResultsEnlistFuture fut = new GridDhtTxQueryResultsEnlistFuture(
- nodeId,
- req.version(),
- req.mvccSnapshot(),
- req.futureId(),
- req.miniId(),
- tx,
- req.timeout(),
- ctx,
- req.rows(),
- req.operation());
-
- fut.listen(NearTxQueryEnlistResultHandler.instance());
-
- fut.init();
- }
-
- /**
- * @param nodeId Node ID.
- * @param req Request.
- */
- private void processNearTxEnlistRequest(UUID nodeId, final GridNearTxEnlistRequest req) {
- assert nodeId != null;
- assert req != null;
-
- ClusterNode nearNode = ctx.discovery().node(nodeId);
-
- GridDhtTxLocal tx;
-
- try {
- tx = initTxTopologyVersion(nodeId,
- nearNode,
- req.version(),
- req.futureId(),
- req.miniId(),
- req.firstClientRequest(),
- req.topologyVersion(),
- req.threadId(),
- req.txTimeout(),
- req.taskNameHash());
- }
- catch (IgniteCheckedException | IgniteException ex) {
- GridNearTxEnlistResponse res = new GridNearTxEnlistResponse(req.cacheId(),
- req.futureId(),
- req.miniId(),
- req.version(),
- ex);
-
- try {
- ctx.io().send(nearNode, res, ctx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send near enlist response [" +
- "txId=" + req.version() +
- ", node=" + nodeId +
- ", res=" + res + ']', e);
- }
-
- return;
- }
-
- GridDhtTxEnlistFuture fut = new GridDhtTxEnlistFuture(
- nodeId,
- req.version(),
- req.mvccSnapshot(),
- req.futureId(),
- req.miniId(),
- tx,
- req.timeout(),
- ctx,
- req.rows(),
- req.operation(),
- req.filter(),
- req.needRes(),
- req.keepBinary());
-
- fut.listen(NearTxResultHandler.instance());
-
- fut.init();
- }
-
- /**
- * @param nodeId Near node id.
- * @param nearNode Near node.
- * @param nearLockVer Near lock version.
- * @param nearFutId Near future id.
- * @param nearMiniId Near mini-future id.
- * @param firstClientReq First client request flag.
- * @param topVer Topology version.
- * @param nearThreadId Near node thread id.
- * @param timeout Timeout.
- * @param txTaskNameHash Transaction task name hash.
- * @return Transaction.
- */
- public GridDhtTxLocal initTxTopologyVersion(UUID nodeId,
- ClusterNode nearNode,
- GridCacheVersion nearLockVer,
- IgniteUuid nearFutId,
- int nearMiniId,
- boolean firstClientReq,
- AffinityTopologyVersion topVer,
- long nearThreadId,
- long timeout,
- int txTaskNameHash
- ) throws IgniteException, IgniteCheckedException {
-
- assert ctx.affinityNode();
-
- if (txLockMsgLog.isDebugEnabled()) {
- txLockMsgLog.debug("Received near enlist request [txId=" + nearLockVer +
- ", node=" + nodeId + ']');
- }
-
- if (nearNode == null) {
- U.warn(txLockMsgLog, "Received near enlist request from unknown node (will ignore) [txId=" + nearLockVer +
- ", node=" + nodeId + ']');
-
- return null;
- }
-
- GridDhtTxLocal tx = null;
-
- GridCacheVersion dhtVer = ctx.tm().mappedVersion(nearLockVer);
-
- if (dhtVer != null)
- tx = ctx.tm().tx(dhtVer);
-
- GridDhtPartitionTopology top = null;
-
- if (tx == null) {
- if (firstClientReq) {
- assert nearNode.isClient();
-
- top = topology();
-
- top.readLock();
-
- GridDhtTopologyFuture topFut = top.topologyVersionFuture();
-
- boolean done = topFut.isDone();
-
- if (!done || !(topFut.topologyVersion().compareTo(topVer) >= 0
- && ctx.shared().exchange().lastAffinityChangedTopologyVersion(topFut.initialVersion()).compareTo(topVer) <= 0)) {
- // TODO IGNITE-7164 Wait for topology change, remap client TX in case affinity was changed.
- top.readUnlock();
-
- throw new ClusterTopologyException("Topology was changed. Please retry on stable topology.");
- }
- }
-
- try {
- tx = new GridDhtTxLocal(
- ctx.shared(),
- topVer,
- nearNode.id(),
- nearLockVer,
- nearFutId,
- nearMiniId,
- nearThreadId,
- false,
- false,
- ctx.systemTx(),
- false,
- ctx.ioPolicy(),
- PESSIMISTIC,
- REPEATABLE_READ,
- timeout,
- false,
- false,
- false,
- -1,
- null,
- securitySubjectId(ctx),
- txTaskNameHash,
- null,
- null);
-
- // if (req.syncCommit())
- tx.syncMode(FULL_SYNC);
-
- tx = ctx.tm().onCreated(null, tx);
-
- if (tx == null || !tx.init()) {
- String msg = "Failed to acquire lock (transaction has been completed): " +
- nearLockVer;
-
- U.warn(log, msg);
-
- try {
- if (tx != null)
- tx.rollbackDhtLocal();
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to rollback the transaction: " + tx, ex);
- }
-
- throw new IgniteCheckedException(msg);
- }
-
- tx.topologyVersion(topVer);
- }
- finally {
- if (top != null)
- top.readUnlock();
- }
- }
-
- ctx.tm().txContext(tx);
-
- return tx;
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- */
- private void processNearTxEnlistResponse(UUID nodeId, final GridNearTxEnlistResponse res) {
- GridNearTxEnlistFuture fut = (GridNearTxEnlistFuture)
- ctx.mvcc().versionedFuture(res.version(), res.futureId());
-
- if (fut != null)
- fut.onResult(nodeId, res);
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- */
- private void processNearTxQueryEnlistResponse(UUID nodeId, final GridNearTxQueryEnlistResponse res) {
- GridNearTxQueryEnlistFuture fut = (GridNearTxQueryEnlistFuture)ctx.mvcc().versionedFuture(res.version(), res.futureId());
-
- if (fut != null)
- fut.onResult(res);
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- */
- private void processNearTxQueryResultsEnlistResponse(UUID nodeId, final GridNearTxQueryResultsEnlistResponse res) {
- GridNearTxQueryResultsEnlistFuture fut = (GridNearTxQueryResultsEnlistFuture)
- ctx.mvcc().versionedFuture(res.version(), res.futureId());
-
- if (fut != null)
- fut.onResult(nodeId, res);
- }
-
- /**
- * @param primary Primary node.
- * @param req Message.
- * @param first Flag if this is a first request in current operation.
- */
- private void processDhtTxQueryEnlistRequest(UUID primary, GridDhtTxQueryEnlistRequest req, boolean first) {
- try {
- assert req.version() != null && req.op() != null;
-
- GridDhtTxRemote tx = ctx.tm().tx(req.version());
-
- if (tx == null) {
- if (!first)
- throw new IgniteCheckedException("Can not find a transaction for version [version="
- + req.version() + ']');
-
- GridDhtTxQueryFirstEnlistRequest req0 = (GridDhtTxQueryFirstEnlistRequest)req;
-
- tx = new GridDhtTxRemote(ctx.shared(),
- req0.nearNodeId(),
- primary,
- req0.nearXidVersion(),
- req0.topologyVersion(),
- req0.version(),
- null,
- ctx.systemTx(),
- ctx.ioPolicy(),
- PESSIMISTIC,
- REPEATABLE_READ,
- false,
- req0.timeout(),
- -1,
- securitySubjectId(ctx),
- req0.taskNameHash(),
- false,
- null);
-
- tx = ctx.tm().onCreated(null, tx);
-
- if (tx == null || !ctx.tm().onStarted(tx)) {
- throw new IgniteTxRollbackCheckedException("Failed to update backup " +
- "(transaction has been completed): " + req0.version());
- }
- }
-
- assert tx != null;
-
- ctx.tm().txHandler().mvccEnlistBatch(tx, ctx, req.op(), req.keys(), req.values(), null,
- req.dhtFutureId(), req.batchId());
-
- GridDhtTxQueryEnlistResponse res = new GridDhtTxQueryEnlistResponse(req.cacheId(),
- req.dhtFutureId(),
- req.batchId(),
- null);
-
- try {
- ctx.io().send(primary, res, ctx.ioPolicy());
- }
- catch (IgniteCheckedException ioEx) {
- U.error(log, "Failed to send DHT enlist reply to primary node [node: " + primary + ", req=" +
- req + ']', ioEx);
- }
- }
- catch (Throwable e) {
- GridDhtTxQueryEnlistResponse res = new GridDhtTxQueryEnlistResponse(ctx.cacheId(),
- req.dhtFutureId(),
- req.batchId(),
- e);
-
- try {
- ctx.io().send(primary, res, ctx.ioPolicy());
- }
- catch (IgniteCheckedException ioEx) {
- U.error(log, "Failed to send DHT enlist reply to primary node " +
- "[node: " + primary + ", req=" + req + ']', ioEx);
- }
-
- if (e instanceof Error)
- throw (Error)e;
- }
- }
-
- /**
- * @param backup Backup node.
- * @param res Response message.
- */
- private void processDhtTxQueryEnlistResponse(UUID backup, GridDhtTxQueryEnlistResponse res) {
- GridDhtTxAbstractEnlistFuture fut = (GridDhtTxAbstractEnlistFuture)
- ctx.mvcc().future(res.futureId());
-
- if (fut == null) {
- U.warn(log, "Received dht enlist response for unknown future [futId=" + res.futureId() +
- ", batchId=" + res.batchId() +
- ", node=" + backup + ']');
-
- return;
- }
-
- fut.onResult(backup, res);
- }
-
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
deleted file mode 100644
index 401d966df62..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ /dev/null
@@ -1,1156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import javax.cache.processor.EntryProcessor;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxAbstractEnlistFuture;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
-import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
-import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.EnlistOperation;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.lang.GridPlainRunnable;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Abstract future processing transaction enlisting and locking.
- */
-public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAdapter<T>
- implements DhtLockFuture<T> {
- /** Done field updater. */
- private static final AtomicIntegerFieldUpdater<GridDhtTxAbstractEnlistFuture> DONE_UPD =
- AtomicIntegerFieldUpdater.newUpdater(GridDhtTxAbstractEnlistFuture.class, "done");
-
- /** SkipCntr field updater. */
- private static final AtomicIntegerFieldUpdater<GridDhtTxAbstractEnlistFuture> SKIP_UPD =
- AtomicIntegerFieldUpdater.newUpdater(GridDhtTxAbstractEnlistFuture.class, "skipCntr");
-
- /** Marker object. */
- private static final Object FINISHED = new Object();
-
- /** */
- private static final int BATCH_SIZE = 1024;
-
- /** In-flight batches per node limit. */
- private static final int BATCHES_PER_NODE = 5;
-
- /** */
- private static final int FIRST_BATCH_ID = 0;
-
- /** Future ID. */
- protected final IgniteUuid futId;
-
- /** Cache registry. */
- @GridToStringExclude
- protected final GridCacheContext<?, ?> cctx;
-
- /** Logger. */
- @GridToStringExclude
- protected final IgniteLogger log;
-
- /** Future ID. */
- protected final IgniteUuid nearFutId;
-
- /** Future ID. */
- protected final int nearMiniId;
-
- /** Transaction. */
- protected final GridDhtTxLocalAdapter tx;
-
- /** Lock version. */
- protected final GridCacheVersion lockVer;
-
- /** */
- protected final MvccSnapshot mvccSnapshot;
-
- /** New DHT nodes. */
- protected final Set<UUID> newDhtNodes = new HashSet<>();
-
- /** Near node ID. */
- protected final UUID nearNodeId;
-
- /** Near lock version. */
- protected final GridCacheVersion nearLockVer;
-
- /** Filter. */
- private final CacheEntryPredicate filter;
-
- /** Keep binary flag. */
- protected boolean keepBinary;
-
- /** Timeout object. */
- @GridToStringExclude
- protected LockTimeoutObject timeoutObj;
-
- /** Lock timeout. */
- protected final long timeout;
-
- /** Query iterator */
- private UpdateSourceIterator<?> it;
-
- /** Row extracted from iterator but not yet used. */
- private Object peek;
-
- /** */
- @GridToStringExclude
- private volatile int skipCntr;
-
- /** */
- @SuppressWarnings("unused")
- @GridToStringExclude
- private volatile int done;
-
- /** */
- @GridToStringExclude
- private int batchIdCntr;
-
- /** Batches for sending to remote nodes. */
- private Map<UUID, Batch> batches;
-
- /** Batches already sent to remotes, but their acks are not received yet. */
- private ConcurrentMap<UUID, ConcurrentMap<Integer, Batch>> pending;
-
- /** Do not send DHT requests to near node. */
- protected boolean skipNearNodeUpdates;
-
- /** There are keys belonging to backup partitions on near node. */
- protected boolean hasNearNodeUpdates;
-
- /** Moving partitions. */
- private Map<Integer, Boolean> movingParts;
-
- /** Map for tracking nodes to which first request was already sent in order to send smaller subsequent requests. */
- private final Set<ClusterNode> firstReqSent = new HashSet<>();
-
- /** Deployment class loader id which will be used for deserialization of entries on a distributed task. */
- @GridToStringExclude
- protected final IgniteUuid deploymentLdrId;
-
- /**
- * @param nearNodeId Near node ID.
- * @param nearLockVer Near lock version.
- * @param mvccSnapshot Mvcc snapshot.
- * @param nearFutId Near future id.
- * @param nearMiniId Near mini future id.
- * @param tx Transaction.
- * @param timeout Lock acquisition timeout.
- * @param cctx Cache context.
- * @param filter Filter.
- * @param keepBinary Keep binary flag.
- */
- protected GridDhtTxAbstractEnlistFuture(UUID nearNodeId,
- GridCacheVersion nearLockVer,
- MvccSnapshot mvccSnapshot,
- IgniteUuid nearFutId,
- int nearMiniId,
- GridDhtTxLocalAdapter tx,
- long timeout,
- GridCacheContext<?, ?> cctx,
- @Nullable CacheEntryPredicate filter,
- boolean keepBinary) {
- assert tx != null;
- assert timeout >= 0;
- assert nearNodeId != null;
- assert nearLockVer != null;
-
- this.cctx = cctx;
- this.nearNodeId = nearNodeId;
- this.nearLockVer = nearLockVer;
- this.nearFutId = nearFutId;
- this.nearMiniId = nearMiniId;
- this.mvccSnapshot = mvccSnapshot;
- this.timeout = timeout;
- this.tx = tx;
- this.filter = filter;
- this.keepBinary = keepBinary;
- deploymentLdrId = U.contextDeploymentClassLoaderId(cctx.kernalContext());
-
- lockVer = tx.xidVersion();
-
- futId = IgniteUuid.randomUuid();
-
- log = cctx.logger(GridDhtTxAbstractEnlistFuture.class);
- }
-
- /**
- * Gets source to be updated iterator.
- *
- * @return iterator.
- * @throws IgniteCheckedException If failed.
- */
- protected abstract UpdateSourceIterator<?> createIterator() throws IgniteCheckedException;
-
- /**
- * Gets query result.
- *
- * @return Query result.
- */
- protected abstract T result0();
-
- /**
- * Gets need previous value flag.
- *
- * @return {@code True} if previous value is required.
- */
- public boolean needResult() {
- return false;
- }
-
- /**
- * Entry processed callback.
- *
- * @param key Entry key.
- * @param res Update result.
- */
- protected abstract void onEntryProcessed(KeyCacheObject key, GridCacheUpdateTxResult res);
-
- /**
- *
- */
- public void init() {
- if (timeout < 0) {
- // Time is out.
- onDone(timeoutException());
-
- return;
- }
- else if (timeout > 0)
- timeoutObj = new LockTimeoutObject();
-
- while (true) {
- IgniteInternalFuture<?> fut = tx.lockFut;
-
- if (fut == GridDhtTxLocalAdapter.ROLLBACK_FUT) {
- onDone(tx.timedOut() ? tx.timeoutException() : tx.rollbackException());
-
- return;
- }
- else if (fut != null) {
- // Wait for previous future.
- assert fut instanceof GridNearTxAbstractEnlistFuture
- || fut instanceof GridDhtTxAbstractEnlistFuture : fut;
-
- // Terminate this future if parent future is terminated by rollback.
- if (!fut.isDone()) {
- fut.listen(() -> {
- if (fut.error() != null)
- onDone(fut.error());
- });
- }
- else if (fut.error() != null)
- onDone(fut.error());
-
- break;
- }
- else if (tx.updateLockFuture(null, this))
- break;
- }
-
- boolean added = cctx.mvcc().addFuture(this, futId);
-
- if (isDone()) {
- cctx.mvcc().removeFuture(futId);
-
- return;
- }
-
- assert added;
-
- if (timeoutObj != null)
- cctx.time().addTimeoutObject(timeoutObj);
-
- try {
- UpdateSourceIterator<?> it = createIterator();
-
- if (!it.hasNext()) {
- U.close(it, log);
-
- onDone(result0());
-
- return;
- }
-
- if (!tx.implicitSingle())
- tx.addActiveCache(cctx, false);
- else // Nothing to do for single update.
- assert tx.txState().cacheIds().contains(cctx.cacheId()) && tx.txState().cacheIds().size() == 1;
-
- this.it = it;
- }
- catch (Throwable e) {
- onDone(e);
-
- if (e instanceof Error)
- throw (Error)e;
-
- return;
- }
-
- continueLoop(false);
- }
-
- /**
- * Clears lock future.
- */
- protected void clearLockFuture() {
- tx.clearLockFuture(this);
- }
-
- /**
- * Iterates over iterator, applies changes locally and sends it on backups.
- *
- * @param ignoreCntr {@code True} if need to ignore skip counter.
- */
- private void continueLoop(boolean ignoreCntr) {
- if (isDone() || (!ignoreCntr && (SKIP_UPD.getAndIncrement(this) != 0)))
- return;
-
- GridDhtCacheAdapter cache = cctx.dhtCache();
- EnlistOperation op = it.operation();
- AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
-
- try {
- while (true) {
- int curPart = -1;
- List<ClusterNode> backups = null;
-
- while (hasNext0()) {
- Object cur = next0();
-
- KeyCacheObject key = toKey(op, cur);
-
- if (curPart != key.partition())
- backups = backupNodes(curPart = key.partition());
-
- assert backups != null;
-
- if (!ensureFreeSlot(key, backups)) {
- // Can't advance further at the moment.
- peek = cur;
-
- it.beforeDetach();
-
- break;
- }
-
- GridDhtCacheEntry entry = cache.entryExx(key);
-
- if (log.isDebugEnabled())
- log.debug("Adding entry: " + entry);
-
- assert !entry.detached();
-
- CacheObject val = op.isDeleteOrLock() || op.isInvoke()
- ? null : cctx.toCacheObject(((Map.Entry<?, ?>)cur).getValue());
-
- GridInvokeValue invokeVal = null;
- EntryProcessor entryProc = null;
- Object[] invokeArgs = null;
-
- if (op.isInvoke()) {
- assert needResult();
-
- invokeVal = (GridInvokeValue)((Map.Entry<?, ?>)cur).getValue();
-
- entryProc = invokeVal.entryProcessor();
- invokeArgs = invokeVal.invokeArgs();
- }
-
- assert entryProc != null || !op.isInvoke();
-
- GridCacheUpdateTxResult res;
-
- while (true) {
- cctx.shared().database().checkpointReadLock();
-
- try {
- switch (op) {
- case DELETE:
- res = entry.mvccRemove(
- tx,
- cctx.localNodeId(),
- topVer,
- mvccSnapshot,
- isMoving(key.partition(), backups),
- false,
- filter,
- needResult());
-
- break;
-
- case INSERT:
- case TRANSFORM:
- case UPSERT:
- case UPDATE:
- res = entry.mvccSet(
- tx,
- cctx.localNodeId(),
- val,
- entryProc,
- invokeArgs,
- 0,
- topVer,
- mvccSnapshot,
- op.cacheOperation(),
- isMoving(key.partition(), backups),
- op.noCreate(),
- false,
- filter,
- needResult(),
- keepBinary);
-
- break;
-
- case LOCK:
- res = entry.mvccLock(
- tx,
- mvccSnapshot);
-
- break;
-
- default:
- throw new IgniteSQLException("Cannot acquire lock for operation [op= " + op + "]" +
- "Operation is unsupported at the moment ", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
- }
-
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- entry = cache.entryExx(entry.key(), topVer);
- }
- finally {
- cctx.shared().database().checkpointReadUnlock();
- }
- }
-
- IgniteInternalFuture<GridCacheUpdateTxResult> updateFut = res.updateFuture();
-
- final Message val0 = invokeVal != null ? invokeVal : val;
-
- if (updateFut != null) {
- if (updateFut.isDone())
- res = updateFut.get();
- else {
- GridDhtCacheEntry entry0 = entry;
- List<ClusterNode> backups0 = backups;
-
- it.beforeDetach();
-
- updateFut.listen(() -> {
- try {
- tx.incrementLockCounter();
-
- processEntry(entry0, op, updateFut.get(), val0, backups0);
-
- continueLoop(true);
- }
- catch (Throwable e) {
- onDone(e);
- }
- });
-
- // Can't move further. Exit loop without decrementing the counter.
- return;
- }
- }
-
- tx.incrementLockCounter();
-
- processEntry(entry, op, res, val0, backups);
- }
-
- if (!hasNext0()) {
- if (!F.isEmpty(batches)) {
- // Flush incomplete batches.
- // Need to skip batches for nodes where first request (contains tx info) is still in-flight.
- // Otherwise, the regular enlist request (without tx info) may beat it to the primary node.
- Iterator<Map.Entry<UUID, Batch>> it = batches.entrySet().iterator();
-
- while (it.hasNext()) {
- Map.Entry<UUID, Batch> e = it.next();
-
- ConcurrentMap<Integer, Batch> pending0 =
- pending == null ? null : pending.get(e.getKey());
-
- if (pending0 == null || !pending0.containsKey(FIRST_BATCH_ID)) {
- it.remove();
-
- sendBatch(e.getValue());
- }
- }
- }
-
- if (noPendingRequests()) {
- onDone(result0());
-
- return;
- }
- }
-
- if (SKIP_UPD.decrementAndGet(this) == 0)
- break;
-
- skipCntr = 1;
- }
- }
- catch (Throwable e) {
- onDone(e);
-
- if (e instanceof Error)
- throw (Error)e;
- }
- }
-
- /** */
- private Object next0() {
- if (!hasNext0())
- throw new NoSuchElementException();
-
- Object cur;
-
- if ((cur = peek) != null)
- peek = null;
- else
- cur = it.next();
-
- return cur;
- }
-
- /** */
- private boolean hasNext0() {
- if (peek == null && !it.hasNext())
- peek = FINISHED;
-
- return peek != FINISHED;
- }
-
- /** */
- private KeyCacheObject toKey(EnlistOperation op, Object cur) {
- KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((Map.Entry<?, ?>)cur).getKey());
-
- if (key.partition() == -1)
- key.partition(cctx.affinity().partition(key));
-
- return key;
- }
-
- /**
- * @return {@code True} if in-flight batches map is empty.
- */
- private boolean noPendingRequests() {
- if (F.isEmpty(pending))
- return true;
-
- for (ConcurrentMap<Integer, Batch> e : pending.values()) {
- if (!e.isEmpty())
- return false;
- }
-
- return true;
- }
-
- /**
- * @param entry Cache entry.
- * @param op Operation.
- * @param updRes Update result.
- * @param val New value.
- * @param backups Backup nodes
- * @throws IgniteCheckedException If failed.
- */
- private void processEntry(GridDhtCacheEntry entry, EnlistOperation op,
- GridCacheUpdateTxResult updRes, Message val, List<ClusterNode> backups) throws IgniteCheckedException {
- checkCompleted();
-
- assert updRes != null && updRes.updateFuture() == null;
-
- if (op != EnlistOperation.LOCK)
- onEntryProcessed(entry.key(), updRes);
-
- if (!updRes.success()
- || updRes.filtered()
- || op == EnlistOperation.LOCK)
- return;
-
- addToBatch(entry.key(), val, updRes.mvccHistory(), entry.context().cacheId(), backups);
- }
-
- /**
- * Adds row to batch.
- * <b>IMPORTANT:</b> This method should be called from the critical section in {@link this.sendNextBatches()}
- * @param key Key.
- * @param val Value.
- * @param hist History rows.
- * @param cacheId Cache Id.
- * @param backups Backup nodes
- */
- private void addToBatch(KeyCacheObject key, Message val, List<MvccLinkAwareSearchRow> hist,
- int cacheId, List<ClusterNode> backups) throws IgniteCheckedException {
- int part = key.partition();
-
- tx.touchPartition(cacheId, part);
-
- if (F.isEmpty(backups))
- return;
-
- CacheEntryInfoCollection hist0 = null;
-
- for (ClusterNode node : backups) {
- assert !node.isLocal();
-
- boolean moving = isMoving(node, part);
-
- if (skipNearLocalUpdate(node, moving)) {
- updateMappings(node);
-
- if (newRemoteTx(node))
- addNewRemoteTxNode(node);
-
- hasNearNodeUpdates = true;
-
- continue;
- }
-
- Batch batch = null;
-
- if (batches == null)
- batches = new HashMap<>();
- else
- batch = batches.get(node.id());
-
- if (batch == null)
- batches.put(node.id(), batch = new Batch(node));
-
- if (moving && hist0 == null) {
- assert !F.isEmpty(hist) || val == null;
-
- hist0 = fetchHistoryInfo(key, hist);
- }
-
- batch.add(key, moving ? hist0 : val);
-
- if (batch.size() == BATCH_SIZE) {
- assert batches != null;
-
- batches.remove(node.id());
-
- sendBatch(batch);
- }
- }
- }
-
- /**
- *
- * @param key Key.
- * @param hist History rows.
- * @return History entries.
- */
- private CacheEntryInfoCollection fetchHistoryInfo(KeyCacheObject key, List<MvccLinkAwareSearchRow> hist) {
- List<GridCacheEntryInfo> res = new ArrayList<>();
-
- for (int i = 0; i < hist.size(); i++) {
- MvccLinkAwareSearchRow row0 = hist.get(i);
-
- MvccDataRow row = new MvccDataRow(cctx.group(),
- row0.hash(),
- row0.link(),
- key.partition(),
- CacheDataRowAdapter.RowData.NO_KEY_WITH_HINTS,
- row0.mvccCoordinatorVersion(),
- row0.mvccCounter(),
- row0.mvccOperationCounter(),
- false
- );
-
- GridCacheMvccEntryInfo entry = new GridCacheMvccEntryInfo();
-
- entry.cacheId(cctx.cacheId());
- entry.version(row.version());
- entry.value(row.value());
- entry.expireTime(row.expireTime());
-
- // Row should be retrieved with actual hints.
- entry.mvccVersion(row);
- entry.newMvccVersion(row);
-
- if (MvccUtils.compare(mvccSnapshot, row.mvccCoordinatorVersion(), row.mvccCounter()) != 0)
- entry.mvccTxState(row.mvccTxState());
-
- if (row.newMvccCoordinatorVersion() != MvccUtils.MVCC_CRD_COUNTER_NA
- && MvccUtils.compare(mvccSnapshot, row.newMvccCoordinatorVersion(), row.newMvccCounter()) != 0)
- entry.newMvccTxState(row.newMvccTxState());
-
- assert mvccSnapshot.coordinatorVersion() != MvccUtils.MVCC_CRD_COUNTER_NA;
-
- res.add(entry);
- }
-
- return new CacheEntryInfoCollection(res);
- }
-
- /** */
- private boolean newRemoteTx(ClusterNode node) {
- Set<ClusterNode> nodes = tx.lockTransactionNodes();
-
- return nodes == null || !nodes.contains(node);
- }
-
- /**
- * Add new involved DHT node.
- *
- * @param node Node.
- */
- private void addNewRemoteTxNode(ClusterNode node) {
- tx.addLockTransactionNode(node);
-
- newDhtNodes.add(node.id());
- }
-
- /**
- * Checks if there free space in batches or free slot in in-flight batches is available for the given key.
- *
- * @param key Key.
- * @param backups Backup nodes.
- * @return {@code True} if there is possible to add this key to batch or send ready batch.
- */
- private boolean ensureFreeSlot(KeyCacheObject key, List<ClusterNode> backups) {
- if (F.isEmpty(batches) || F.isEmpty(pending))
- return true;
-
- int part = key.partition();
-
- // Check possibility of adding to batch and sending.
- for (ClusterNode node : backups) {
- if (skipNearLocalUpdate(node, isMoving(node, part)))
- continue;
-
- Batch batch = batches.get(node.id());
-
- // We can add key if batch is not full.
- if (batch == null || batch.size() < BATCH_SIZE - 1)
- continue;
-
- ConcurrentMap<Integer, Batch> pending0 = pending.get(node.id());
-
- assert pending0 == null || pending0.size() <= BATCHES_PER_NODE;
-
- if (pending0 != null && (pending0.containsKey(FIRST_BATCH_ID) || pending0.size() == BATCHES_PER_NODE))
- return false;
- }
-
- return true;
- }
-
- /**
- * Send batch request to remote data node.
- *
- * @param batch Batch.
- */
- private void sendBatch(Batch batch) throws IgniteCheckedException {
- assert batch != null && !batch.node().isLocal();
-
- ClusterNode node = batch.node();
-
- updateMappings(node);
-
- GridDhtTxQueryEnlistRequest req;
-
- if (newRemoteTx(node))
- addNewRemoteTxNode(node);
-
- if (firstReqSent.add(node)) {
- // If this is a first request to this node, send full info.
- req = new GridDhtTxQueryFirstEnlistRequest(cctx.cacheId(),
- futId,
- tx.topologyVersionSnapshot(),
- lockVer,
- mvccSnapshot.withoutActiveTransactions(),
- tx.remainingTime(),
- tx.taskNameHash(),
- nearNodeId,
- nearLockVer,
- it.operation(),
- FIRST_BATCH_ID,
- batch.keys(),
- batch.values()
- );
- }
- else {
- // Send only keys, values, LockVersion and batchId if this is not a first request to this backup.
- req = new GridDhtTxQueryEnlistRequest(cctx.cacheId(),
- futId,
- lockVer,
- it.operation(),
- ++batchIdCntr,
- mvccSnapshot.operationCounter(),
- batch.keys(),
- batch.values()
- );
- }
-
- ConcurrentMap<Integer, Batch> pending0 = null;
-
- if (pending == null)
- pending = new ConcurrentHashMap<>();
- else
- pending0 = pending.get(node.id());
-
- if (pending0 == null)
- pending.put(node.id(), pending0 = new ConcurrentHashMap<>());
-
- Batch prev = pending0.put(req.batchId(), batch);
-
- assert prev == null;
-
- try {
- cctx.io().send(node, req, cctx.ioPolicy());
- }
- catch (ClusterTopologyCheckedException e) {
- // backup node left the grid, will continue.
- onNodeLeft(node.id());
- }
- }
-
- /** */
- private synchronized void updateMappings(ClusterNode node) throws IgniteCheckedException {
- checkCompleted();
-
- Map<UUID, GridDistributedTxMapping> m = tx.dhtMap;
-
- GridDistributedTxMapping mapping = m.get(node.id());
-
- if (mapping == null)
- m.put(node.id(), mapping = new GridDistributedTxMapping(node));
-
- mapping.markQueryUpdate();
-
- checkCompleted();
- }
-
- /** */
- private boolean skipNearLocalUpdate(ClusterNode node, boolean moving) {
- return skipNearNodeUpdates && node.id().equals(nearNodeId) && !moving;
- }
-
- /**
- * @param part Partition.
- * @return Backup nodes for the given partition.
- */
- @NotNull private List<ClusterNode> backupNodes(int part) {
- List<ClusterNode> nodes = cctx.topology().nodes(part, tx.topologyVersion());
-
- assert !nodes.isEmpty() && nodes.get(0).isLocal();
-
- return nodes.subList(1, nodes.size());
- }
-
- /**
- * @param part Partition.
- * @param backups Backup nodes.
- * @return {@code true} if the given partition is rebalancing to any backup node.
- */
- private boolean isMoving(int part, List<ClusterNode> backups) {
- Boolean res;
-
- if (movingParts == null)
- movingParts = new HashMap<>();
-
- if ((res = movingParts.get(part)) == null)
- movingParts.put(part, res = isMoving0(part, backups));
-
- return res == Boolean.TRUE;
- }
-
- /**
- * @param part Partition.
- * @param backups Backup nodes.
- * @return {@code true} if the given partition is rebalancing to any backup node.
- */
- private Boolean isMoving0(int part, List<ClusterNode> backups) {
- for (ClusterNode node : backups) {
- if (isMoving(node, part))
- return Boolean.TRUE;
- }
-
- return Boolean.FALSE;
- }
-
- /**
- * @param node Cluster node.
- * @param part Partition.
- * @return {@code true} if the given partition is rebalancing to the given node.
- */
- private boolean isMoving(ClusterNode node, int part) {
- return cctx.topology().partitionState(node.id(), part) == GridDhtPartitionState.MOVING;
- }
-
- /** */
- private void checkCompleted() throws IgniteCheckedException {
- if (isDone())
- throw new IgniteCheckedException("Future is done.");
- }
-
- /**
- * Callback on backup response.
- *
- * @param nodeId Backup node.
- * @param res Response.
- */
- public void onResult(UUID nodeId, GridDhtTxQueryEnlistResponse res) {
- if (res.error() != null) {
- onDone(new IgniteCheckedException("Failed to update backup node: [localNodeId=" + cctx.localNodeId() +
- ", remoteNodeId=" + nodeId + ']', res.error()));
-
- return;
- }
-
- assert pending != null;
-
- ConcurrentMap<Integer, Batch> pending0 = pending.get(nodeId);
-
- assert pending0 != null;
-
- Batch rmv = pending0.remove(res.batchId());
-
- assert rmv != null;
-
- continueLoop(false);
- }
-
- /** {@inheritDoc} */
- @Override public boolean trackable() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- return futId;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- try {
- if (nearNodeId.equals(nodeId))
- onDone(new ClusterTopologyCheckedException("Requesting node left the grid [nodeId=" + nodeId + ']'));
- else if (pending != null && pending.remove(nodeId) != null)
- cctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> continueLoop(false));
- }
- catch (Exception e) {
- onDone(e);
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onDone(@Nullable T res, @Nullable Throwable err) {
- assert res != null || err != null;
-
- if (!DONE_UPD.compareAndSet(this, 0, 1))
- return false;
-
- if (err == null)
- clearLockFuture();
-
- // To prevent new remote transactions creation
- // after future is cancelled by rollback.
- synchronized (this) {
- boolean done = super.onDone(res, err);
-
- assert done;
-
- if (log.isDebugEnabled())
- log.debug("Completing future: " + this);
-
- // Clean up.
- cctx.mvcc().removeFuture(futId);
-
- if (timeoutObj != null)
- cctx.time().removeTimeoutObject(timeoutObj);
-
- U.close(it, log);
-
- return true;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onError(Throwable error) {
- onDone(error);
- }
-
- /**
- * @return Timeout exception.
- */
- @NotNull protected IgniteTxTimeoutCheckedException timeoutException() {
- return new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout for " +
- "transaction [timeout=" + timeout + ", tx=" + tx + ']');
- }
-
- /**
- * A batch of rows
- */
- private static class Batch {
- /** Node ID. */
- @GridToStringExclude
- private final ClusterNode node;
-
- /** */
- private List<KeyCacheObject> keys;
-
- /**
- * Values collection.
- * Items can be either {@link CacheObject} or preload entries collection {@link CacheEntryInfoCollection}.
- */
- private List<Message> vals;
-
- /**
- * @param node Cluster node.
- */
- private Batch(ClusterNode node) {
- this.node = node;
- }
-
- /**
- * @return Node.
- */
- public ClusterNode node() {
- return node;
- }
-
- /**
- * Adds a row to batch.
- *
- * @param key Key.
- * @param val Value or preload entries collection.
- */
- public void add(KeyCacheObject key, Message val) {
- assert val == null || val instanceof GridInvokeValue || val instanceof CacheObject
- || val instanceof CacheEntryInfoCollection;
-
- if (keys == null)
- keys = new ArrayList<>();
-
- if (vals == null && val != null) {
- vals = new ArrayList<>(U.ceilPow2(keys.size() + 1));
-
- while (vals.size() != keys.size())
- vals.add(null); // Init vals with missed 'nulls'.
- }
-
- keys.add(key);
-
- if (vals != null)
- vals.add(val);
- }
-
- /**
- * @return number of rows.
- */
- public int size() {
- return keys == null ? 0 : keys.size();
- }
-
- /**
- * @return Collection of row keys.
- */
- public List<KeyCacheObject> keys() {
- return keys;
- }
-
- /**
- * @return Collection of row values.
- */
- public List<Message> values() {
- return vals;
- }
- }
-
- /**
- * Lock request timeout object.
- */
- protected class LockTimeoutObject extends GridTimeoutObjectAdapter {
- /**
- * Default constructor.
- */
- LockTimeoutObject() {
- super(timeout);
- }
-
- /** {@inheritDoc} */
- @Override public void onTimeout() {
- if (log.isDebugEnabled())
- log.debug("Timed out waiting for lock response: " + this);
-
- onDone(timeoutException());
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(LockTimeoutObject.class, this);
- }
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java
deleted file mode 100644
index 72ad4d85767..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.UUID;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.CacheInvokeResult;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.EnlistOperation;
-import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Future processing transaction enlisting and locking of entries produces by cache API operations.
- */
-public final class GridDhtTxEnlistFuture extends GridDhtTxAbstractEnlistFuture<GridCacheReturn> implements UpdateSourceIterator<Object> {
- /** Enlist operation. */
- private final EnlistOperation op;
-
- /** Source iterator. */
- private final Iterator<Object> it;
-
- /** Future result. */
- private final GridCacheReturn res;
-
- /** Need result flag. If {@code True} previous value should be returned as well. */
- private final boolean needRes;
-
- /**
- * Constructor.
- *
- * @param nearNodeId Near node ID.
- * @param nearLockVer Near lock version.
- * @param mvccSnapshot Mvcc snapshot.
- * @param nearFutId Near future id.
- * @param nearMiniId Near mini future id.
- * @param tx Transaction.
- * @param timeout Lock acquisition timeout.
- * @param cctx Cache context.
- * @param rows Collection of rows.
- * @param op Operation.
- * @param filter Filter.
- * @param needRes Return previous value flag.
- * @param keepBinary Keep binary flag.
- */
- public GridDhtTxEnlistFuture(UUID nearNodeId,
- GridCacheVersion nearLockVer,
- MvccSnapshot mvccSnapshot,
- IgniteUuid nearFutId,
- int nearMiniId,
- GridDhtTxLocalAdapter tx,
- long timeout,
- GridCacheContext<?, ?> cctx,
- Collection<Object> rows,
- EnlistOperation op,
- @Nullable CacheEntryPredicate filter,
- boolean needRes,
- boolean keepBinary) {
- super(nearNodeId,
- nearLockVer,
- mvccSnapshot,
- nearFutId,
- nearMiniId,
- tx,
- timeout,
- cctx,
- filter,
- keepBinary);
-
- this.op = op;
- this.needRes = needRes;
-
- it = rows.iterator();
-
- res = new GridCacheReturn(cctx.localNodeId().equals(nearNodeId), false);
-
- skipNearNodeUpdates = true;
- }
-
- /** {@inheritDoc} */
- @Override protected UpdateSourceIterator<?> createIterator() {
- return this;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable protected GridCacheReturn result0() {
- return res;
- }
-
- /** {@inheritDoc} */
- @Override protected void onEntryProcessed(KeyCacheObject key, GridCacheUpdateTxResult txRes) {
- assert txRes.invokeResult() == null || needRes;
-
- res.success(txRes.success());
-
- if (txRes.invokeResult() != null) {
- res.invokeResult(true);
-
- CacheInvokeResult invokeRes = txRes.invokeResult();
-
- if (invokeRes.result() != null || invokeRes.error() != null)
- res.addEntryProcessResult(cctx, key, null, invokeRes.result(), invokeRes.error(), keepBinary);
- }
- else if (needRes)
- res.set(cctx, txRes.prevValue(), txRes.success(), keepBinary, U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId));
- }
-
- /** {@inheritDoc} */
- @Override public boolean needResult() {
- return needRes;
- }
-
- /** {@inheritDoc} */
- @Override public EnlistOperation operation() {
- return op;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasNextX() {
- return it.hasNext();
- }
-
- /** {@inheritDoc} */
- @Override public Object nextX() {
- return it.next();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxEnlistFuture.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 1de184b9d60..174e091ac5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -842,13 +842,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
return onePhaseCommit() && !near() && !nearOnOriginatingNode;
}
- /**
- * @return Lock future.
- */
- public IgniteInternalFuture<?> lockFuture() {
- return lockFut;
- }
-
/**
* Atomically updates lock future.
*
@@ -931,11 +924,4 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(),
"dhtNodes", dhtMap.keySet(), "explicitLock", explicitLock, "super", super.toString());
}
-
- /**
- * Increments lock counter.
- */
- public void incrementLockCounter() {
- txCounters(true).incrementLockCounter();
- }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java
deleted file mode 100644
index 2236818fae0..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.util.UUID;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.lang.IgniteUuid;
-
-/**
- * Abstract future processing transaction enlisting and locking of entries produced with DML and SELECT FOR UPDATE
- * queries.
- */
-public abstract class GridDhtTxQueryAbstractEnlistFuture extends GridDhtTxAbstractEnlistFuture<Long> {
- /** Processed entries count. */
- protected long cnt;
-
- /**
- * Constructor.
- * @param nearNodeId Near node ID.
- * @param nearLockVer Near lock version.
- * @param mvccSnapshot Mvcc snapshot.
- * @param nearFutId Near future id.
- * @param nearMiniId Near mini future id.
- * @param tx Transaction.
- * @param timeout Lock acquisition timeout.
- * @param cctx Cache context.
- */
- protected GridDhtTxQueryAbstractEnlistFuture(UUID nearNodeId,
- GridCacheVersion nearLockVer,
- MvccSnapshot mvccSnapshot,
- IgniteUuid nearFutId,
- int nearMiniId,
- GridDhtTxLocalAdapter tx,
- long timeout,
- GridCacheContext<?, ?> cctx) {
- super(nearNodeId,
- nearLockVer,
- mvccSnapshot,
- nearFutId,
- nearMiniId,
- tx,
- timeout,
- cctx,
- null,
- cctx.keepBinary());
- }
-
- /** {@inheritDoc} */
- @Override protected Long result0() {
- return cnt;
- }
-
- /** {@inheritDoc} */
- @Override protected void onEntryProcessed(KeyCacheObject key, GridCacheUpdateTxResult res) {
- if (res.success())
- cnt++;
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
deleted file mode 100644
index bf4cb5db3cc..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-
-/**
- * Cache query lock future.
- */
-public final class GridDhtTxQueryEnlistFuture extends GridDhtTxQueryAbstractEnlistFuture {
- /** Involved cache ids. */
- private final int[] cacheIds;
-
- /** Schema name. */
- private final String schema;
-
- /** Query string. */
- private final String qry;
-
- /** Query partitions. */
- private final int[] parts;
-
- /** Query parameters. */
- private final Object[] params;
-
- /** Flags. */
- private final int flags;
-
- /** Fetch page size. */
- private final int pageSize;
-
- /**
- * @param nearNodeId Near node ID.
- * @param nearLockVer Near lock version.
- * @param mvccSnapshot Mvcc snapshot.
- * @param nearFutId Near future id.
- * @param nearMiniId Near mini future id.
- * @param tx Transaction.
- * @param cacheIds Involved cache ids.
- * @param parts Partitions.
- * @param schema Schema name.
- * @param qry Query string.
- * @param params Query parameters.
- * @param flags Flags.
- * @param pageSize Fetch page size.
- * @param timeout Lock acquisition timeout.
- * @param cctx Cache context.
- */
- public GridDhtTxQueryEnlistFuture(
- UUID nearNodeId,
- GridCacheVersion nearLockVer,
- MvccSnapshot mvccSnapshot,
- IgniteUuid nearFutId,
- int nearMiniId,
- GridDhtTxLocalAdapter tx,
- int[] cacheIds,
- int[] parts,
- String schema,
- String qry,
- Object[] params,
- int flags,
- int pageSize,
- long timeout,
- GridCacheContext<?, ?> cctx) {
- super(nearNodeId,
- nearLockVer,
- mvccSnapshot,
- nearFutId,
- nearMiniId,
- tx,
- timeout,
- cctx);
-
- assert timeout >= 0;
- assert nearNodeId != null;
- assert nearLockVer != null;
-
- this.cacheIds = cacheIds;
- this.schema = schema;
- this.qry = qry;
- this.params = params;
- this.flags = flags;
- this.pageSize = pageSize;
-
- this.parts = calculatePartitions(tx, parts, cctx);
- }
-
- /** {@inheritDoc} */
- @Override protected UpdateSourceIterator<?> createIterator() throws IgniteCheckedException {
- checkPartitions(parts);
-
- return cctx.kernalContext().query().executeUpdateOnDataNodeTransactional(
- cctx,
- cacheIds,
- parts,
- schema,
- qry,
- params,
- flags,
- pageSize,
- (int)timeout,
- tx.topologyVersionSnapshot(),
- mvccSnapshot,
- new GridQueryCancel()
- );
- }
-
- /**
- * Checks whether all the necessary partitions are in {@link GridDhtPartitionState#OWNING} state.
- *
- * @param parts Partitions.
- * @throws ClusterTopologyCheckedException If failed.
- */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- private void checkPartitions(int[] parts) throws ClusterTopologyCheckedException {
- if (!cctx.rebalanceEnabled())
- return;
-
- GridDhtPartitionTopology top = cctx.topology();
-
- try {
- top.readLock();
-
- for (int i = 0; i < parts.length; i++) {
- GridDhtLocalPartition p = top.localPartition(parts[i]);
-
- if (p == null || p.state() != GridDhtPartitionState.OWNING) {
- throw new ClusterTopologyCheckedException("Cannot run update query. " +
- "Node must own all the necessary partitions.");
- }
- }
- }
- finally {
- top.readUnlock();
- }
- }
-
- /** */
- private int[] calculatePartitions(GridDhtTxLocalAdapter tx, int[] parts, GridCacheContext<?, ?> cctx) {
- if (parts == null)
- parts = U.toIntArray(
- cctx.affinity()
- .primaryPartitions(cctx.localNodeId(), tx.topologyVersionSnapshot()));
-
- return parts;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxQueryEnlistFuture.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistRequest.java
deleted file mode 100644
index 6a20029d35b..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistRequest.java
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CacheObjectContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.EnlistOperation;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class GridDhtTxQueryEnlistRequest extends GridCacheIdMessage implements GridCacheDeployable {
- /** */
- private static final long serialVersionUID = 5103887309729425173L;
-
- /** */
- private IgniteUuid dhtFutId;
-
- /** */
- private int batchId;
-
- /** DHT tx version. */
- private GridCacheVersion lockVer;
-
- /** */
- private EnlistOperation op;
-
- /** */
- private int mvccOpCnt;
-
- /** */
- @GridDirectCollection(KeyCacheObject.class)
- private List<KeyCacheObject> keys;
-
- /** */
- @GridDirectCollection(Message.class)
- private List<Message> vals;
-
- /**
- *
- */
- public GridDhtTxQueryEnlistRequest() {
- }
-
- /**
- * @param cacheId Cache id.
- * @param dhtFutId DHT future id.
- * @param lockVer Lock version.
- * @param op Operation.
- * @param batchId Batch id.
- * @param mvccOpCnt Mvcc operation counter.
- * @param keys Keys.
- * @param vals Values.
- */
- GridDhtTxQueryEnlistRequest(int cacheId,
- IgniteUuid dhtFutId,
- GridCacheVersion lockVer,
- EnlistOperation op,
- int batchId,
- int mvccOpCnt,
- List<KeyCacheObject> keys,
- List<Message> vals) {
- this.cacheId = cacheId;
- this.dhtFutId = dhtFutId;
- this.lockVer = lockVer;
- this.op = op;
- this.batchId = batchId;
- this.mvccOpCnt = mvccOpCnt;
- this.keys = keys;
- this.vals = vals;
- }
-
- /**
- * Returns request rows number.
- *
- * @return Request rows number.
- */
- public int batchSize() {
- return keys == null ? 0 : keys.size();
- }
-
- /**
- * @return Dht future id.
- */
- public IgniteUuid dhtFutureId() {
- return dhtFutId;
- }
-
- /**
- * @return Lock version.
- */
- public GridCacheVersion version() {
- return lockVer;
- }
-
- /**
- * @return Mvcc operation counter.
- */
- public int operationCounter() {
- return mvccOpCnt;
- }
-
- /**
- * @return Operation.
- */
- public EnlistOperation op() {
- return op;
- }
-
- /**
- * @return Keys.
- */
- public List<KeyCacheObject> keys() {
- return keys;
- }
-
- /**
- * @return Values.
- */
- public List<Message> values() {
- return vals;
- }
-
- /**
- * @return Batch id.
- */
- public int batchId() {
- return batchId;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 155;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
- CacheObjectContext objCtx = cctx.cacheObjectContext();
-
- if (!addDepInfo && cctx.deploymentEnabled())
- addDepInfo = true;
-
- if (keys != null) {
- for (int i = 0; i < keys.size(); i++) {
-
- keys.get(i).prepareMarshal(objCtx);
-
- if (vals != null) {
- Message val = vals.get(i);
-
- if (val instanceof CacheObject)
- ((CacheObject)val).prepareMarshal(objCtx);
- else if (val instanceof CacheEntryInfoCollection) {
- for (GridCacheEntryInfo entry : ((CacheEntryInfoCollection)val).infos()) {
- CacheObject entryVal = entry.value();
-
- if (entryVal != null)
- entryVal.prepareMarshal(objCtx);
- }
- }
- else if (val instanceof GridInvokeValue)
- prepareInvokeValue(cctx, (GridInvokeValue)val);
- }
- }
- }
- }
-
- /**
- *
- * @param cctx Cache context.
- * @param val0 Invoke value.
- * @throws IgniteCheckedException If failed.
- */
- private void prepareInvokeValue(GridCacheContext cctx, GridInvokeValue val0) throws IgniteCheckedException {
- assert val0 != null;
-
- forceAddDepInfo = true;
-
- prepareObject(val0.entryProcessor(), cctx.shared());
-
- if (!F.isEmpty(val0.invokeArgs())) {
- for (Object o : val0.invokeArgs())
- prepareObject(o, cctx.shared());
- }
-
- val0.prepareMarshal(cctx);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- CacheObjectContext objCtx = ctx.cacheContext(cacheId).cacheObjectContext();
-
- if (keys != null) {
- for (int i = 0; i < keys.size(); i++) {
- keys.get(i).finishUnmarshal(objCtx, ldr);
-
- if (vals != null) {
- Message val = vals.get(i);
-
- if (val instanceof CacheObject)
- ((CacheObject)val).finishUnmarshal(objCtx, ldr);
- else if (val instanceof CacheEntryInfoCollection) {
- for (GridCacheEntryInfo entry : ((CacheEntryInfoCollection)val).infos()) {
- CacheObject entryVal = entry.value();
-
- if (entryVal != null)
- entryVal.finishUnmarshal(objCtx, ldr);
- }
- }
- else if (val instanceof GridInvokeValue)
- ((GridInvokeValue)val).finishUnmarshal(ctx, ldr);
- }
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeInt("batchId", batchId))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeIgniteUuid("dhtFutId", dhtFutId))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeMessage("lockVer", lockVer))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeInt("mvccOpCnt", mvccOpCnt))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- batchId = reader.readInt("batchId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- dhtFutId = reader.readIgniteUuid("dhtFutId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- lockVer = reader.readMessage("lockVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- mvccOpCnt = reader.readInt("mvccOpCnt");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- byte opOrd;
-
- opOrd = reader.readByte("op");
-
- if (!reader.isLastRead())
- return false;
-
- op = EnlistOperation.fromOrdinal(opOrd);
-
- reader.incrementState();
-
- case 10:
- vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridDhtTxQueryEnlistRequest.class);
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 11;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxQueryEnlistRequest.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistResponse.java
deleted file mode 100644
index 42554362199..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistResponse.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class GridDhtTxQueryEnlistResponse extends GridCacheIdMessage {
- /** */
- private static final long serialVersionUID = -1510546400896574705L;
-
- /** Future ID. */
- private IgniteUuid futId;
-
- /** */
- private int batchId;
-
- /** Error. */
- @GridDirectTransient
- private Throwable err;
-
- /** Serialized error. */
- private byte[] errBytes;
-
- /**
- *
- */
- public GridDhtTxQueryEnlistResponse() {
- }
-
- /**
- * @param cacheId Cache id.
- * @param futId Future id.
- * @param batchId Batch id.
- * @param err Error.
- */
- public GridDhtTxQueryEnlistResponse(int cacheId, IgniteUuid futId, int batchId,
- Throwable err) {
- this.cacheId = cacheId;
- this.futId = futId;
- this.batchId = batchId;
- this.err = err;
- }
-
- /**
- * @return Future id.
- */
- public IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Batch id.
- */
- public int batchId() {
- return batchId;
- }
-
- /**
- * @return Error.
- */
- @Override public Throwable error() {
- return err;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- if (err != null && errBytes == null)
- errBytes = U.marshal(ctx.marshaller(), err);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- if (errBytes != null)
- err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 144;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 7;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeInt("batchId", batchId))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeByteArray("errBytes", errBytes))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeIgniteUuid("futId", futId))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- batchId = reader.readInt("batchId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- errBytes = reader.readByteArray("errBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- futId = reader.readIgniteUuid("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridDhtTxQueryEnlistResponse.class);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxQueryEnlistResponse.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryFirstEnlistRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryFirstEnlistRequest.java
deleted file mode 100644
index 88eebcca350..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryFirstEnlistRequest.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.UUID;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.EnlistOperation;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * First enlist request.
- */
-public class GridDhtTxQueryFirstEnlistRequest extends GridDhtTxQueryEnlistRequest {
- /** */
- private static final long serialVersionUID = -7494735627739420176L;
-
- /** */
- private AffinityTopologyVersion topVer;
-
- /** */
- private long crdVer;
-
- /** */
- private long cntr;
-
- /** */
- private long cleanupVer;
-
- /** */
- private long timeout;
-
- /** */
- private int taskNameHash;
-
- /** */
- private UUID nearNodeId;
-
- /** Near tx version. */
- private GridCacheVersion nearXidVer;
-
- /**
- *
- */
- public GridDhtTxQueryFirstEnlistRequest() {
- }
-
- /**
- * @param cacheId Cache id.
- * @param dhtFutId DHT future id.
- * @param topVer Topology version.
- * @param lockVer Lock version.
- * @param snapshot Mvcc snapshot.
- * @param timeout Timeout.
- * @param taskNameHash Task name hash.
- * @param nearNodeId Near node id.
- * @param nearXidVer Near xid version.
- * @param op Operation.
- * @param batchId Batch id.
- * @param keys Keys.
- * @param vals Values.
- */
- GridDhtTxQueryFirstEnlistRequest(int cacheId,
- IgniteUuid dhtFutId,
- AffinityTopologyVersion topVer,
- GridCacheVersion lockVer,
- MvccSnapshot snapshot,
- long timeout,
- int taskNameHash,
- UUID nearNodeId,
- GridCacheVersion nearXidVer,
- EnlistOperation op,
- int batchId,
- List<KeyCacheObject> keys,
- List<Message> vals) {
- super(cacheId, dhtFutId, lockVer, op, batchId, snapshot.operationCounter(), keys, vals);
-
- this.cacheId = cacheId;
- this.topVer = topVer;
- this.crdVer = snapshot.coordinatorVersion();
- this.cntr = snapshot.counter();
- this.cleanupVer = snapshot.cleanupVersion();
- this.timeout = timeout;
- this.taskNameHash = taskNameHash;
- this.nearNodeId = nearNodeId;
- this.nearXidVer = nearXidVer;
- }
-
- /** {@inheritDoc} */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
- * @return Near node id.
- */
- public UUID nearNodeId() {
- return nearNodeId;
- }
-
- /**
- * @return Near transaction ID.
- */
- public GridCacheVersion nearXidVersion() {
- return nearXidVer;
- }
-
- /**
- * @return Max lock wait time.
- */
- public long timeout() {
- return timeout;
- }
-
- /**
- * @return Task name hash.
- */
- public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
- * @return MVCC snapshot.
- */
- public MvccSnapshot mvccSnapshot() {
- return new MvccSnapshotWithoutTxs(crdVer, cntr, operationCounter(), cleanupVer);
- }
-
- /**
- * @return Coordinator version.
- */
- public long coordinatorVersion() {
- return crdVer;
- }
-
- /**
- * @return Counter.
- */
- public long counter() {
- return cntr;
- }
-
- /**
- * @return Cleanup version.
- */
- public long cleanupVersion() {
- return cleanupVer;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 156;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 11:
- if (!writer.writeLong("cleanupVer", cleanupVer))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeLong("cntr", cntr))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeLong("crdVer", crdVer))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeUuid("nearNodeId", nearNodeId))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeMessage("nearXidVer", nearXidVer))
- return false;
-
- writer.incrementState();
-
- case 16:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 17:
- if (!writer.writeLong("timeout", timeout))
- return false;
-
- writer.incrementState();
-
- case 18:
- if (!writer.writeAffinityTopologyVersion("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 11:
- cleanupVer = reader.readLong("cleanupVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- cntr = reader.readLong("cntr");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- crdVer = reader.readLong("crdVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- nearNodeId = reader.readUuid("nearNodeId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- nearXidVer = reader.readMessage("nearXidVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 17:
- timeout = reader.readLong("timeout");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 18:
- topVer = reader.readAffinityTopologyVersion("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridDhtTxQueryFirstEnlistRequest.class);
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 19;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxQueryFirstEnlistRequest.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java
deleted file mode 100644
index 2164ab9ca2b..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.UUID;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.EnlistOperation;
-import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
-
-/**
- * Future processing transaction enlisting and locking of entries
- * produces by complex DML queries with reduce step.
- */
-public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxQueryAbstractEnlistFuture implements UpdateSourceIterator<Object> {
- /** Enlist operation. */
- private final EnlistOperation op;
-
- /** Source iterator. */
- private final Iterator<Object> it;
-
- /**
- * @param nearNodeId Near node ID.
- * @param nearLockVer Near lock version.
- * @param mvccSnapshot Mvcc snapshot.
- * @param nearFutId Near future id.
- * @param nearMiniId Near mini future id.
- * @param tx Transaction.
- * @param timeout Lock acquisition timeout.
- * @param cctx Cache context.
- * @param rows Collection of rows.
- * @param op Operation.
- */
- public GridDhtTxQueryResultsEnlistFuture(UUID nearNodeId,
- GridCacheVersion nearLockVer,
- MvccSnapshot mvccSnapshot,
- IgniteUuid nearFutId,
- int nearMiniId,
- GridDhtTxLocalAdapter tx,
- long timeout,
- GridCacheContext<?, ?> cctx,
- Collection<Object> rows,
- EnlistOperation op) {
- super(nearNodeId,
- nearLockVer,
- mvccSnapshot,
- nearFutId,
- nearMiniId,
- tx,
- timeout,
- cctx);
-
- this.op = op;
-
- it = rows.iterator();
-
- skipNearNodeUpdates = true;
- }
-
- /** {@inheritDoc} */
- @Override protected UpdateSourceIterator<?> createIterator() {
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public EnlistOperation operation() {
- return op;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasNextX() {
- return it.hasNext();
- }
-
- /** {@inheritDoc} */
- @Override public Object nextX() {
- return it.next();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxQueryResultsEnlistFuture.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridInvokeValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridInvokeValue.java
deleted file mode 100644
index 5c9b0353e7f..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridInvokeValue.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.nio.ByteBuffer;
-import javax.cache.processor.EntryProcessor;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public class GridInvokeValue implements Message {
- /** */
- private static final long serialVersionUID = 1L;
-
- /** Optional arguments for entry processor. */
- @GridDirectTransient
- private Object[] invokeArgs;
-
- /** Entry processor arguments bytes. */
- private byte[][] invokeArgsBytes;
-
- /** Entry processors. */
- @GridDirectTransient
- private EntryProcessor<Object, Object, Object> entryProcessor;
-
- /** Entry processors bytes. */
- private byte[] entryProcessorBytes;
-
- /**
- * Constructor.
- */
- public GridInvokeValue() {
- }
-
- /**
- * Constructor.
- *
- * @param entryProcessor Entry processor.
- * @param invokeArgs Entry processor invoke arguments.
- */
- public GridInvokeValue(EntryProcessor<Object, Object, Object> entryProcessor, Object[] invokeArgs) {
- this.invokeArgs = invokeArgs;
- this.entryProcessor = entryProcessor;
- }
-
- /**
- * @return Invoke arguments.
- */
- public Object[] invokeArgs() {
- return invokeArgs;
- }
-
- /**
- * @return Entry processor.
- */
- public EntryProcessor<Object, Object, Object> entryProcessor() {
- return entryProcessor;
- }
-
- /**
- * Marshalls invoke value.
- *
- * @param ctx Context.
- * @throws IgniteCheckedException If failed.
- */
- public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException {
- if (entryProcessor != null && entryProcessorBytes == null)
- entryProcessorBytes = CU.marshal(ctx, entryProcessor);
-
- if (invokeArgsBytes == null)
- invokeArgsBytes = marshalInvokeArguments(invokeArgs, ctx);
- }
-
- /**
- * Unmarshalls invoke value.
- *
- * @param ctx Cache context.
- * @param ldr Class loader.
- * @throws IgniteCheckedException If un-marshalling failed.
- */
- public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- if (entryProcessorBytes != null && entryProcessor == null)
- entryProcessor = U.unmarshal(ctx.marshaller(), entryProcessorBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
-
- if (invokeArgs == null)
- invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, U.resolveClassLoader(ldr, ctx.gridConfig()));
- }
-
- /**
- * @param byteCol Collection to unmarshal.
- * @param ctx Context.
- * @param ldr Loader.
- * @return Unmarshalled collection.
- * @throws IgniteCheckedException If failed.
- */
- @Nullable protected final Object[] unmarshalInvokeArguments(@Nullable byte[][] byteCol,
- GridCacheSharedContext ctx,
- ClassLoader ldr) throws IgniteCheckedException {
- assert ldr != null;
- assert ctx != null;
-
- if (byteCol == null)
- return null;
-
- Object[] args = new Object[byteCol.length];
-
- Marshaller marsh = ctx.marshaller();
-
- for (int i = 0; i < byteCol.length; i++)
- args[i] = byteCol[i] == null ? null : U.unmarshal(marsh, byteCol[i], ldr);
-
- return args;
- }
-
- /**
- * @param args Arguments to marshal.
- * @param ctx Context.
- * @return Marshalled collection.
- * @throws IgniteCheckedException If failed.
- */
- @Nullable protected final byte[][] marshalInvokeArguments(@Nullable Object[] args, GridCacheContext ctx)
- throws IgniteCheckedException {
- assert ctx != null;
-
- if (args == null || args.length == 0)
- return null;
-
- byte[][] argsBytes = new byte[args.length][];
-
- for (int i = 0; i < args.length; i++) {
- Object arg = args[i];
-
- argsBytes[i] = arg == null ? null : CU.marshal(ctx, arg);
- }
-
- return argsBytes;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeByteArray("entryProcessorBytes", entryProcessorBytes))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) {
- case 0:
- entryProcessorBytes = reader.readByteArray("entryProcessorBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridInvokeValue.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 161;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 2;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxQueryEnlistResultHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxQueryEnlistResultHandler.java
deleted file mode 100644
index 0329b0e31b2..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxQueryEnlistResultHandler.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistResponse;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.lang.GridClosureException;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.NotNull;
-
-/**
- *
- */
-public final class NearTxQueryEnlistResultHandler implements CI1<IgniteInternalFuture<Long>> {
- /** */
- private static final long serialVersionUID = 5189735824793607906L;
-
- /** */
- private static final NearTxQueryEnlistResultHandler INSTANCE = new NearTxQueryEnlistResultHandler();
-
- /** */
- private NearTxQueryEnlistResultHandler() {}
-
- /**
- * @return Handler instance.
- */
- public static NearTxQueryEnlistResultHandler instance() {
- return INSTANCE;
- }
-
- /**
- * @param future Enlist future.
- * @return Enlist response.
- */
- public static <T extends GridNearTxQueryEnlistResponse> T createResponse(IgniteInternalFuture<?> future) {
- assert future != null;
-
- Class<?> clazz = future.getClass();
-
- if (clazz == GridDhtTxQueryResultsEnlistFuture.class)
- return (T)createResponse((GridDhtTxQueryResultsEnlistFuture)future);
- else if (clazz == GridDhtTxQueryEnlistFuture.class)
- return (T)createResponse((GridDhtTxQueryEnlistFuture)future);
- else
- throw new IllegalStateException();
- }
-
- /**
- * @param future Enlist future.
- * @return Enlist response.
- */
- @NotNull private static GridNearTxQueryEnlistResponse createResponse(GridDhtTxQueryEnlistFuture future) {
- try {
- future.get();
-
- assert future.cnt == 0;
-
- return new GridNearTxQueryEnlistResponse(future.cctx.cacheId(), future.nearFutId, future.nearMiniId,
- future.nearLockVer, future.cnt, future.tx.empty(), future.newDhtNodes);
- }
- catch (IgniteCheckedException e) {
- return new GridNearTxQueryEnlistResponse(future.cctx.cacheId(), future.nearFutId, future.nearMiniId, future.nearLockVer, e);
- }
- }
-
- /**
- * @param fut Enlist future.
- * @return Enlist response.
- */
- @NotNull private static GridNearTxQueryResultsEnlistResponse createResponse(GridDhtTxQueryResultsEnlistFuture fut) {
- try {
- fut.get();
-
- GridCacheVersion ver = null;
- IgniteUuid id = null;
-
- if (fut.hasNearNodeUpdates) {
- ver = fut.cctx.tm().mappedVersion(fut.nearLockVer);
-
- id = fut.futId;
- }
-
- return new GridNearTxQueryResultsEnlistResponse(fut.cctx.cacheId(), fut.nearFutId, fut.nearMiniId,
- fut.nearLockVer, fut.cnt, ver, id, fut.newDhtNodes);
- }
- catch (IgniteCheckedException e) {
- return new GridNearTxQueryResultsEnlistResponse(fut.cctx.cacheId(), fut.nearFutId, fut.nearMiniId,
- fut.nearLockVer, e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void apply(IgniteInternalFuture<Long> fut0) {
- GridDhtTxAbstractEnlistFuture fut = (GridDhtTxAbstractEnlistFuture)fut0;
-
- GridCacheContext<?, ?> cctx = fut.cctx;
- GridDhtTxLocal tx = (GridDhtTxLocal)fut.tx;
- UUID nearNodeId = fut.nearNodeId;
-
- GridNearTxQueryEnlistResponse res = createResponse(fut);
-
- if (res.removeMapping()) {
- cctx.tm().forgetTx(tx);
-
- try {
- cctx.io().send(nearNodeId, res, cctx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- U.error(fut.log, "Failed to send near enlist response [" +
- "tx=" + CU.txString(tx) +
- ", node=" + nearNodeId +
- ", res=" + res + ']', e);
-
- throw new GridClosureException(e);
- }
-
- return;
- }
-
- try {
- cctx.io().send(nearNodeId, res, cctx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- U.error(fut.log, "Failed to send near enlist response (will rollback transaction) [" +
- "tx=" + CU.txString(tx) +
- ", node=" + nearNodeId +
- ", res=" + res + ']', e);
-
- try {
- tx.rollbackDhtLocalAsync();
- }
- catch (Throwable e1) {
- e.addSuppressed(e1);
- }
-
- throw new GridClosureException(e);
- }
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java
deleted file mode 100644
index 8510da40b6e..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.lang.GridClosureException;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-
-/**
- * Response factory.
- */
-public final class NearTxResultHandler implements CI1<IgniteInternalFuture<GridCacheReturn>> {
- /** */
- private static final long serialVersionUID = 0;
-
- /** Singleton instance.*/
- private static final NearTxResultHandler INSTANCE = new NearTxResultHandler();
-
- /** Constructor. */
- private NearTxResultHandler() {
- }
-
- /**
- * @return Handler instance.
- */
- public static NearTxResultHandler instance() {
- return INSTANCE;
- }
-
- /**
- * Response factory method.
- *
- * @param future Enlist future.
- * @return Enlist response.
- */
- public static <T> T createResponse(IgniteInternalFuture<?> future) {
- assert future != null;
-
- Class<?> clazz = future.getClass();
-
- if (clazz == GridDhtTxEnlistFuture.class)
- return (T)createResponse((GridDhtTxEnlistFuture)future);
- else
- throw new IllegalStateException();
- }
-
- /**
- * Response factory method.
- *
- * @param fut Enlist future.
- * @return Enlist response.
- */
- public static GridNearTxEnlistResponse createResponse(GridDhtTxEnlistFuture fut) {
- try {
- GridCacheReturn res = fut.get();
-
- GridCacheVersion ver = null;
- IgniteUuid id = null;
-
- if (fut.hasNearNodeUpdates) {
- ver = fut.cctx.tm().mappedVersion(fut.nearLockVer);
-
- id = fut.futId;
- }
-
- return new GridNearTxEnlistResponse(fut.cctx.cacheId(), fut.nearFutId, fut.nearMiniId,
- fut.nearLockVer, res, ver, id, fut.newDhtNodes);
- }
- catch (IgniteCheckedException e) {
- return new GridNearTxEnlistResponse(fut.cctx.cacheId(), fut.nearFutId, fut.nearMiniId, fut.nearLockVer, e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void apply(IgniteInternalFuture<GridCacheReturn> fut0) {
- GridDhtTxAbstractEnlistFuture fut = (GridDhtTxAbstractEnlistFuture)fut0;
-
- GridCacheContext<?, ?> cctx = fut.cctx;
- GridDhtTxLocal tx = (GridDhtTxLocal)fut.tx;
- UUID nearNodeId = fut.nearNodeId;
-
- GridNearTxEnlistResponse res = createResponse(fut);
-
- try {
- cctx.io().send(nearNodeId, res, cctx.ioPolicy());
- }
- catch (ClusterTopologyCheckedException e) {
- fut.onNodeLeft(nearNodeId);
- }
- catch (IgniteCheckedException e) {
- U.error(fut.log, "Failed to send near enlist response (will rollback transaction) [" +
- "tx=" + CU.txString(tx) +
- ", node=" + nearNodeId +
- ", res=" + res + ']', e);
-
- try {
- tx.rollbackDhtLocalAsync();
- }
- catch (Throwable e1) {
- e.addSuppressed(e1);
- }
-
- throw new GridClosureException(e);
- }
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
deleted file mode 100644
index e5e534ac565..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
+++ /dev/null
@@ -1,491 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import org.apache.ignite.IgniteCacheRestartingException;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheStoppedException;
-import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
-import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
-import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxAbstractEnlistFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
-import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteReducer;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoundIdentityFuture<T> implements
- GridCacheVersionedFuture<T> {
- /** Done field updater. */
- private static final AtomicIntegerFieldUpdater<GridNearTxAbstractEnlistFuture> DONE_UPD =
- AtomicIntegerFieldUpdater.newUpdater(GridNearTxAbstractEnlistFuture.class, "done");
-
- /** Cache context. */
- @GridToStringExclude
- protected final GridCacheContext<?, ?> cctx;
-
- /** Transaction. */
- protected final GridNearTxLocal tx;
-
- /** */
- protected AffinityTopologyVersion topVer;
-
- /** Logger. */
- @GridToStringExclude
- protected final IgniteLogger log;
-
- /** */
- private final long timeout;
-
- /** Initiated thread id. */
- protected final long threadId;
-
- /** Mvcc future id. */
- protected final IgniteUuid futId;
-
- /** Lock version. */
- protected final GridCacheVersion lockVer;
-
- /** */
- @GridToStringExclude
- private GridDhtTxAbstractEnlistFuture locEnlistFut;
-
- /** */
- @SuppressWarnings("unused")
- @GridToStringExclude
- private volatile int done;
-
- /** Timeout object. */
- @GridToStringExclude
- private LockTimeoutObject timeoutObj;
-
- /**
- * @param cctx Cache context.
- * @param tx Transaction.
- * @param timeout Timeout.
- * @param rdc Compound future reducer.
- */
- protected GridNearTxAbstractEnlistFuture(
- GridCacheContext<?, ?> cctx, GridNearTxLocal tx, long timeout, @Nullable IgniteReducer<T, T> rdc) {
- super(rdc);
-
- assert cctx != null;
- assert tx != null;
-
- this.cctx = cctx;
- this.tx = tx;
- this.timeout = timeout;
-
- threadId = tx.threadId();
- lockVer = tx.xidVersion();
- futId = IgniteUuid.randomUuid();
-
- log = cctx.logger(getClass());
- }
-
- /**
- *
- */
- public void init() {
- if (timeout < 0) {
- // Time is out.
- onDone(timeoutException());
-
- return;
- }
- else if (timeout > 0)
- timeoutObj = new LockTimeoutObject();
-
- while (true) {
- IgniteInternalFuture<?> fut = tx.lockFuture();
-
- if (fut == GridDhtTxLocalAdapter.ROLLBACK_FUT) {
- onDone(tx.timedOut() ? tx.timeoutException() : tx.rollbackException());
-
- return;
- }
- else if (fut != null) {
- // Wait for previous future.
- assert fut instanceof GridNearTxAbstractEnlistFuture
- || fut instanceof GridDhtTxAbstractEnlistFuture : fut;
-
- // Terminate this future if parent future is terminated by rollback.
- if (!fut.isDone()) {
- fut.listen(() -> {
- if (fut.error() != null)
- onDone(fut.error());
- });
- }
- else if (fut.error() != null)
- onDone(fut.error());
-
- break;
- }
- else if (tx.updateLockFuture(null, this))
- break;
- }
-
- boolean added = cctx.mvcc().addFuture(this);
-
- assert added : this;
-
- if (isDone()) {
- cctx.mvcc().removeFuture(futId);
-
- return;
- }
-
- try {
- tx.addActiveCache(cctx, false);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
-
- return;
- }
-
- if (timeoutObj != null)
- cctx.time().addTimeoutObject(timeoutObj);
-
- // Obtain the topology version to use.
- long threadId = Thread.currentThread().getId();
-
- AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
-
- // If there is another system transaction in progress, use it's topology version to prevent deadlock.
- if (topVer == null && tx.system())
- topVer = cctx.tm().lockedTopologyVersion(threadId, tx);
-
- if (topVer != null)
- tx.topologyVersion(topVer);
-
- if (topVer == null)
- topVer = tx.topologyVersionSnapshot();
-
- if (topVer != null) {
- for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()) {
- if (fut.exchangeDone() && fut.topologyVersion().equals(topVer)) {
- Throwable err = null;
-
- // Before cache validation, make sure that this topology future is already completed.
- try {
- fut.get();
- }
- catch (IgniteCheckedException e) {
- err = fut.error();
- }
-
- if (err == null)
- err = fut.validateCache(cctx, false, false, null, null);
-
- if (err != null) {
- onDone(err);
-
- return;
- }
-
- break;
- }
- }
-
- if (this.topVer == null)
- this.topVer = topVer;
-
- map(true);
-
- return;
- }
-
- mapOnTopology();
- }
-
- /**
- * @param node Primary node.
- * @throws IgniteCheckedException if future is already completed.
- */
- protected synchronized void updateMappings(ClusterNode node) throws IgniteCheckedException {
- checkCompleted();
-
- IgniteTxMappings m = tx.mappings();
-
- GridDistributedTxMapping mapping = m.get(node.id());
-
- if (mapping == null)
- m.put(mapping = new GridDistributedTxMapping(node));
-
- mapping.markQueryUpdate();
-
- if (node.isLocal())
- tx.colocatedLocallyMapped(true);
-
- checkCompleted();
- }
-
- /**
- * @param fut Local enlist future.
- * @throws IgniteCheckedException if future is already completed.
- */
- protected synchronized void updateLocalFuture(GridDhtTxAbstractEnlistFuture fut) throws IgniteCheckedException {
- checkCompleted();
-
- assert locEnlistFut == null;
-
- locEnlistFut = fut;
- }
-
- /**
- * @param fut Local enlist future.
- * @throws IgniteCheckedException if future is already completed.
- */
- protected synchronized void clearLocalFuture(GridDhtTxAbstractEnlistFuture fut) throws IgniteCheckedException {
- checkCompleted();
-
- if (locEnlistFut == fut)
- locEnlistFut = null;
- }
-
- /**
- * @throws IgniteCheckedException if future is already completed.
- */
- protected void checkCompleted() throws IgniteCheckedException {
- if (isDone())
- throw new IgniteCheckedException("Future is done.");
- }
-
- /**
- */
- private void mapOnTopology() {
- cctx.topology().readLock(); boolean topLocked = true;
-
- try {
- if (cctx.topology().stopping()) {
- onDone(
- cctx.shared().cache().isCacheRestarting(cctx.name()) ?
- new IgniteCacheRestartingException(cctx.name()) :
- new CacheStoppedException(cctx.name()));
-
- return;
- }
-
- GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
-
- cctx.topology().readUnlock(); topLocked = false;
-
- if (fut.isDone()) {
- Throwable err = fut.validateCache(cctx, false, false, null, null);
-
- if (err != null) {
- onDone(err);
-
- return;
- }
-
- AffinityTopologyVersion topVer = fut.topologyVersion();
-
- tx.topologyVersion(topVer);
-
- if (this.topVer == null)
- this.topVer = topVer;
-
- map(false);
- }
- else {
- cctx.time().waitAsync(fut, tx.remainingTime(), (e, timedOut) -> {
- try {
- if (e != null || timedOut)
- onDone(timedOut ? tx.timeoutException() : e);
- else
- mapOnTopology();
- }
- finally {
- cctx.shared().txContextReset();
- }
- });
- }
- }
- finally {
- if (topLocked)
- cctx.topology().readUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean onCancelled() {
- return onDone(null, asyncRollbackException(), false);
- }
-
- /** {@inheritDoc} */
- @Override public boolean onDone(@Nullable T res, @Nullable Throwable err, boolean cancelled) {
- if (!DONE_UPD.compareAndSet(this, 0, 1))
- return false;
-
- cctx.tm().txContext(tx);
-
- if (!cancelled && err == null)
- tx.clearLockFuture(this);
- else
- tx.setRollbackOnly();
-
- synchronized (this) {
- GridDhtTxAbstractEnlistFuture locFut0 = locEnlistFut;
-
- if (locFut0 != null && (err != null || cancelled))
- locFut0.onDone(cancelled ? new IgniteFutureCancelledCheckedException("Future was cancelled: " + locFut0) : err);
-
- boolean done = super.onDone(res, err, cancelled);
-
- assert done;
-
- // Clean up.
- cctx.mvcc().removeVersionedFuture(this);
-
- if (timeoutObj != null)
- cctx.time().removeTimeoutObject(timeoutObj);
-
- return true;
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void logError(IgniteLogger log, String msg, Throwable e) {
- // no-op
- }
-
- /** {@inheritDoc} */
- @Override protected void logDebug(IgniteLogger log, String msg) {
- // no-op
- }
-
- /** {@inheritDoc} */
- @Override public boolean trackable() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return lockVer;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * Gets remaining allowed time.
- *
- * @return Remaining time. {@code 0} if timeout isn't specified. {@code -1} if time is out.
- * @throws IgniteTxTimeoutCheckedException If timed out.
- */
- protected long remainingTime() throws IgniteTxTimeoutCheckedException {
- if (timeout <= 0)
- return 0;
-
- long timeLeft = timeout - (U.currentTimeMillis() - startTime());
-
- if (timeLeft <= 0)
- throw timeoutException();
-
- return timeLeft;
- }
-
- /**
- * @return Timeout exception.
- */
- @NotNull protected IgniteTxTimeoutCheckedException timeoutException() {
- return new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout for " +
- "transaction [timeout=" + timeout + ", tx=" + tx + ']');
- }
-
- /**
- * @return Async rollback exception.
- */
- @NotNull private IgniteTxRollbackCheckedException asyncRollbackException() {
- return new IgniteTxRollbackCheckedException("Transaction was asynchronously rolled back [tx=" + tx + ']');
- }
-
- /**
- * Start iterating the data rows and form batches.
- *
- * @param topLocked Whether topology was already locked.
- */
- protected abstract void map(boolean topLocked);
-
- /**
- * @return Nodes from which current future waits responses.
- */
- public abstract Set<UUID> pendingResponseNodes();
-
- /**
- * Lock request timeout object.
- */
- private class LockTimeoutObject extends GridTimeoutObjectAdapter {
- /**
- * Default constructor.
- */
- LockTimeoutObject() {
- super(timeout);
- }
-
- /** {@inheritDoc} */
- @Override public void onTimeout() {
- if (log.isDebugEnabled())
- log.debug("Timed out waiting for lock response: " + this);
-
- onDone(timeoutException());
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(LockTimeoutObject.class, this);
- }
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
deleted file mode 100644
index 285dced9750..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
+++ /dev/null
@@ -1,691 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxEnlistFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.EnlistOperation;
-import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
-import org.apache.ignite.internal.processors.security.SecurityUtils;
-import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.internal.processors.cache.distributed.dht.NearTxResultHandler.createResponse;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
-/**
- * A future tracking requests for remote nodes transaction enlisting and locking produces by cache API operations.
- */
-public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridCacheReturn> {
- /** Default batch size. */
- public static final int DFLT_BATCH_SIZE = 1024;
-
- /** SkipCntr field updater. */
- private static final AtomicIntegerFieldUpdater<GridNearTxEnlistFuture> SKIP_UPD =
- AtomicIntegerFieldUpdater.newUpdater(GridNearTxEnlistFuture.class, "skipCntr");
-
- /** Res field updater. */
- private static final AtomicReferenceFieldUpdater<GridNearTxEnlistFuture, GridCacheReturn> RES_UPD =
- AtomicReferenceFieldUpdater.newUpdater(GridNearTxEnlistFuture.class, GridCacheReturn.class, "res");
-
- /** Marker object. */
- private static final Object FINISHED = new Object();
-
- /** Source iterator. */
- @GridToStringExclude
- private final UpdateSourceIterator<?> it;
-
- /** Batch size. */
- private final int batchSize;
-
- /** */
- private final AtomicInteger batchCntr = new AtomicInteger();
-
- /** */
- @SuppressWarnings("unused")
- @GridToStringExclude
- private volatile int skipCntr;
-
- /** Future result. */
- @GridToStringExclude
- private volatile GridCacheReturn res;
-
- /** */
- private final Map<UUID, Batch> batches = new ConcurrentHashMap<>();
-
- /** Row extracted from iterator but not yet used. */
- private Object peek;
-
- /** Topology locked flag. */
- private boolean topLocked;
-
- /** Ordered batch sending flag. */
- private final boolean sequential;
-
- /** Filter. */
- private final CacheEntryPredicate filter;
-
- /** Need previous value flag. */
- private final boolean needRes;
-
- /** Keep binary flag. */
- private final boolean keepBinary;
-
- /**
- * @param cctx Cache context.
- * @param tx Transaction.
- * @param timeout Timeout.
- * @param it Rows iterator.
- * @param batchSize Batch size.
- * @param sequential Sequential locking flag.
- * @param filter Filter.
- * @param needRes Need previous value flag.
- * @param keepBinary Keep binary flag.
- */
- public GridNearTxEnlistFuture(GridCacheContext<?, ?> cctx,
- GridNearTxLocal tx,
- long timeout,
- UpdateSourceIterator<?> it,
- int batchSize,
- boolean sequential,
- @Nullable CacheEntryPredicate filter,
- boolean needRes,
- boolean keepBinary) {
- super(cctx, tx, timeout, null);
-
- this.it = it;
- this.batchSize = batchSize > 0 ? batchSize : DFLT_BATCH_SIZE;
- this.sequential = sequential;
- this.filter = filter;
- this.needRes = needRes;
- this.keepBinary = keepBinary;
- }
-
- /** {@inheritDoc} */
- @Override protected void map(boolean topLocked) {
- this.topLocked = topLocked;
-
- // Update write version to match current topology, otherwise version can lag behind local node's init version.
- // Reproduced by IgniteCacheEntryProcessorNodeJoinTest.testAllEntryProcessorNodeJoin.
- if (tx.local() && !topLocked)
- tx.writeVersion(cctx.versions().next(tx.topologyVersion().topologyVersion()));
-
- sendNextBatches(null);
- }
-
- /**
- * Continue iterating the data rows and form new batches.
- *
- * @param nodeId Node that is ready for a new batch.
- */
- private void sendNextBatches(@Nullable UUID nodeId) {
- try {
- Collection<Batch> next = continueLoop(nodeId);
-
- if (next == null)
- return;
-
- boolean first = (nodeId != null);
-
- for (Batch batch : next) {
- ClusterNode node = batch.node();
-
- sendBatch(node, batch, first);
-
- if (!node.isLocal())
- first = false;
- }
- }
- catch (Throwable e) {
- onDone(e);
-
- if (e instanceof Error)
- throw (Error)e;
- }
- }
-
- /**
- * Iterate data rows and form batches.
- *
- * @param nodeId Id of node acknowledged the last batch.
- * @return Collection of newly completed batches.
- * @throws IgniteCheckedException If failed.
- */
- private Collection<Batch> continueLoop(@Nullable UUID nodeId) throws IgniteCheckedException {
- if (nodeId != null)
- batches.remove(nodeId);
-
- // Accumulate number of batches released since we got here.
- // Let only one thread do the looping.
- if (isDone() || SKIP_UPD.getAndIncrement(this) != 0)
- return null;
-
- ArrayList<Batch> res = null;
- Batch batch = null;
-
- boolean flush = false;
-
- EnlistOperation op = it.operation();
-
- while (true) {
- while (hasNext0()) {
- checkCompleted();
-
- Object cur = next0();
-
- KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((Map.Entry<?, ?>)cur).getKey());
-
- ClusterNode node = cctx.affinity().primaryByKey(key, topVer);
-
- if (node == null)
- throw new ClusterTopologyServerNotFoundException("Failed to get primary node " +
- "[topVer=" + topVer + ", key=" + key + ']');
-
- if (!sequential)
- batch = batches.get(node.id());
- else if (batch != null && !batch.node().equals(node))
- res = markReady(res, batch);
-
- if (batch == null)
- batches.put(node.id(), batch = new Batch(node));
-
- if (batch.ready()) {
- // Can't advance further at the moment.
- batch = null;
-
- peek = cur;
-
- it.beforeDetach();
-
- flush = true;
-
- break;
- }
-
- batch.add(op.isDeleteOrLock() ? key : cur, !node.isLocal() && isLocalBackup(op, key));
-
- if (batch.size() == batchSize)
- res = markReady(res, batch);
- }
-
- if (SKIP_UPD.decrementAndGet(this) == 0)
- break;
-
- skipCntr = 1;
- }
-
- if (flush)
- return res;
-
- // No data left - flush incomplete batches.
- for (Batch batch0 : batches.values()) {
- if (!batch0.ready()) {
- if (res == null)
- res = new ArrayList<>();
-
- batch0.ready(true);
-
- res.add(batch0);
- }
- }
-
- if (batches.isEmpty())
- onDone(this.res);
-
- return res;
- }
-
- /** */
- private Object next0() {
- if (!hasNext0())
- throw new NoSuchElementException();
-
- Object cur;
-
- if ((cur = peek) != null)
- peek = null;
- else
- cur = it.next();
-
- return cur;
- }
-
- /** */
- private boolean hasNext0() {
- if (peek == null && !it.hasNext())
- peek = FINISHED;
-
- return peek != FINISHED;
- }
-
- /** */
- private boolean isLocalBackup(EnlistOperation op, KeyCacheObject key) {
- if (!cctx.affinityNode() || op == EnlistOperation.LOCK)
- return false;
- else if (cctx.isReplicated())
- return true;
-
- return cctx.topology().nodes(key.partition(), tx.topologyVersion()).indexOf(cctx.localNode()) > 0;
- }
-
- /**
- * Add batch to batch collection if it is ready.
- *
- * @param batches Collection of batches.
- * @param batch Batch to be added.
- */
- private ArrayList<Batch> markReady(ArrayList<Batch> batches, Batch batch) {
- if (!batch.ready()) {
- batch.ready(true);
-
- if (batches == null)
- batches = new ArrayList<>();
-
- batches.add(batch);
- }
-
- return batches;
- }
-
- /**
- * @param primaryId Primary node id.
- * @param rows Rows.
- * @param dhtVer Dht version assigned at primary node.
- * @param dhtFutId Dht future id assigned at primary node.
- */
- private void processBatchLocalBackupKeys(UUID primaryId, List<Object> rows, GridCacheVersion dhtVer,
- IgniteUuid dhtFutId) {
- assert dhtVer != null;
- assert dhtFutId != null;
-
- EnlistOperation op = it.operation();
-
- assert op != EnlistOperation.LOCK;
-
- boolean keysOnly = op.isDeleteOrLock();
-
- final ArrayList<KeyCacheObject> keys = new ArrayList<>(rows.size());
- final ArrayList<Message> vals = keysOnly ? null : new ArrayList<>(rows.size());
-
- for (Object row : rows) {
- if (keysOnly)
- keys.add(cctx.toCacheKeyObject(row));
- else {
- keys.add(cctx.toCacheKeyObject(((Map.Entry<?, ?>)row).getKey()));
-
- if (op.isInvoke())
- vals.add((Message)((Map.Entry<?, ?>)row).getValue());
- else
- vals.add(cctx.toCacheObject(((Map.Entry<?, ?>)row).getValue()));
- }
- }
-
- try {
- GridDhtTxRemote dhtTx = cctx.tm().tx(dhtVer);
-
- if (dhtTx == null) {
- dhtTx = new GridDhtTxRemote(cctx.shared(),
- cctx.localNodeId(),
- primaryId,
- lockVer,
- topVer,
- dhtVer,
- null,
- cctx.systemTx(),
- cctx.ioPolicy(),
- PESSIMISTIC,
- REPEATABLE_READ,
- false,
- tx.remainingTime(),
- -1,
- SecurityUtils.securitySubjectId(cctx),
- tx.taskNameHash(),
- false,
- null);
-
- dhtTx = cctx.tm().onCreated(null, dhtTx);
-
- if (dhtTx == null || !cctx.tm().onStarted(dhtTx)) {
- throw new IgniteTxRollbackCheckedException("Failed to update backup " +
- "(transaction has been completed): " + dhtVer);
- }
- }
-
- cctx.tm().txHandler().mvccEnlistBatch(dhtTx, cctx, it.operation(), keys, vals,
- null, null, -1);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
-
- return;
- }
-
- sendNextBatches(primaryId);
- }
-
- /**
- *
- * @param node Node.
- * @param batch Batch.
- * @param first First mapping flag.
- */
- private void sendBatch(ClusterNode node, Batch batch, boolean first) throws IgniteCheckedException {
- updateMappings(node);
-
- boolean clientFirst = first && cctx.localNode().isClient() && !topLocked && !tx.hasRemoteLocks();
-
- int batchId = batchCntr.incrementAndGet();
-
- if (node.isLocal())
- enlistLocal(batchId, node.id(), batch);
- else
- sendBatch(batchId, node.id(), batch, clientFirst);
- }
-
- /**
- * Send batch request to remote data node.
- *
- * @param batchId Id of a batch mini-future.
- * @param nodeId Node id.
- * @param batchFut Mini-future for the batch.
- * @param clientFirst {@code true} if originating node is client and it is a first request to any data node.
- */
- private void sendBatch(int batchId, UUID nodeId, Batch batchFut, boolean clientFirst) throws IgniteCheckedException {
- assert batchFut != null;
-
- GridNearTxEnlistRequest req = new GridNearTxEnlistRequest(cctx.cacheId(),
- threadId,
- futId,
- batchId,
- topVer,
- lockVer,
- null,
- clientFirst,
- remainingTime(),
- tx.remainingTime(),
- tx.taskNameHash(),
- batchFut.rows(),
- it.operation(),
- needRes,
- keepBinary,
- filter
- );
-
- sendRequest(req, nodeId);
- }
-
- /**
- * @param req Request.
- * @param nodeId Remote node ID
- * @throws IgniteCheckedException if failed to send.
- */
- private void sendRequest(GridCacheMessage req, UUID nodeId) throws IgniteCheckedException {
- cctx.io().send(nodeId, req, cctx.ioPolicy());
- }
-
- /**
- * Enlist batch of entries to the transaction on local node.
- *
- * @param batchId Id of a batch mini-future.
- * @param nodeId Node id.
- * @param batch Batch.
- */
- private void enlistLocal(int batchId, UUID nodeId, Batch batch) throws IgniteCheckedException {
- Collection<Object> rows = batch.rows();
-
- GridDhtTxEnlistFuture fut = new GridDhtTxEnlistFuture(nodeId,
- lockVer,
- null,
- futId,
- batchId,
- tx,
- remainingTime(),
- cctx,
- rows,
- it.operation(),
- filter,
- needRes,
- keepBinary);
-
- updateLocalFuture(fut);
-
- fut.listen(() -> {
- try {
- clearLocalFuture(fut);
-
- GridNearTxEnlistResponse res = fut.error() == null ? createResponse(fut) : null;
-
- if (checkResponse(nodeId, res, fut.error()))
- sendNextBatches(nodeId);
- }
- catch (IgniteCheckedException e) {
- checkResponse(nodeId, null, e);
- }
- finally {
- CU.unwindEvicts(cctx);
- }
- });
-
- fut.init();
- }
-
- /**
- * @param nodeId Sender node id.
- * @param res Response.
- */
- public void onResult(UUID nodeId, GridNearTxEnlistResponse res) {
- if (checkResponse(nodeId, res, res.error())) {
-
- Batch batch = batches.get(nodeId);
-
- if (batch != null && !F.isEmpty(batch.localBackupRows()) && res.dhtFutureId() != null)
- processBatchLocalBackupKeys(nodeId, batch.localBackupRows(), res.dhtVersion(), res.dhtFutureId());
- else
- sendNextBatches(nodeId);
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- if (batches.containsKey(nodeId)) {
- if (log.isDebugEnabled())
- log.debug("Found unacknowledged batch for left node [nodeId=" + nodeId + ", fut=" +
- this + ']');
-
- ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to enlist keys " +
- "(primary node left grid, retry transaction if possible) [node=" + nodeId + ']');
-
- topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
-
- onDone(topEx);
- }
-
- if (log.isDebugEnabled())
- log.debug("Future does not have mapping for left node (ignoring) [nodeId=" + nodeId +
- ", fut=" + this + ']');
-
- return false;
- }
-
- /**
- * @param nodeId Originating node ID.
- * @param res Response.
- * @param err Exception.
- * @return {@code True} if future was completed by this call.
- */
- public boolean checkResponse(UUID nodeId, GridNearTxEnlistResponse res, Throwable err) {
- assert res != null || err != null : this;
-
- if (err == null && res.error() != null)
- err = res.error();
-
- if (res != null)
- tx.mappings().get(nodeId).addBackups(res.newDhtNodes());
-
- if (err != null) {
- onDone(err);
-
- return false;
- }
-
- assert res != null;
-
- if (this.res != null || !RES_UPD.compareAndSet(this, null, res.result())) {
- GridCacheReturn res0 = this.res;
-
- if (res.result().invokeResult())
- res0.mergeEntryProcessResults(res.result());
- else if (res0.success() && !res.result().success())
- res0.success(false);
- }
-
- assert this.res != null && (this.res.emptyResult() || needRes || this.res.invokeResult() || !this.res.success());
-
- tx.hasRemoteLocks(true);
-
- return !isDone();
- }
-
- /** {@inheritDoc} */
- @Override public Set<UUID> pendingResponseNodes() {
- return batches.entrySet().stream()
- .filter(e -> e.getValue().ready())
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearTxEnlistFuture.class, this, super.toString());
- }
-
- /**
- * A batch of rows
- */
- private static class Batch {
- /** Node ID. */
- @GridToStringExclude
- private final ClusterNode node;
-
- /** Rows. */
- private final List<Object> rows = new ArrayList<>();
-
- /** Local backup rows. */
- private List<Object> locBkpRows;
-
- /** Readiness flag. Set when batch is full or no new rows are expected. */
- private boolean ready;
-
- /**
- * @param node Cluster node.
- */
- private Batch(ClusterNode node) {
- this.node = node;
- }
-
- /**
- * @return Node.
- */
- public ClusterNode node() {
- return node;
- }
-
- /**
- * Adds a row.
- *
- * @param row Row.
- * @param locBackup {@code true}, when the row key has local backup.
- */
- public void add(Object row, boolean locBackup) {
- rows.add(row);
-
- if (locBackup) {
- if (locBkpRows == null)
- locBkpRows = new ArrayList<>();
-
- locBkpRows.add(row);
- }
- }
-
- /**
- * @return number of rows.
- */
- public int size() {
- return rows.size();
- }
-
- /**
- * @return Collection of rows.
- */
- public Collection<Object> rows() {
- return rows;
- }
-
- /**
- * @return Collection of local backup rows.
- */
- public List<Object> localBackupRows() {
- return locBkpRows;
- }
-
- /**
- * @return Readiness flag.
- */
- public boolean ready() {
- return ready;
- }
-
- /**
- * Sets readiness flag.
- *
- * @param ready Flag value.
- */
- public void ready(boolean ready) {
- this.ready = ready;
- }
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java
deleted file mode 100644
index 1227335f78c..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java
+++ /dev/null
@@ -1,679 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CacheObjectContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.EnlistOperation;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Request to enlist into transaction and acquire locks for entries produced with Cache API operations.
- *
- * One request per batch of entries is used.
- */
-public class GridNearTxEnlistRequest extends GridCacheIdMessage implements GridCacheDeployable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private long threadId;
-
- /** Future id. */
- private IgniteUuid futId;
-
- /** */
- private boolean clientFirst;
-
- /** */
- private int miniId;
-
- /** */
- private AffinityTopologyVersion topVer;
-
- /** */
- private GridCacheVersion lockVer;
-
- /** Mvcc snapshot. */
- private MvccSnapshot mvccSnapshot;
-
- /** */
- private long timeout;
-
- /** */
- private long txTimeout;
-
- /** */
- private int taskNameHash;
-
- /** Rows to enlist. */
- @GridDirectTransient
- private Collection<Object> rows;
-
- /** Serialized rows keys. */
- @GridToStringExclude
- private KeyCacheObject[] keys;
-
- /** Serialized rows values. */
- @GridToStringExclude
- private Message[] values;
-
- /** Enlist operation. */
- private EnlistOperation op;
-
- /** Keep binary flag. */
- private boolean keepBinary;
-
- /** Filter. */
- @GridToStringExclude
- private CacheEntryPredicate filter;
-
- /** Need previous value flag. */
- private boolean needRes;
-
- /**
- * Default constructor.
- */
- public GridNearTxEnlistRequest() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param cacheId Cache id.
- * @param threadId Thread id.
- * @param futId Future id.
- * @param miniId Mini-future id.
- * @param topVer Topology version.
- * @param lockVer Lock version.
- * @param mvccSnapshot Mvcc snapshot.
- * @param clientFirst First client request flag.
- * @param timeout Timeout.
- * @param txTimeout Tx timeout.
- * @param taskNameHash Task name hash.
- * @param rows Rows.
- * @param op Operation.
- * @param filter Filter.
- */
- GridNearTxEnlistRequest(int cacheId,
- long threadId,
- IgniteUuid futId,
- int miniId,
- AffinityTopologyVersion topVer,
- GridCacheVersion lockVer,
- MvccSnapshot mvccSnapshot,
- boolean clientFirst,
- long timeout,
- long txTimeout,
- int taskNameHash,
- Collection<Object> rows,
- EnlistOperation op,
- boolean needRes,
- boolean keepBinary,
- @Nullable CacheEntryPredicate filter) {
- this.txTimeout = txTimeout;
- this.keepBinary = keepBinary;
- this.filter = filter;
- this.cacheId = cacheId;
- this.threadId = threadId;
- this.futId = futId;
- this.miniId = miniId;
- this.topVer = topVer;
- this.lockVer = lockVer;
- this.mvccSnapshot = mvccSnapshot;
- this.clientFirst = clientFirst;
- this.timeout = timeout;
- this.taskNameHash = taskNameHash;
- this.rows = rows;
- this.op = op;
- this.needRes = needRes;
- }
-
- /**
- * @return Thread id.
- */
- public long threadId() {
- return threadId;
- }
-
- /**
- * @return Future id.
- */
- public IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Mini future ID.
- */
- public int miniId() {
- return miniId;
- }
-
- /**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
- * @return Lock version.
- */
- public GridCacheVersion version() {
- return lockVer;
- }
-
- /**
- * @return MVCC snapshot.
- */
- public MvccSnapshot mvccSnapshot() {
- return mvccSnapshot;
- }
-
- /**
- * @return Timeout milliseconds.
- */
- public long timeout() {
- return timeout;
- }
-
- /**
- * @return Tx timeout milliseconds.
- */
- public long txTimeout() {
- return txTimeout;
- }
-
- /**
- * @return Task name hash.
- */
- public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
- * @return {@code True} if this is the first client request.
- */
- public boolean firstClientRequest() {
- return clientFirst;
- }
-
- /**
- * @return Collection of rows.
- */
- public Collection<Object> rows() {
- return rows;
- }
-
- /**
- * @return Operation.
- */
- public EnlistOperation operation() {
- return op;
- }
-
- /**
- * @return Need result flag.
- */
- public boolean needRes() {
- return needRes;
- }
-
- /**
- * @return Keep binary flag.
- */
- public boolean keepBinary() {
- return keepBinary;
- }
-
- /**
- * @return Filter.
- */
- public CacheEntryPredicate filter() {
- return filter;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
- CacheObjectContext objCtx = cctx.cacheObjectContext();
-
- if (rows != null && keys == null) {
- if (!addDepInfo && ctx.deploymentEnabled())
- addDepInfo = true;
-
- keys = new KeyCacheObject[rows.size()];
-
- int i = 0;
-
- boolean keysOnly = op.isDeleteOrLock();
-
- values = keysOnly ? null : new Message[keys.length];
-
- for (Object row : rows) {
- Object key, val = null;
-
- if (keysOnly)
- key = row;
- else {
- key = ((IgniteBiTuple)row).getKey();
- val = ((IgniteBiTuple)row).getValue();
- }
-
- assert key != null && (keysOnly || val != null) : "key=" + key + ", val=" + val;
-
- KeyCacheObject key0 = cctx.toCacheKeyObject(key);
-
- assert key0 != null;
-
- key0.prepareMarshal(objCtx);
-
- keys[i] = key0;
-
- if (!keysOnly) {
- if (op.isInvoke()) {
- GridInvokeValue val0 = (GridInvokeValue)val;
-
- prepareInvokeValue(cctx, val0);
-
- values[i] = val0;
- }
- else {
- if (addDepInfo)
- prepareObject(val, cctx);
-
- CacheObject val0 = cctx.toCacheObject(val);
-
- assert val0 != null;
-
- val0.prepareMarshal(objCtx);
-
- values[i] = val0;
- }
- }
-
- i++;
- }
- }
-
- if (filter != null)
- filter.prepareMarshal(cctx);
- }
-
- /**
- *
- * @param cctx Cache context.
- * @param val0 Invoke value.
- * @throws IgniteCheckedException If failed.
- */
- private void prepareInvokeValue(GridCacheContext cctx, GridInvokeValue val0) throws IgniteCheckedException {
- assert val0 != null && addDepInfo;
-
- prepareObject(val0.entryProcessor(), cctx.shared());
-
- for (Object o : val0.invokeArgs())
- prepareObject(o, cctx.shared());
-
- val0.prepareMarshal(cctx);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- if (keys != null) {
- rows = new ArrayList<>(keys.length);
-
- CacheObjectContext objCtx = ctx.cacheContext(cacheId).cacheObjectContext();
-
- for (int i = 0; i < keys.length; i++) {
- keys[i].finishUnmarshal(objCtx, ldr);
-
- if (op.isDeleteOrLock())
- rows.add(keys[i]);
- else {
- if (values[i] != null) {
- if (op.isInvoke())
- ((GridInvokeValue)values[i]).finishUnmarshal(ctx, ldr);
- else
- ((CacheObject)values[i]).finishUnmarshal(objCtx, ldr);
- }
-
- rows.add(new IgniteBiTuple<>(keys[i], values[i]));
- }
- }
-
- keys = null;
- values = null;
- }
-
- if (filter != null)
- filter.finishUnmarshal(ctx.cacheContext(cacheId), ldr);
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeBoolean("clientFirst", clientFirst))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeMessage("filter", filter))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeIgniteUuid("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeBoolean("keepBinary", keepBinary))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeObjectArray("keys", keys, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeMessage("lockVer", lockVer))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeInt("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeMessage("mvccSnapshot", mvccSnapshot))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeBoolean("needRes", needRes))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeLong("threadId", threadId))
- return false;
-
- writer.incrementState();
-
- case 16:
- if (!writer.writeLong("timeout", timeout))
- return false;
-
- writer.incrementState();
-
- case 17:
- if (!writer.writeAffinityTopologyVersion("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 18:
- if (!writer.writeLong("txTimeout", txTimeout))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeObjectArray("values", values, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- clientFirst = reader.readBoolean("clientFirst");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- filter = reader.readMessage("filter");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- futId = reader.readIgniteUuid("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- keepBinary = reader.readBoolean("keepBinary");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- keys = reader.readObjectArray("keys", MessageCollectionItemType.MSG, KeyCacheObject.class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- lockVer = reader.readMessage("lockVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- miniId = reader.readInt("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- mvccSnapshot = reader.readMessage("mvccSnapshot");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- needRes = reader.readBoolean("needRes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- byte opOrd;
-
- opOrd = reader.readByte("op");
-
- if (!reader.isLastRead())
- return false;
-
- op = EnlistOperation.fromOrdinal(opOrd);
-
- reader.incrementState();
-
- case 14:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- threadId = reader.readLong("threadId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
- timeout = reader.readLong("timeout");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 17:
- topVer = reader.readAffinityTopologyVersion("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 18:
- txTimeout = reader.readLong("txTimeout");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- values = reader.readObjectArray("values", MessageCollectionItemType.MSG, Message.class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridNearTxEnlistRequest.class);
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 20;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 159;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearTxEnlistRequest.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistResponse.java
deleted file mode 100644
index 590748608b3..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistResponse.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.distributed.dht.ExceptionAware;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * A response to {@link GridNearTxEnlistRequest}.
- */
-public class GridNearTxEnlistResponse extends GridCacheIdMessage implements ExceptionAware {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Future ID. */
- private IgniteUuid futId;
-
- /** Error. */
- @GridDirectTransient
- private Throwable err;
-
- /** Serialized error. */
- private byte[] errBytes;
-
- /** Mini future id. */
- private int miniId;
-
- /** Result. */
- private GridCacheReturn res;
-
- /** */
- private GridCacheVersion lockVer;
-
- /** */
- private GridCacheVersion dhtVer;
-
- /** */
- private IgniteUuid dhtFutId;
-
- /** New DHT nodes involved into transaction. */
- @GridDirectCollection(UUID.class)
- private Collection<UUID> newDhtNodes;
-
- /**
- * Default constructor.
- */
- public GridNearTxEnlistResponse() {
- // No-op.
- }
-
- /**
- * Constructor for normal result.
- *
- * @param cacheId Cache id.
- * @param futId Future id.
- * @param miniId Mini future id.
- * @param lockVer Lock version.
- * @param res Result.
- * @param dhtVer Dht version.
- * @param dhtFutId Dht future id.
- * @param newDhtNodes New DHT nodes involved into transaction.
- */
- public GridNearTxEnlistResponse(int cacheId,
- IgniteUuid futId,
- int miniId,
- GridCacheVersion lockVer,
- GridCacheReturn res,
- GridCacheVersion dhtVer,
- IgniteUuid dhtFutId,
- Set<UUID> newDhtNodes) {
- this.cacheId = cacheId;
- this.futId = futId;
- this.miniId = miniId;
- this.lockVer = lockVer;
- this.res = res;
- this.dhtVer = dhtVer;
- this.dhtFutId = dhtFutId;
- this.newDhtNodes = newDhtNodes;
- }
-
- /**
- * Constructor for error result.
- *
- * @param cacheId Cache id.
- * @param futId Future id.
- * @param miniId Mini future id.
- * @param lockVer Lock version.
- * @param err Error.
- */
- public GridNearTxEnlistResponse(int cacheId, IgniteUuid futId, int miniId, GridCacheVersion lockVer,
- Throwable err) {
- this.cacheId = cacheId;
- this.futId = futId;
- this.miniId = miniId;
- this.lockVer = lockVer;
- this.err = err;
- }
-
- /**
- * @return Loc version.
- */
- public GridCacheVersion version() {
- return lockVer;
- }
-
- /**
- * @return Future id.
- */
- public IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Mini future id.
- */
- public int miniId() {
- return miniId;
- }
-
- /**
- * @return Result.
- */
- public GridCacheReturn result() {
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public @Nullable Throwable error() {
- return err;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
-
- /**
- * @return Dht version.
- */
- public GridCacheVersion dhtVersion() {
- return dhtVer;
- }
-
- /**
- * @return Dht future id.
- */
- public IgniteUuid dhtFutureId() {
- return dhtFutId;
- }
-
- /**
- * @return New DHT nodes involved into transaction.
- */
- public Collection<UUID> newDhtNodes() {
- return newDhtNodes;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 12;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeIgniteUuid("dhtFutId", dhtFutId))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeMessage("dhtVer", dhtVer))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeByteArray("errBytes", errBytes))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeIgniteUuid("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeMessage("lockVer", lockVer))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeInt("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeCollection("newDhtNodes", newDhtNodes, MessageCollectionItemType.UUID))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeMessage("res", res))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- dhtFutId = reader.readIgniteUuid("dhtFutId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- dhtVer = reader.readMessage("dhtVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- errBytes = reader.readByteArray("errBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- futId = reader.readIgniteUuid("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- lockVer = reader.readMessage("lockVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- miniId = reader.readInt("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- newDhtNodes = reader.readCollection("newDhtNodes", MessageCollectionItemType.UUID);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- res = reader.readMessage("res");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridNearTxEnlistResponse.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 160;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
- if (err != null && errBytes == null)
- errBytes = U.marshal(ctx.marshaller(), err);
-
- if (res != null)
- res.prepareMarshal(cctx);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
- if (errBytes != null)
- err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
-
- if (res != null)
- res.finishUnmarshal(cctx, ldr);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearTxEnlistResponse.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
deleted file mode 100644
index f1f29c673bb..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.transactions.TransactionState;
-
-/**
- *
- */
-public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteInternalTx> implements NearTxFinishFuture {
- /** */
- private final NearTxFinishFuture finishFut;
-
- /**
- * @param finishFut Finish future.
- */
- GridNearTxFinishAndAckFuture(NearTxFinishFuture finishFut) {
- finishFut.listen(this::onFinishFutureDone);
-
- this.finishFut = finishFut;
- }
-
- /** {@inheritDoc} */
- @Override public boolean commit() {
- return finishFut.commit();
- }
-
- /** {@inheritDoc} */
- @Override public GridNearTxLocal tx() {
- return finishFut.tx();
- }
-
- /** {@inheritDoc} */
- @Override public void onNodeStop(IgniteCheckedException e) {
- finishFut.onNodeStop(e);
- }
-
- /** {@inheritDoc} */
- @Override public void finish(boolean commit, boolean clearThreadMap, boolean onTimeout) {
- finishFut.finish(commit, clearThreadMap, onTimeout);
- }
-
- /** */
- private void onFinishFutureDone(IgniteInternalFuture<IgniteInternalTx> fut) {
- GridNearTxLocal tx = tx(); Throwable err = fut.error();
-
- if (tx.state() != TransactionState.COMMITTED)
- onDone(tx, err);
- }
-
- /** */
- private Throwable addSuppressed(Throwable to, Throwable ex) {
- if (ex == null)
- return to;
- else if (to == null)
- return ex;
- else
- to.addSuppressed(ex);
-
- return to;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearTxFinishAndAckFuture.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryAbstractEnlistFuture.java
deleted file mode 100644
index 6bce203d606..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryAbstractEnlistFuture.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-
-/**
- *
- */
-public abstract class GridNearTxQueryAbstractEnlistFuture extends GridNearTxAbstractEnlistFuture<Long> {
- /**
- * @param cctx Cache context.
- * @param tx Transaction.
- * @param timeout Timeout.
- */
- protected GridNearTxQueryAbstractEnlistFuture(
- GridCacheContext<?, ?> cctx, GridNearTxLocal tx, long timeout) {
- super(cctx, tx, timeout, CU.longReducer());
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
deleted file mode 100644
index 0bcd8649086..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxAbstractEnlistFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.NearTxQueryEnlistResultHandler.createResponse;
-
-/**
- * Cache lock future.
- */
-@SuppressWarnings("ForLoopReplaceableByForEach")
-public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFuture {
- /** Involved cache ids. */
- private final int[] cacheIds;
-
- /** Partitions. */
- private final int[] parts;
-
- /** Schema name. */
- private final String schema;
-
- /** Query string. */
- private final String qry;
-
- /** Query parameters. */
- private final Object[] params;
-
- /** Flags. */
- private final int flags;
-
- /** Fetch page size. */
- private final int pageSize;
-
- /**
- * @param cctx Cache context.
- * @param tx Transaction.
- * @param cacheIds Involved cache ids.
- * @param parts Partitions.
- * @param schema Schema name.
- * @param qry Query string.
- * @param params Query parameters.
- * @param flags Flags.
- * @param pageSize Fetch page size.
- * @param timeout Timeout.
- */
- protected GridNearTxQueryEnlistFuture(
- GridCacheContext<?, ?> cctx, GridNearTxLocal tx, int[] cacheIds, int[] parts, String schema, String qry,
- Object[] params, int flags, int pageSize, long timeout) {
- super(cctx, tx, timeout);
-
- this.cacheIds = cacheIds;
- this.parts = parts;
- this.schema = schema;
- this.qry = qry;
- this.params = params;
- this.flags = flags;
- this.pageSize = pageSize;
- }
-
- /**
- * @param topLocked Topology locked flag.
- */
- @Override protected void map(final boolean topLocked) {
- try {
- Map<ClusterNode, IntArrayHolder> map; boolean locallyMapped = false;
-
- AffinityAssignment assignment = cctx.affinity().assignment(topVer);
-
- if (parts != null) {
- map = U.newHashMap(parts.length);
-
- for (int i = 0; i < parts.length; i++) {
- ClusterNode pNode = assignment.get(parts[i]).get(0);
-
- map.computeIfAbsent(pNode, n -> new IntArrayHolder()).add(parts[i]);
-
- updateMappings(pNode);
-
- if (!locallyMapped && pNode.isLocal())
- locallyMapped = true;
- }
- }
- else {
- Set<ClusterNode> nodes = assignment.primaryPartitionNodes();
-
- map = U.newHashMap(nodes.size());
-
- for (ClusterNode pNode : nodes) {
- map.put(pNode, null);
-
- updateMappings(pNode);
-
- if (!locallyMapped && pNode.isLocal())
- locallyMapped = true;
- }
- }
-
- if (map.isEmpty())
- throw new ClusterTopologyServerNotFoundException("Failed to find data nodes for cache (all partition " +
- "nodes left the grid). [fut=" + this + ']');
-
- int idx = 0; boolean first = true, clientFirst = false;
-
- GridDhtTxQueryEnlistFuture locFut = null;
-
- for (Map.Entry<ClusterNode, IntArrayHolder> entry : map.entrySet()) {
- MiniFuture mini; ClusterNode node = entry.getKey(); IntArrayHolder parts = entry.getValue();
-
- add(mini = new MiniFuture(node));
-
- if (node.isLocal()) {
- locFut = new GridDhtTxQueryEnlistFuture(
- cctx.localNode().id(),
- lockVer,
- null,
- futId,
- -(++idx), // The common tx logic expects non-zero mini-future ids (negative local and positive non-local).
- tx,
- cacheIds,
- parts == null ? null : parts.array(),
- schema,
- qry,
- params,
- flags,
- pageSize,
- remainingTime(),
- cctx);
-
- updateLocalFuture(locFut);
-
- locFut.listen(fut -> {
- assert fut.error() != null || fut.result() != null : fut;
-
- try {
- clearLocalFuture((GridDhtTxAbstractEnlistFuture)fut);
-
- GridNearTxQueryEnlistResponse res = fut.error() == null ? createResponse(fut) : null;
-
- mini.onResult(res, fut.error());
- }
- catch (IgniteCheckedException e) {
- mini.onResult(null, e);
- }
- finally {
- CU.unwindEvicts(cctx);
- }
- });
- }
- else {
- if (first) {
- clientFirst = cctx.localNode().isClient() && !topLocked && !tx.hasRemoteLocks();
-
- first = false;
- }
-
- GridNearTxQueryEnlistRequest req = new GridNearTxQueryEnlistRequest(
- cctx.cacheId(),
- threadId,
- futId,
- ++idx, // The common tx logic expects non-zero mini-future ids (negative local and positive non-local).
- topVer,
- lockVer,
- null,
- cacheIds,
- parts == null ? null : parts.array(),
- schema,
- qry,
- params,
- flags,
- pageSize,
- remainingTime(),
- tx.remainingTime(),
- tx.taskNameHash(),
- clientFirst
- );
-
- sendRequest(req, node.id());
- }
- }
-
- markInitialized();
-
- if (locFut != null)
- locFut.init();
- }
- catch (Throwable e) {
- onDone(e);
-
- if (e instanceof Error)
- throw (Error)e;
- }
- }
-
- /**
- *
- * @param req Request.
- * @param nodeId Remote node ID.
- * @throws IgniteCheckedException if failed to send.
- */
- private void sendRequest(GridCacheMessage req, UUID nodeId) throws IgniteCheckedException {
- cctx.io().send(nodeId, req, QUERY_POOL); // Process query requests in query pool.
- }
-
- /**
- * @param nodeId Left node ID
- * @return {@code True} if node was in the list.
- */
- @Override public synchronized boolean onNodeLeft(UUID nodeId) {
- for (IgniteInternalFuture<?> fut : futures()) {
- MiniFuture f = (MiniFuture)fut;
-
- if (f.node.id().equals(nodeId)) {
- if (log.isDebugEnabled())
- log.debug("Found mini-future for left node [nodeId=" + nodeId + ", mini=" + f + ", fut=" +
- this + ']');
-
- return f.onResult(null, newTopologyException(nodeId));
- }
- }
-
- if (log.isDebugEnabled())
- log.debug("Future does not have mapping for left node (ignoring) [nodeId=" + nodeId +
- ", fut=" + this + ']');
-
- return false;
- }
-
- /**
- * Finds pending mini future by the given mini ID.
- *
- * @param miniId Mini ID to find.
- * @return Mini future.
- */
- private MiniFuture miniFuture(int miniId) {
- compoundsReadLock();
-
- try {
- IgniteInternalFuture<Long> fut = future(Math.abs(miniId) - 1);
-
- return !fut.isDone() ? (MiniFuture)fut : null;
- }
- finally {
- compoundsReadUnlock();
- }
- }
-
- /**
- * Creates new topology exception for cases when primary node leaves grid during mapping.
- *
- * @param nodeId Node ID.
- * @return Topology exception with user-friendly message.
- */
- private ClusterTopologyCheckedException newTopologyException(UUID nodeId) {
- ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to enlist keys " +
- "(primary node left grid, retry transaction if possible) [node=" + nodeId + ']');
-
- topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
-
- return topEx;
- }
-
- /**
- * @param res Response.
- */
- public void onResult(GridNearTxQueryEnlistResponse res) {
- MiniFuture mini = miniFuture(res.miniId());
-
- if (mini != null)
- mini.onResult(res, null);
- }
-
- /** {@inheritDoc} */
- @Override public Set<UUID> pendingResponseNodes() {
- if (initialized() && !isDone()) {
- return futures().stream()
- .map(MiniFuture.class::cast)
- .filter(mini -> !mini.isDone())
- .map(mini -> mini.node.id())
- .collect(Collectors.toSet());
- }
-
- return Collections.emptySet();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearTxQueryEnlistFuture.class, this, super.toString());
- }
-
- /** */
- private class MiniFuture extends GridFutureAdapter<Long> {
- /** */
- private boolean completed;
-
- /** Node ID. */
- @GridToStringExclude
- private final ClusterNode node;
-
- /**
- * @param node Cluster node.
- */
- private MiniFuture(ClusterNode node) {
- this.node = node;
- }
-
- /**
- * @param res Response.
- * @param err Exception.
- * @return {@code True} if future was completed by this call.
- */
- public boolean onResult(GridNearTxQueryEnlistResponse res, Throwable err) {
- assert res != null || err != null : this;
-
- if (err == null && res.error() != null)
- err = res.error();
-
- synchronized (this) {
- if (completed)
- return false;
-
- completed = true;
- }
-
- if (res != null && res.removeMapping()) {
- GridDistributedTxMapping m = tx.mappings().get(node.id());
-
- assert m != null && m.empty();
-
- tx.removeMapping(node.id());
-
- if (node.isLocal())
- tx.colocatedLocallyMapped(false);
- }
- else if (res != null) {
- tx.mappings().get(node.id()).addBackups(res.newDhtNodes());
-
- if (res.result() > 0 && !node.isLocal())
- tx.hasRemoteLocks(true);
- }
-
- return err != null ? onDone(err) : onDone(res.result(), res.error());
- }
- }
-
- /** */
- private static class IntArrayHolder {
- /** */
- private int[] arr;
-
- /** */
- private int size;
-
- /** */
- void add(int i) {
- if (arr == null)
- arr = new int[4];
-
- if (arr.length == size)
- arr = Arrays.copyOf(arr, size << 1);
-
- arr[size++] = i;
- }
-
- /** */
- public int[] array() {
- if (arr == null)
- return null;
- else if (size == arr.length)
- return arr;
- else
- return Arrays.copyOf(arr, size);
- }
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistRequest.java
deleted file mode 100644
index b8f3a575288..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistRequest.java
+++ /dev/null
@@ -1,599 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.internal.binary.BinaryUtils;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class GridNearTxQueryEnlistRequest extends GridCacheIdMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private long threadId;
-
- /** */
- private IgniteUuid futId;
-
- /** */
- private boolean clientFirst;
-
- /** */
- private int miniId;
-
- /** */
- private AffinityTopologyVersion topVer;
-
- /** */
- private GridCacheVersion lockVer;
-
- /** */
- private MvccSnapshot mvccSnapshot;
-
- /** */
- private int[] cacheIds;
-
- /** */
- private int[] parts;
-
- /** */
- private String schema;
-
- /** */
- private String qry;
-
- /** */
- @GridDirectTransient
- private Object[] params;
-
- /** */
- private byte[] paramsBytes;
-
- /** */
- private int flags;
-
- /** */
- private long timeout;
-
- /** */
- private long txTimeout;
-
- /** */
- private int taskNameHash;
-
- /** */
- private int pageSize;
-
- /** */
- public GridNearTxQueryEnlistRequest() {
- // No-op.
- }
-
- /**
- * @param cacheId Cache id.
- * @param threadId Thread id.
- * @param futId Future id.
- * @param miniId Mini fture id.
- * @param topVer Topology version.
- * @param lockVer Lock version.
- * @param mvccSnapshot MVCC snspshot.
- * @param cacheIds Involved cache ids.
- * @param parts Partitions.
- * @param schema Schema name.
- * @param qry Query string.
- * @param params Query parameters.
- * @param flags Flags.
- * @param pageSize Fetch page size.
- * @param timeout Timeout milliseconds.
- * @param txTimeout Tx timeout milliseconds.
- * @param taskNameHash Task name hash.
- * @param clientFirst {@code True} if this is the first client request.
- */
- public GridNearTxQueryEnlistRequest(
- int cacheId,
- long threadId,
- IgniteUuid futId,
- int miniId,
- AffinityTopologyVersion topVer,
- GridCacheVersion lockVer,
- MvccSnapshot mvccSnapshot,
- int[] cacheIds,
- int[] parts,
- String schema,
- String qry,
- Object[] params,
- int flags,
- int pageSize,
- long timeout,
- long txTimeout,
- int taskNameHash,
- boolean clientFirst) {
- this.cacheIds = cacheIds;
- this.parts = parts;
- this.schema = schema;
- this.qry = qry;
- this.params = params;
- this.flags = flags;
- this.pageSize = pageSize;
- this.txTimeout = txTimeout;
- this.cacheId = cacheId;
- this.threadId = threadId;
- this.futId = futId;
- this.miniId = miniId;
- this.topVer = topVer;
- this.lockVer = lockVer;
- this.mvccSnapshot = mvccSnapshot;
- this.timeout = timeout;
- this.taskNameHash = taskNameHash;
- this.clientFirst = clientFirst;
- }
-
- /**
- * @return Thread id.
- */
- public long threadId() {
- return threadId;
- }
-
- /**
- * @return Future id.
- */
- public IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Mini future ID.
- */
- public int miniId() {
- return miniId;
- }
-
- /**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
- * @return Lock version.
- */
- public GridCacheVersion version() {
- return lockVer;
- }
-
- /**
- * @return MVCC snapshot.
- */
- public MvccSnapshot mvccSnapshot() {
- return mvccSnapshot;
- }
-
- /**
- * @return Involved cache ids.
- */
- public int[] cacheIds() {
- return cacheIds;
- }
-
- /**
- * @return Partitions.
- */
- public int[] partitions() {
- return parts;
- }
-
- /**
- * @return Schema name.
- */
- public String schemaName() {
- return schema;
- }
-
- /**
- * @return Query string.
- */
- public String query() {
- return qry;
- }
-
- /**
- * @return Query parameters.
- */
- public Object[] parameters() {
- return params;
- }
-
- /**
- * @return Flags.
- */
- public int flags() {
- return flags;
- }
-
- /**
- * @return Fetch page size.
- */
- public int pageSize() {
- return pageSize;
- }
-
- /**
- * @return Timeout milliseconds.
- */
- public long timeout() {
- return timeout;
- }
-
- /**
- * @return Tx timeout milliseconds.
- */
- public long txTimeout() {
- return txTimeout;
- }
-
- /**
- * @return Task name hash.
- */
- public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
- * @return {@code True} if this is the first client request.
- */
- public boolean firstClientRequest() {
- return clientFirst;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 21;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- if (params != null && paramsBytes == null)
- paramsBytes = U.marshal(ctx, params);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- if (paramsBytes == null || params != null)
- return;
-
- Marshaller m = ctx.kernalContext().config().getMarshaller();
-
- if (m instanceof BinaryMarshaller)
- // To avoid deserializing of enum types.
- params = BinaryUtils.rawArrayFromBinary(((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr));
- else
- params = U.unmarshal(ctx, paramsBytes, ldr);
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeIntArray("cacheIds", cacheIds))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeBoolean("clientFirst", clientFirst))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeInt("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeIgniteUuid("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeMessage("lockVer", lockVer))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeInt("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeMessage("mvccSnapshot", mvccSnapshot))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeInt("pageSize", pageSize))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeByteArray("paramsBytes", paramsBytes))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeIntArray("parts", parts))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeString("qry", qry))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeString("schema", schema))
- return false;
-
- writer.incrementState();
-
- case 16:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 17:
- if (!writer.writeLong("threadId", threadId))
- return false;
-
- writer.incrementState();
-
- case 18:
- if (!writer.writeLong("timeout", timeout))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeAffinityTopologyVersion("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeLong("txTimeout", txTimeout))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- cacheIds = reader.readIntArray("cacheIds");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- clientFirst = reader.readBoolean("clientFirst");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- flags = reader.readInt("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- futId = reader.readIgniteUuid("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- lockVer = reader.readMessage("lockVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- miniId = reader.readInt("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- mvccSnapshot = reader.readMessage("mvccSnapshot");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- pageSize = reader.readInt("pageSize");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- paramsBytes = reader.readByteArray("paramsBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- parts = reader.readIntArray("parts");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- qry = reader.readString("qry");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- schema = reader.readString("schema");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 17:
- threadId = reader.readLong("threadId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 18:
- timeout = reader.readLong("timeout");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- topVer = reader.readAffinityTopologyVersion("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
- txTimeout = reader.readLong("txTimeout");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridNearTxQueryEnlistRequest.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 151;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearTxQueryEnlistRequest.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java
deleted file mode 100644
index 2715f89b408..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.distributed.dht.ExceptionAware;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * A response to {@link GridNearTxQueryEnlistRequest}.
- */
-public class GridNearTxQueryEnlistResponse extends GridCacheIdMessage implements ExceptionAware {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Future ID. */
- private IgniteUuid futId;
-
- /** Error. */
- @GridDirectTransient
- private Throwable err;
-
- /** Serialized error. */
- private byte[] errBytes;
-
- /** Mini future id. */
- private int miniId;
-
- /** Result. */
- private long res;
-
- /** Remove mapping flag. */
- private boolean removeMapping;
-
- /** */
- private GridCacheVersion lockVer;
-
- /** New DHT nodes involved into transaction. */
- @GridDirectCollection(UUID.class)
- private Collection<UUID> newDhtNodes;
-
- /**
- * Default constructor.
- */
- public GridNearTxQueryEnlistResponse() {
- // No-op.
- }
-
- /**
- * @param cacheId Cache id.
- * @param futId Future id.
- * @param miniId Mini future id.
- * @param lockVer Lock version.
- * @param err Error.
- */
- public GridNearTxQueryEnlistResponse(int cacheId, IgniteUuid futId, int miniId, GridCacheVersion lockVer, Throwable err) {
- this.cacheId = cacheId;
- this.futId = futId;
- this.miniId = miniId;
- this.lockVer = lockVer;
- this.err = err;
- }
-
- /**
- * @param cacheId Cache id.
- * @param futId Future id.
- * @param miniId Mini future id.
- * @param lockVer Lock version.
- * @param res Result.
- * @param removeMapping Remove mapping flag.
- * @param newDhtNodes New DHT nodes involved into transaction.
- */
- public GridNearTxQueryEnlistResponse(int cacheId, IgniteUuid futId, int miniId, GridCacheVersion lockVer, long res,
- boolean removeMapping, Set<UUID> newDhtNodes) {
- this.cacheId = cacheId;
- this.futId = futId;
- this.miniId = miniId;
- this.lockVer = lockVer;
- this.res = res;
- this.removeMapping = removeMapping;
- this.newDhtNodes = newDhtNodes;
- }
-
- /**
- * @return Loc version.
- */
- public GridCacheVersion version() {
- return lockVer;
- }
-
- /**
- * @return Future id.
- */
- public IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Mini future id.
- */
- public int miniId() {
- return miniId;
- }
-
- /**
- * @return Result.
- */
- public long result() {
- return res;
- }
-
- /**
- * @return New DHT nodes involved into transaction.
- */
- public Collection<UUID> newDhtNodes() {
- return newDhtNodes;
- }
-
- /**
- * @return Remove mapping flag.
- */
- public boolean removeMapping() {
- return removeMapping;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Throwable error() {
- return err;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 11;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeByteArray("errBytes", errBytes))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeIgniteUuid("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeMessage("lockVer", lockVer))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeInt("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeCollection("newDhtNodes", newDhtNodes, MessageCollectionItemType.UUID))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeBoolean("removeMapping", removeMapping))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeLong("res", res))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- errBytes = reader.readByteArray("errBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- futId = reader.readIgniteUuid("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- lockVer = reader.readMessage("lockVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- miniId = reader.readInt("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- newDhtNodes = reader.readCollection("newDhtNodes", MessageCollectionItemType.UUID);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- removeMapping = reader.readBoolean("removeMapping");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- res = reader.readLong("res");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridNearTxQueryEnlistResponse.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 152;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- if (err != null && errBytes == null)
- errBytes = U.marshal(ctx.marshaller(), err);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- if (errBytes != null)
- err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
- }
-
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearTxQueryEnlistResponse.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
deleted file mode 100644
index a1ad39b5655..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
+++ /dev/null
@@ -1,644 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryResultsEnlistFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.EnlistOperation;
-import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
-import org.apache.ignite.internal.processors.security.SecurityUtils;
-import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.internal.processors.cache.distributed.dht.NearTxQueryEnlistResultHandler.createResponse;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
-/**
- * A future tracking requests for remote nodes transaction enlisting and locking
- * of entries produced with complex DML queries requiring reduce step.
- */
-public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractEnlistFuture {
- /** */
- public static final int DFLT_BATCH_SIZE = 1024;
-
- /** Res field updater. */
- private static final AtomicLongFieldUpdater<GridNearTxQueryResultsEnlistFuture> RES_UPD =
- AtomicLongFieldUpdater.newUpdater(GridNearTxQueryResultsEnlistFuture.class, "res");
-
- /** SkipCntr field updater. */
- private static final AtomicIntegerFieldUpdater<GridNearTxQueryResultsEnlistFuture> SKIP_UPD =
- AtomicIntegerFieldUpdater.newUpdater(GridNearTxQueryResultsEnlistFuture.class, "skipCntr");
-
- /** Marker object. */
- private static final Object FINISHED = new Object();
-
- /** */
- private final UpdateSourceIterator<?> it;
-
- /** */
- private final int batchSize;
-
- /** */
- private final AtomicInteger batchCntr = new AtomicInteger();
-
- /** */
- @SuppressWarnings("unused")
- @GridToStringExclude
- private volatile int skipCntr;
-
- /** */
- @SuppressWarnings("unused")
- @GridToStringExclude
- private volatile long res;
-
- /** */
- private final Map<UUID, Batch> batches = new ConcurrentHashMap<>();
-
- /** Row extracted from iterator but not yet used. */
- private Object peek;
-
- /** Topology locked flag. */
- private boolean topLocked;
-
- /** */
- private final boolean sequential;
-
- /**
- * @param cctx Cache context.
- * @param tx Transaction.
- * @param timeout Timeout.
- * @param it Rows iterator.
- * @param batchSize Batch size.
- * @param sequential Sequential locking flag.
- */
- public GridNearTxQueryResultsEnlistFuture(GridCacheContext<?, ?> cctx,
- GridNearTxLocal tx,
- long timeout,
- UpdateSourceIterator<?> it,
- int batchSize,
- boolean sequential) {
- super(cctx, tx, timeout);
-
- this.it = it;
- this.batchSize = batchSize > 0 ? batchSize : DFLT_BATCH_SIZE;
- this.sequential = sequential;
- }
-
- /** {@inheritDoc} */
- @Override protected void map(boolean topLocked) {
- this.topLocked = topLocked;
-
- sendNextBatches(null);
- }
-
- /**
- * Continue iterating the data rows and form new batches.
- *
- * @param nodeId Node that is ready for a new batch.
- */
- private void sendNextBatches(@Nullable UUID nodeId) {
- try {
- Collection<Batch> next = continueLoop(nodeId);
-
- if (next == null)
- return;
-
- boolean first = (nodeId != null);
-
- for (Batch batch : next) {
- ClusterNode node = batch.node();
-
- sendBatch(node, batch, first);
-
- if (!node.isLocal())
- first = false;
- }
- }
- catch (Throwable e) {
- onDone(e);
-
- if (e instanceof Error)
- throw (Error)e;
- }
- }
-
- /**
- * Iterate data rows and form batches.
- *
- * @param nodeId Id of node acknowledged the last batch.
- * @return Collection of newly completed batches.
- * @throws IgniteCheckedException If failed.
- */
- private Collection<Batch> continueLoop(@Nullable UUID nodeId) throws IgniteCheckedException {
- if (nodeId != null)
- batches.remove(nodeId);
-
- // Accumulate number of batches released since we got here.
- // Let only one thread do the looping.
- if (isDone() || SKIP_UPD.getAndIncrement(this) != 0)
- return null;
-
- ArrayList<Batch> res = null; Batch batch = null;
-
- boolean flush = false;
-
- EnlistOperation op = it.operation();
-
- while (true) {
- while (hasNext0()) {
- checkCompleted();
-
- Object cur = next0();
-
- KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((Map.Entry<?, ?>)cur).getKey());
-
- ClusterNode node = cctx.affinity().primaryByPartition(key.partition(), topVer);
-
- if (node == null)
- throw new ClusterTopologyServerNotFoundException("Failed to get primary node " +
- "[topVer=" + topVer + ", key=" + key + ']');
-
- if (!sequential)
- batch = batches.get(node.id());
- else if (batch != null && !batch.node().equals(node))
- res = markReady(res, batch);
-
- if (batch == null)
- batches.put(node.id(), batch = new Batch(node));
-
- if (batch.ready()) {
- // Can't advance further at the moment.
- batch = null;
-
- peek = cur;
-
- it.beforeDetach();
-
- flush = true;
-
- break;
- }
-
- batch.add(op.isDeleteOrLock() ? key : cur, !node.isLocal() && isLocalBackup(op, key));
-
- if (batch.size() == batchSize)
- res = markReady(res, batch);
- }
-
- if (SKIP_UPD.decrementAndGet(this) == 0)
- break;
-
- skipCntr = 1;
- }
-
- if (flush)
- return res;
-
- // No data left - flush incomplete batches.
- for (Batch batch0 : batches.values()) {
- if (!batch0.ready()) {
- if (res == null)
- res = new ArrayList<>();
-
- batch0.ready(true);
-
- res.add(batch0);
- }
- }
-
- if (batches.isEmpty())
- onDone(this.res);
-
- return res;
- }
-
- /** */
- private Object next0() {
- if (!hasNext0())
- throw new NoSuchElementException();
-
- Object cur;
-
- if ((cur = peek) != null)
- peek = null;
- else
- cur = it.next();
-
- return cur;
- }
-
- /** */
- private boolean hasNext0() {
- if (peek == null && !it.hasNext())
- peek = FINISHED;
-
- return peek != FINISHED;
- }
-
- /** */
- private boolean isLocalBackup(EnlistOperation op, KeyCacheObject key) {
- if (!cctx.affinityNode() || op == EnlistOperation.LOCK)
- return false;
- else if (cctx.isReplicated())
- return true;
-
- return cctx.topology().nodes(key.partition(), tx.topologyVersion()).contains(cctx.localNode());
- }
-
- /** */
- private ArrayList<Batch> markReady(ArrayList<Batch> batches, Batch batch) {
- if (!batch.ready()) {
- batch.ready(true);
-
- if (batches == null)
- batches = new ArrayList<>();
-
- batches.add(batch);
- }
-
- return batches;
- }
-
- /**
- * @param primaryId Primary node id.
- * @param rows Rows.
- * @param dhtVer Dht version assigned at primary node.
- * @param dhtFutId Dht future id assigned at primary node.
- */
- private void processBatchLocalBackupKeys(UUID primaryId, List<Object> rows, GridCacheVersion dhtVer,
- IgniteUuid dhtFutId) {
- assert dhtVer != null;
- assert dhtFutId != null;
-
- EnlistOperation op = it.operation();
-
- assert op != EnlistOperation.LOCK;
-
- boolean keysOnly = op.isDeleteOrLock();
-
- final ArrayList<KeyCacheObject> keys = new ArrayList<>(rows.size());
- final ArrayList<Message> vals = keysOnly ? null : new ArrayList<>(rows.size());
-
- for (Object row : rows) {
- if (keysOnly)
- keys.add(cctx.toCacheKeyObject(row));
- else {
- keys.add(cctx.toCacheKeyObject(((Map.Entry<?, ?>)row).getKey()));
- vals.add(cctx.toCacheObject(((Map.Entry<?, ?>)row).getValue()));
- }
- }
-
- try {
- GridDhtTxRemote dhtTx = cctx.tm().tx(dhtVer);
-
- if (dhtTx == null) {
- dhtTx = new GridDhtTxRemote(cctx.shared(),
- cctx.localNodeId(),
- primaryId,
- lockVer,
- topVer,
- dhtVer,
- null,
- cctx.systemTx(),
- cctx.ioPolicy(),
- PESSIMISTIC,
- REPEATABLE_READ,
- false,
- tx.remainingTime(),
- -1,
- SecurityUtils.securitySubjectId(cctx),
- tx.taskNameHash(),
- false,
- tx.label());
-
- dhtTx = cctx.tm().onCreated(null, dhtTx);
-
- if (dhtTx == null || !cctx.tm().onStarted(dhtTx)) {
- throw new IgniteTxRollbackCheckedException("Failed to update backup " +
- "(transaction has been completed): " + dhtVer);
- }
- }
-
- cctx.tm().txHandler().mvccEnlistBatch(dhtTx, cctx, it.operation(), keys, vals,
- null, null, -1);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
-
- return;
- }
-
- sendNextBatches(primaryId);
- }
-
- /**
- *
- * @param node Node.
- * @param batch Batch.
- * @param first First mapping flag.
- */
- private void sendBatch(ClusterNode node, Batch batch, boolean first) throws IgniteCheckedException {
- updateMappings(node);
-
- boolean clientFirst = first && cctx.localNode().isClient() && !topLocked && !tx.hasRemoteLocks();
-
- int batchId = batchCntr.incrementAndGet();
-
- if (node.isLocal())
- enlistLocal(batchId, node.id(), batch);
- else
- sendBatch(batchId, node.id(), batch, clientFirst);
- }
-
- /**
- * Send batch request to remote data node.
- *
- * @param batchId Id of a batch mini-future.
- * @param nodeId Node id.
- * @param batchFut Mini-future for the batch.
- * @param clientFirst {@code true} if originating node is client and it is a first request to any data node.
- */
- private void sendBatch(int batchId, UUID nodeId, Batch batchFut, boolean clientFirst) throws IgniteCheckedException {
- assert batchFut != null;
-
- GridNearTxQueryResultsEnlistRequest req = new GridNearTxQueryResultsEnlistRequest(cctx.cacheId(),
- threadId,
- futId,
- batchId,
- topVer,
- lockVer,
- null,
- clientFirst,
- remainingTime(),
- tx.remainingTime(),
- tx.taskNameHash(),
- batchFut.rows(),
- it.operation());
-
- sendRequest(req, nodeId);
- }
-
- /**
- *
- * @param req Request.
- * @param nodeId Remote node ID
- * @throws IgniteCheckedException if failed to send.
- */
- private void sendRequest(GridCacheMessage req, UUID nodeId) throws IgniteCheckedException {
- cctx.io().send(nodeId, req, cctx.ioPolicy());
- }
-
- /**
- * Enlist batch of entries to the transaction on local node.
- *
- * @param batchId Id of a batch mini-future.
- * @param nodeId Node id.
- * @param batch Batch.
- */
- private void enlistLocal(int batchId, UUID nodeId, Batch batch) throws IgniteCheckedException {
- Collection<Object> rows = batch.rows();
-
- GridDhtTxQueryResultsEnlistFuture fut = new GridDhtTxQueryResultsEnlistFuture(nodeId,
- lockVer,
- null,
- futId,
- batchId,
- tx,
- remainingTime(),
- cctx,
- rows,
- it.operation());
-
- updateLocalFuture(fut);
-
- fut.listen(() -> {
- assert fut.error() != null || fut.result() != null : fut;
-
- try {
- clearLocalFuture(fut);
-
- GridNearTxQueryResultsEnlistResponse res = fut.error() == null ? createResponse(fut) : null;
-
- if (checkResponse(nodeId, res, fut.error()))
- sendNextBatches(nodeId);
- }
- catch (IgniteCheckedException e) {
- checkResponse(nodeId, null, e);
- }
- finally {
- CU.unwindEvicts(cctx);
- }
- });
-
- fut.init();
- }
-
- /**
- * @param nodeId Sender node id.
- * @param res Response.
- */
- public void onResult(UUID nodeId, GridNearTxQueryResultsEnlistResponse res) {
- if (checkResponse(nodeId, res, res.error())) {
-
- Batch batch = batches.get(nodeId);
-
- if (batch != null && !F.isEmpty(batch.localBackupRows()) && res.dhtFutureId() != null)
- processBatchLocalBackupKeys(nodeId, batch.localBackupRows(), res.dhtVersion(), res.dhtFutureId());
- else
- sendNextBatches(nodeId);
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- if (batches.containsKey(nodeId)) {
- if (log.isDebugEnabled())
- log.debug("Found unacknowledged batch for left node [nodeId=" + nodeId + ", fut=" +
- this + ']');
-
- ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to enlist keys " +
- "(primary node left grid, retry transaction if possible) [node=" + nodeId + ']');
-
- topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
-
- onDone(topEx);
- }
-
- if (log.isDebugEnabled())
- log.debug("Future does not have mapping for left node (ignoring) [nodeId=" + nodeId +
- ", fut=" + this + ']');
-
- return false;
- }
-
- /**
- * @param nodeId Originating node ID.
- * @param res Response.
- * @param err Exception.
- * @return {@code True} if future was completed by this call.
- */
- public boolean checkResponse(UUID nodeId, GridNearTxQueryResultsEnlistResponse res, Throwable err) {
- assert res != null || err != null : this;
-
- if (err == null && res.error() != null)
- err = res.error();
-
- if (res != null)
- tx.mappings().get(nodeId).addBackups(res.newDhtNodes());
-
- if (err != null) {
- onDone(err);
-
- return false;
- }
-
- assert res != null;
-
- RES_UPD.getAndAdd(this, res.result());
-
- tx.hasRemoteLocks(true);
-
- return !isDone();
- }
-
- /** {@inheritDoc} */
- @Override public Set<UUID> pendingResponseNodes() {
- return batches.entrySet().stream()
- .filter(e -> e.getValue().ready())
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearTxQueryResultsEnlistFuture.class, this, super.toString());
- }
-
- /**
- * A batch of rows
- */
- private static class Batch {
- /** Node ID. */
- @GridToStringExclude
- private final ClusterNode node;
-
- /** Rows. */
- private final ArrayList<Object> rows = new ArrayList<>();
-
- /** Local backup rows. */
- private ArrayList<Object> locBkpRows;
-
- /** Readiness flag. Set when batch is full or no new rows are expected. */
- private boolean ready;
-
- /**
- * @param node Cluster node.
- */
- private Batch(ClusterNode node) {
- this.node = node;
- }
-
- /**
- * @return Node.
- */
- public ClusterNode node() {
- return node;
- }
-
- /**
- * Adds a row.
- *
- * @param row Row.
- * @param locBackup {@code true}, when the row key has local backup.
- */
- public void add(Object row, boolean locBackup) {
- rows.add(row);
-
- if (locBackup) {
- if (locBkpRows == null)
- locBkpRows = new ArrayList<>();
-
- locBkpRows.add(row);
- }
- }
-
- /**
- * @return number of rows.
- */
- public int size() {
- return rows.size();
- }
-
- /**
- * @return Collection of rows.
- */
- public Collection<Object> rows() {
- return rows;
- }
-
- /**
- * @return Collection of local backup rows.
- */
- public List<Object> localBackupRows() {
- return locBkpRows;
- }
-
- /**
- * @return Readiness flag.
- */
- public boolean ready() {
- return ready;
- }
-
- /**
- * Sets readiness flag.
- *
- * @param ready Flag value.
- */
- public void ready(boolean ready) {
- this.ready = ready;
- }
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistRequest.java
deleted file mode 100644
index 94fba118b34..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistRequest.java
+++ /dev/null
@@ -1,550 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CacheObjectContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.EnlistOperation;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Request to enlist into transaction and acquire locks for entries produced
- * with complex DML queries with reducer step.
- *
- * One request per batch of entries is used.
- */
-public class GridNearTxQueryResultsEnlistRequest extends GridCacheIdMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private long threadId;
-
- /** */
- private IgniteUuid futId;
-
- /** */
- private boolean clientFirst;
-
- /** */
- private int miniId;
-
- /** */
- private AffinityTopologyVersion topVer;
-
- /** */
- private GridCacheVersion lockVer;
-
- /** */
- private MvccSnapshot mvccSnapshot;
-
- /** */
- private long timeout;
-
- /** */
- private long txTimeout;
-
- /** */
- private int taskNameHash;
-
- /** */
- @GridDirectTransient
- private Collection<Object> rows;
-
- /** */
- @GridToStringExclude
- private KeyCacheObject[] keys;
-
- /** */
- @GridToStringExclude
- private CacheObject[] values;
-
- /** */
- private EnlistOperation op;
-
- /**
- * Default constructor.
- */
- public GridNearTxQueryResultsEnlistRequest() {
- // No-op.
- }
-
- /**
- * @param cacheId Cache id.
- * @param threadId Thread id.
- * @param futId Future id.
- * @param miniId Mini-future id.
- * @param topVer Topology version.
- * @param lockVer Lock version.
- * @param mvccSnapshot Mvcc snapshot.
- * @param clientFirst First client request flag.
- * @param timeout Timeout.
- * @param txTimeout Tx timeout.
- * @param taskNameHash Task name hash.
- * @param rows Rows.
- * @param op Operation.
- */
- GridNearTxQueryResultsEnlistRequest(int cacheId,
- long threadId,
- IgniteUuid futId,
- int miniId,
- AffinityTopologyVersion topVer,
- GridCacheVersion lockVer,
- MvccSnapshot mvccSnapshot,
- boolean clientFirst,
- long timeout,
- long txTimeout, int taskNameHash,
- Collection<Object> rows,
- EnlistOperation op) {
- this.txTimeout = txTimeout;
- this.cacheId = cacheId;
- this.threadId = threadId;
- this.futId = futId;
- this.miniId = miniId;
- this.topVer = topVer;
- this.lockVer = lockVer;
- this.mvccSnapshot = mvccSnapshot;
- this.clientFirst = clientFirst;
- this.timeout = timeout;
- this.taskNameHash = taskNameHash;
- this.rows = rows;
- this.op = op;
- }
-
- /**
- * @return Thread id.
- */
- public long threadId() {
- return threadId;
- }
-
- /**
- * @return Future id.
- */
- public IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Mini future ID.
- */
- public int miniId() {
- return miniId;
- }
-
- /**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
- * @return Lock version.
- */
- public GridCacheVersion version() {
- return lockVer;
- }
-
- /**
- * @return MVCC snapshot.
- */
- public MvccSnapshot mvccSnapshot() {
- return mvccSnapshot;
- }
-
- /**
- * @return Timeout milliseconds.
- */
- public long timeout() {
- return timeout;
- }
-
- /**
- * @return Tx timeout milliseconds.
- */
- public long txTimeout() {
- return txTimeout;
- }
-
- /**
- * @return Task name hash.
- */
- public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
- * @return {@code True} if this is the first client request.
- */
- public boolean firstClientRequest() {
- return clientFirst;
- }
-
- /**
- * @return Collection of rows.
- */
- public Collection<Object> rows() {
- return rows;
- }
-
- /**
- * @return Operation.
- */
- public EnlistOperation operation() {
- return op;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
- CacheObjectContext objCtx = cctx.cacheObjectContext();
-
- if (rows != null && keys == null) {
- keys = new KeyCacheObject[rows.size()];
-
- int i = 0;
-
- boolean keysOnly = op.isDeleteOrLock();
-
- values = keysOnly ? null : new CacheObject[keys.length];
-
- for (Object row : rows) {
- Object key, val = null;
-
- if (keysOnly)
- key = row;
- else {
- key = ((IgniteBiTuple)row).getKey();
- val = ((IgniteBiTuple)row).getValue();
- }
-
- assert key != null && (keysOnly || val != null) : "key=" + key + ", val=" + val;
-
- KeyCacheObject key0 = cctx.toCacheKeyObject(key);
-
- assert key0 != null;
-
- key0.prepareMarshal(objCtx);
-
- keys[i] = key0;
-
- if (!keysOnly) {
- CacheObject val0 = cctx.toCacheObject(val);
-
- assert val0 != null;
-
- val0.prepareMarshal(objCtx);
-
- values[i] = val0;
- }
-
- i++;
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- if (keys != null) {
- rows = new ArrayList<>(keys.length);
-
- CacheObjectContext objCtx = ctx.cacheContext(cacheId).cacheObjectContext();
-
- for (int i = 0; i < keys.length; i++) {
- keys[i].finishUnmarshal(objCtx, ldr);
-
- if (op.isDeleteOrLock())
- rows.add(keys[i]);
- else {
- if (values[i] != null)
- values[i].finishUnmarshal(objCtx, ldr);
-
- rows.add(new IgniteBiTuple<>(keys[i], values[i]));
- }
- }
-
- keys = null;
- values = null;
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeBoolean("clientFirst", clientFirst))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeIgniteUuid("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeObjectArray("keys", keys, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeMessage("lockVer", lockVer))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeInt("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeMessage("mvccSnapshot", mvccSnapshot))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeLong("threadId", threadId))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeLong("timeout", timeout))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeAffinityTopologyVersion("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeLong("txTimeout", txTimeout))
- return false;
-
- writer.incrementState();
-
- case 16:
- if (!writer.writeObjectArray("values", values, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- clientFirst = reader.readBoolean("clientFirst");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- futId = reader.readIgniteUuid("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- keys = reader.readObjectArray("keys", MessageCollectionItemType.MSG, KeyCacheObject.class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- lockVer = reader.readMessage("lockVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- miniId = reader.readInt("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- mvccSnapshot = reader.readMessage("mvccSnapshot");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- byte opOrd;
-
- opOrd = reader.readByte("op");
-
- if (!reader.isLastRead())
- return false;
-
- op = EnlistOperation.fromOrdinal(opOrd);
-
- reader.incrementState();
-
- case 11:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- threadId = reader.readLong("threadId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- timeout = reader.readLong("timeout");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- topVer = reader.readAffinityTopologyVersion("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- txTimeout = reader.readLong("txTimeout");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
- values = reader.readObjectArray("values", MessageCollectionItemType.MSG, CacheObject.class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridNearTxQueryResultsEnlistRequest.class);
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 17;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 153;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearTxQueryResultsEnlistRequest.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java
deleted file mode 100644
index 2a0c63242f8..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import java.nio.ByteBuffer;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * A response to {@link GridNearTxQueryResultsEnlistRequest}.
- */
-public class GridNearTxQueryResultsEnlistResponse extends GridNearTxQueryEnlistResponse {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private GridCacheVersion dhtVer;
-
- /** */
- private IgniteUuid dhtFutId;
-
- /**
- * Default-constructor.
- */
- public GridNearTxQueryResultsEnlistResponse() {
- // No-op.
- }
-
- /**
- * @param cacheId Cache id.
- * @param futId Future id.
- * @param miniId Mini future id.
- * @param lockVer Lock version.
- * @param res Result.
- * @param dhtFutId Dht future id.
- * @param dhtVer Dht version.
- * @param newDhtNodes New DHT nodes involved into transaction.
- */
- public GridNearTxQueryResultsEnlistResponse(int cacheId,
- IgniteUuid futId,
- int miniId,
- GridCacheVersion lockVer,
- long res,
- GridCacheVersion dhtVer,
- IgniteUuid dhtFutId,
- Set<UUID> newDhtNodes) {
- super(cacheId, futId, miniId, lockVer, res, false, newDhtNodes);
-
- this.dhtVer = dhtVer;
- this.dhtFutId = dhtFutId;
- }
-
- /**
- * @param cacheId Cache id.
- * @param futId Future id.
- * @param miniId Mini future id.
- * @param lockVer Lock version.
- * @param err Error.
- */
- public GridNearTxQueryResultsEnlistResponse(int cacheId,
- IgniteUuid futId,
- int miniId,
- GridCacheVersion lockVer,
- Throwable err) {
- super(cacheId, futId, miniId, lockVer, err);
- }
-
- /**
- * @return Dht version.
- */
- public GridCacheVersion dhtVersion() {
- return dhtVer;
- }
-
- /**
- * @return Dht future id.
- */
- public IgniteUuid dhtFutureId() {
- return dhtFutId;
- }
-
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 13;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 11:
- if (!writer.writeIgniteUuid("dhtFutId", dhtFutId))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeMessage("dhtVer", dhtVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 11:
- dhtFutId = reader.readIgniteUuid("dhtFutId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- dhtVer = reader.readMessage("dhtVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridNearTxQueryResultsEnlistResponse.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 154;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearTxQueryResultsEnlistResponse.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index e48331f8265..8df92671e34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -23,7 +23,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
-import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -35,7 +34,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -45,14 +43,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
@@ -63,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
@@ -77,13 +71,9 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxRemote;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotAwareMessage;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
-import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.EnlistOperation;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
@@ -96,8 +86,6 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFutureCancelledException;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
@@ -1869,159 +1857,6 @@ public class IgniteTxHandler {
return null;
}
- /**
- * Writes updated values on the backup node.
- *
- * @param tx Transaction.
- * @param ctx Cache context.
- * @param op Operation.
- * @param keys Keys.
- * @param vals Values sent from the primary node.
- * @param snapshot Mvcc snapshot.
- * @param batchNum Batch number.
- * @param futId Future id.
- * @throws IgniteCheckedException If failed.
- */
- public void mvccEnlistBatch(GridDhtTxRemote tx, GridCacheContext ctx, EnlistOperation op, List<KeyCacheObject> keys,
- List<Message> vals, MvccSnapshot snapshot, IgniteUuid futId, int batchNum) throws IgniteCheckedException {
- assert keys != null && (vals == null || vals.size() == keys.size());
- assert tx != null;
-
- GridDhtCacheAdapter dht = ctx.dht();
-
- tx.addActiveCache(ctx, false);
-
- for (int i = 0; i < keys.size(); i++) {
- KeyCacheObject key = keys.get(i);
-
- assert key != null;
-
- int part = ctx.affinity().partition(key);
-
- try {
- GridDhtLocalPartition locPart = ctx.topology().localPartition(part, tx.topologyVersion(), false);
-
- if (locPart != null && locPart.reserve()) {
- try {
- // Skip renting partitions.
- if (locPart.state() == RENTING) {
- tx.addInvalidPartition(ctx.cacheId(), part);
-
- continue;
- }
-
- CacheObject val = null;
- EntryProcessor entryProc = null;
- Object[] invokeArgs = null;
-
- Message val0 = vals != null ? vals.get(i) : null;
-
- CacheEntryInfoCollection entries =
- val0 instanceof CacheEntryInfoCollection ? (CacheEntryInfoCollection)val0 : null;
-
- if (entries == null && !op.isDeleteOrLock() && !op.isInvoke())
- val = (val0 instanceof CacheObject) ? (CacheObject)val0 : null;
-
- if (entries == null && op.isInvoke()) {
- assert val0 instanceof GridInvokeValue;
-
- GridInvokeValue invokeVal = (GridInvokeValue)val0;
-
- entryProc = invokeVal.entryProcessor();
- invokeArgs = invokeVal.invokeArgs();
- }
-
- assert entries != null || entryProc != null || !op.isInvoke() : "entryProc=" + entryProc + ", op=" + op;
-
- GridDhtCacheEntry entry = dht.entryExx(key, tx.topologyVersion());
-
- GridCacheUpdateTxResult updRes;
-
- while (true) {
- ctx.shared().database().checkpointReadLock();
-
- try {
- if (entries == null) {
- switch (op) {
- case DELETE:
- updRes = entry.mvccRemove(
- tx,
- ctx.localNodeId(),
- tx.topologyVersion(),
- snapshot,
- false,
- false,
- null,
- false);
-
- break;
-
- case INSERT:
- case TRANSFORM:
- case UPSERT:
- case UPDATE:
- updRes = entry.mvccSet(
- tx,
- ctx.localNodeId(),
- val,
- entryProc,
- invokeArgs,
- 0,
- tx.topologyVersion(),
- snapshot,
- op.cacheOperation(),
- false,
- false,
- false,
- null,
- false,
- false);
-
- break;
-
- default:
- throw new IgniteSQLException("Cannot acquire lock for operation [op= "
- + op + "]" + "Operation is unsupported at the moment ",
- IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
- }
- }
- else {
- updRes = entry.mvccUpdateRowsWithPreloadInfo(tx,
- ctx.localNodeId(),
- tx.topologyVersion(),
- entries.infos(),
- op.cacheOperation(),
- snapshot,
- futId,
- batchNum);
- }
-
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- entry = dht.entryExx(key);
- }
- finally {
- ctx.shared().database().checkpointReadUnlock();
- }
- }
-
- assert updRes.updateFuture() == null : "Entry should not be locked on the backup";
- }
-
- finally {
- locPart.release();
- }
- }
- else
- tx.addInvalidPartition(ctx.cacheId(), part);
- }
- catch (GridDhtInvalidPartitionException e) {
- tx.addInvalidPartition(ctx.cacheId(), e.partition());
- }
- }
- }
-
/**
* @param cacheCtx Context.
* @param key Key
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 1547cb25faa..ed89cf1a235 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1514,11 +1514,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
}
- /** {@inheritDoc} */
- @Override public void touchPartition(int cacheId, int partId) {
- txState.touchPartition(cacheId, partId);
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return GridToStringBuilder.toString(IgniteTxLocalAdapter.class, this, "super", super.toString(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 651be603d37..b61b1a9a629 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -56,12 +56,4 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
* @throws IgniteCheckedException If finish failed.
*/
public boolean localFinish(boolean commit, boolean clearThreadMap) throws IgniteCheckedException;
-
- /**
- * Remembers that particular cache partition was touched by current tx.
- *
- * @param cacheId Cache id.
- * @param partId Partition id.
- */
- public void touchPartition(int cacheId, int partId);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
index 5ed67d1b1ae..99ec983840b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
@@ -17,9 +17,6 @@
package org.apache.ignite.internal.processors.cache.transactions;
-import java.util.Map;
-import java.util.Set;
-
/**
*
*/
@@ -50,19 +47,6 @@ public interface IgniteTxLocalState extends IgniteTxState {
*/
public void seal();
- /**
- * @return Cache partitions touched by current tx.
- */
- public Map<Integer, Set<Integer>> touchedPartitions();
-
- /**
- * Remembers that particular cache partition was touched by current tx.
- *
- * @param cacheId Cache id.
- * @param partId Partition id.
- */
- public void touchPartition(int cacheId, int partId);
-
/**
* @return Recovery mode flag.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java
index 8a41ba1f284..79ffba7259b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java
@@ -17,12 +17,6 @@
package org.apache.ignite.internal.processors.cache.transactions;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -30,12 +24,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
*
*/
public abstract class IgniteTxLocalStateAdapter implements IgniteTxLocalState {
- /** */
- private static final Function<Integer, Set<Integer>> CREATE_INT_SET = k -> new HashSet<>();
-
- /** */
- private Map<Integer, Set<Integer>> touchedParts;
-
/**
* @param cacheCtx Cache context.
* @param tx Transaction.
@@ -51,19 +39,4 @@ public abstract class IgniteTxLocalStateAdapter implements IgniteTxLocalState {
cacheCtx.cache().metrics0().onTxRollback(durationNanos);
}
}
-
- /** {@inheritDoc} */
- @Override public Map<Integer, Set<Integer>> touchedPartitions() {
- Map<Integer, Set<Integer>> parts = touchedParts;
-
- return parts != null ? Collections.unmodifiableMap(parts) : null;
- }
-
- /** {@inheritDoc} */
- @Override public void touchPartition(int cacheId, int partId) {
- if (touchedParts == null)
- touchedParts = new HashMap<>();
-
- touchedParts.computeIfAbsent(cacheId, CREATE_INT_SET).add(partId);
- }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
index 15c92f92549..cd13883e51b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -40,9 +39,6 @@ public class TxCounters {
/** Final update counters for cache partitions in the end of transaction */
private volatile Map<Integer, PartitionUpdateCountersMessage> updCntrs;
- /** Counter tracking number of entries locked by tx. */
- private final AtomicInteger lockCntr = new AtomicInteger();
-
/**
* Accumulates size change for cache partition.
*
@@ -136,20 +132,6 @@ public class TxCounters {
return acc;
}
- /**
- * Increments lock counter.
- */
- public void incrementLockCounter() {
- lockCntr.incrementAndGet();
- }
-
- /**
- * @return Current value of lock counter.
- */
- public int lockCounter() {
- return lockCntr.get();
- }
-
/**
* @param cacheId Cache id.
* @param partId Partition id.
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index f3cce1df51e..dfad02f4824 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -907,8 +907,6 @@ org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactional
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter$7
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter$8
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter$9
-org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxAbstractEnlistFuture$1
-org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxAbstractEnlistFuture$2
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture$1
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse
@@ -924,17 +922,11 @@ org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFutu
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture$4
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse
-org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistRequest
-org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistResponse
-org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryFirstEnlistRequest
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException
-org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue
org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture$1
org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture$1
org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException
-org.apache.ignite.internal.processors.cache.distributed.dht.NearTxQueryEnlistResultHandler
-org.apache.ignite.internal.processors.cache.distributed.dht.NearTxResultHandler
org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture$1
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest
@@ -1082,10 +1074,6 @@ org.apache.ignite.internal.processors.cache.distributed.near.GridNearPessimistic
org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest
org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache
-org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxAbstractEnlistFuture$1
-org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistFuture$1
-org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest
-org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture$1
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture$2
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture$3
@@ -1127,12 +1115,6 @@ org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal$Fin
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareFutureAdapter$1
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse
-org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistFuture$1
-org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistRequest
-org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistResponse
-org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistFuture$1
-org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistRequest
-org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistResponse
org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest
org.apache.ignite.internal.processors.cache.distributed.near.consistency.IgniteAtomicConsistencyViolationException
org.apache.ignite.internal.processors.cache.distributed.near.consistency.IgniteConsistencyViolationException
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/AbstractPartitionPruningBaseTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/AbstractPartitionPruningBaseTest.java
index bb28e6de127..ad313600a93 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/AbstractPartitionPruningBaseTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/AbstractPartitionPruningBaseTest.java
@@ -45,7 +45,6 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteInClosure;
@@ -531,19 +530,6 @@ public abstract class AbstractPartitionPruningBaseTest extends GridCommonAbstrac
int[] parts = req.queryPartitions();
- if (!F.isEmpty(parts)) {
- for (int part : parts)
- INTERCEPTED_PARTS.add(part);
- }
- }
- else if (msg0.message() instanceof GridNearTxQueryEnlistRequest) {
- INTERCEPTED_NODES.add(node);
- INTERCEPTED_REQS.incrementAndGet();
-
- GridNearTxQueryEnlistRequest req = (GridNearTxQueryEnlistRequest)msg0.message();
-
- int[] parts = req.partitions();
-
if (!F.isEmpty(parts)) {
for (int part : parts)
INTERCEPTED_PARTS.add(part);