You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2023/12/07 12:12:15 UTC
(ignite-3) branch main updated: IGNITE-20771 Implement tx coordinator liveness check (#2923)
This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e7010e4cb7 IGNITE-20771 Implement tx coordinator liveness check (#2923)
e7010e4cb7 is described below
commit e7010e4cb749aad69399744a703f3529de23e232
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Thu Dec 7 15:12:09 2023 +0300
IGNITE-20771 Implement tx coordinator liveness check (#2923)
---
.../runner/app/ItIgniteNodeRestartTest.java | 5 +
.../internal/table/ItTransactionConflictTest.java | 44 +++++-
.../org/apache/ignite/internal/app/IgniteImpl.java | 4 +
.../exec/rel/TableScanNodeExecutionTest.java | 9 ++
modules/table/build.gradle | 2 +
.../ItAbstractInternalTableScanTest.java | 10 +-
.../ItInternalTableReadOnlyOperationsTest.java | 10 +-
.../ItTxDistributedCleanupRecoveryTest.java | 1 +
.../distributed/ItTxDistributedTestSingleNode.java | 5 +
...xDistributedTestSingleNodeNoCleanupMessage.java | 7 +
.../ignite/distributed/ItTxStateLocalMapTest.java | 5 +
.../rebalance/ItRebalanceDistributedTest.java | 8 +-
.../ignite/internal/table/ItColocationTest.java | 9 ++
.../internal/table/InteropOperationsTest.java | 72 +++++-----
.../internal/table/TableKvOperationsTestBase.java | 10 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 7 +
.../table/impl/DummyInternalTableImpl.java | 25 ++--
modules/transactions/build.gradle | 4 +
.../org/apache/ignite/internal/tx/TxManager.java | 2 +-
.../org/apache/ignite/internal/tx/TxState.java | 6 +-
.../org/apache/ignite/internal/tx/TxStateMeta.java | 47 ++++++-
.../ignite/internal/tx/TxStateMetaAbandoned.java | 89 ++++++++++++
.../ignite/internal/tx/TxStateMetaFinishing.java | 26 ++++
.../TransactionConfigurationModule.java | 41 ++++++
.../TransactionConfigurationSchema.java | 37 +++++
.../ignite/internal/tx/impl/OrphanDetector.java | 155 +++++++++++++++++----
.../ignite/internal/tx/impl/TxManagerImpl.java | 32 +++--
.../tx/impl/VolatileTxStateMetaStorage.java | 137 ++++++++++++++++++
.../apache/ignite/internal/tx/TxManagerTest.java | 9 +-
.../org/apache/ignite/internal/tx/TxStateTest.java | 6 +-
30 files changed, 717 insertions(+), 107 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 11a81d45c8..cc8fd11bd7 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -126,6 +126,7 @@ import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -182,6 +183,9 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
@InjectConfiguration
private static MetaStorageConfiguration metaStorageConfiguration;
+ @InjectConfiguration
+ private static TransactionConfiguration txConfiguration;
+
/**
* Start some of Ignite components that are able to serve as Ignite node for test purposes.
*
@@ -333,6 +337,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
);
var txManager = new TxManagerImpl(
+ txConfiguration,
clusterSvc,
replicaService,
lockManager,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionConflictTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionConflictTest.java
index 7f5450b656..3af353e480 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionConflictTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionConflictTest.java
@@ -25,15 +25,19 @@ import static org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
+import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.table.RecordView;
@@ -64,6 +68,17 @@ public class ItTransactionConflictTest extends ClusterPerTestIntegrationTest {
});
}
+ @Override
+ protected void customizeInitParameters(InitParametersBuilder builder) {
+ super.customizeInitParameters(builder);
+
+ builder.clusterConfiguration("{\n"
+ + " \"transaction\": {\n"
+ + " \"abandonedCheckTs\": 600000\n"
+ + " }\n"
+ + "}\n");
+ }
+
@Test
public void test() throws Exception {
TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
@@ -91,39 +106,56 @@ public class ItTransactionConflictTest extends ClusterPerTestIntegrationTest {
log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
+ Transaction oldRwTx = node(0).transactions().begin();
+
UUID orphanTxId = startTransactionAndStopNode(txCrdNode);
CompletableFuture<UUID> recoveryTxMsgCaptureFut = new CompletableFuture<>();
+ AtomicInteger msgCount = new AtomicInteger();
commitPartNode.dropMessages((nodeName, msg) -> {
if (msg instanceof TxRecoveryMessage) {
var recoveryTxMsg = (TxRecoveryMessage) msg;
recoveryTxMsgCaptureFut.complete(recoveryTxMsg.txId());
+
+ msgCount.incrementAndGet();
}
return false;
});
- runConflictingTransaction(node(0));
+ runConflictingTransaction(node(0), oldRwTx);
+ runConflictingTransaction(node(0), node(0).transactions().begin());
assertThat(recoveryTxMsgCaptureFut, willCompleteSuccessfully());
assertEquals(orphanTxId, recoveryTxMsgCaptureFut.join());
+ assertEquals(1, msgCount.get());
+
+ node(0).clusterConfiguration().getConfiguration(TransactionConfiguration.KEY).change(transactionChange ->
+ transactionChange.changeAbandonedCheckTs(1));
+
+ assertTrue(waitForCondition(() -> {
+ runConflictingTransaction(node(0), node(0).transactions().begin());
+
+ return msgCount.get() > 0;
+ }, 10_000));
}
/**
* Runs a transaction that was expectedly finished with the lock conflict exception.
*
* @param node Transaction coordinator node.
+ * @param rwTx A transaction to create a lock conflict with an abandoned one.
*/
- private void runConflictingTransaction(IgniteImpl node) {
+ private void runConflictingTransaction(IgniteImpl node, Transaction rwTx) {
RecordView view = node.tables().table(TABLE_NAME).recordView();
try {
- Transaction rwTx2 = node.transactions().begin();
+ view.upsert(rwTx, Tuple.create().set("key", 42).set("val", "val2"));
- view.upsert(rwTx2, Tuple.create().set("key", 42).set("val", "val2"));
+ fail("Lock conflict have to be detected.");
} catch (Exception e) {
assertEquals(Transactions.ACQUIRE_LOCK_ERR, extractCodeFrom(e));
@@ -132,8 +164,8 @@ public class ItTransactionConflictTest extends ClusterPerTestIntegrationTest {
}
/**
- * Starts the transaction, takes a lock, and stops the transaction coordinator.
- * The stopped node leaves the transaction in the pending state.
+ * Starts the transaction, takes a lock, and stops the transaction coordinator. The stopped node leaves the transaction in the pending
+ * state.
*
* @param node Transaction coordinator node.
* @return Transaction id.
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f92cf11b4a..5e51cf94ec 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -152,6 +152,7 @@ import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
@@ -564,8 +565,11 @@ public class IgniteImpl implements Ignite {
catalogManager
);
+ TransactionConfiguration txConfig = clusterConfigRegistry.getConfiguration(TransactionConfiguration.KEY);
+
// TODO: IGNITE-19344 - use nodeId that is validated on join (and probably generated differently).
txManager = new TxManagerImpl(
+ txConfig,
clusterSvc,
replicaSvc,
lockMgr,
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 699eb60281..29497d3a3a 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -37,6 +37,8 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -61,6 +63,7 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -75,13 +78,18 @@ import org.apache.ignite.network.TopologyService;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
/**
* Tests execution flow of TableScanNode.
*/
+@ExtendWith(ConfigurationExtension.class)
public class TableScanNodeExecutionTest extends AbstractExecutionTest<Object[]> {
private final LinkedList<AutoCloseable> closeables = new LinkedList<>();
+ @InjectConfiguration
+ private TransactionConfiguration txConfiguration;
+
// Ensures that all data from TableScanNode is being propagated correctly.
@Test
public void testScanNodeDataPropagation() throws InterruptedException {
@@ -130,6 +138,7 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest<Object[]>
ReplicaService replicaSvc = mock(ReplicaService.class, RETURNS_DEEP_STUBS);
TxManagerImpl txManager = new TxManagerImpl(
+ txConfiguration,
clusterService,
replicaSvc,
new HeapLockManager(),
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 189e0952ec..891d7e396e 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -92,6 +92,8 @@ dependencies {
testFixturesImplementation project(':ignite-catalog')
testFixturesImplementation project(':ignite-raft')
testFixturesImplementation project(':ignite-affinity')
+ testFixturesImplementation project(':ignite-configuration-api')
+ testFixturesImplementation(testFixtures(project(':ignite-configuration')))
testFixturesImplementation(testFixtures(project(':ignite-core')))
testFixturesImplementation(testFixtures(project(':ignite-storage-api')))
testFixturesImplementation(testFixtures(project(':ignite-transactions')))
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 467f8cf76d..0be3ee9e91 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -42,6 +42,8 @@ import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -58,6 +60,7 @@ import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.type.NativeTypes;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -69,7 +72,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
/**
* Tests for {@link InternalTable#scan(int, org.apache.ignite.internal.tx.InternalTransaction)}.
*/
-@ExtendWith(MockitoExtension.class)
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest {
private static final SchemaDescriptor ROW_SCHEMA = new SchemaDescriptor(
1,
@@ -77,6 +80,9 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
new Column[]{new Column("val", NativeTypes.stringOf(100), false)}
);
+ @InjectConfiguration
+ private TransactionConfiguration txConfiguration;
+
/** Mock partition storage. */
@Mock
private MvPartitionStorage mockStorage;
@@ -91,7 +97,7 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
public void setUp(TestInfo testInfo) {
when(mockStorage.scan(any(HybridTimestamp.class))).thenReturn(mock(PartitionTimestampCursor.class));
- internalTbl = new DummyInternalTableImpl(mock(ReplicaService.class), mockStorage, ROW_SCHEMA);
+ internalTbl = new DummyInternalTableImpl(mock(ReplicaService.class), mockStorage, ROW_SCHEMA, txConfiguration);
}
/**
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
index bded985578..4373873f25 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
@@ -40,6 +40,8 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -60,6 +62,7 @@ import org.apache.ignite.internal.table.distributed.replication.request.ReadOnly
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.tx.TransactionException;
@@ -74,7 +77,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
/**
* Tests for {@link InternalTable} read-only operations.
*/
-@ExtendWith(MockitoExtension.class)
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
1,
@@ -82,6 +85,9 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
new Column[]{new Column("value", NativeTypes.INT64, false)}
);
+ @InjectConfiguration
+ private TransactionConfiguration txConfiguration;
+
private static final HybridClock CLOCK = new HybridClockImpl();
private static final Row ROW_1 = createKeyValueRow(1, 1001);
@@ -114,7 +120,7 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
public void setUp(TestInfo testInfo) {
when(mockStorage.scan(any(HybridTimestamp.class))).thenReturn(mock(PartitionTimestampCursor.class));
- internalTbl = new DummyInternalTableImpl(replicaService, mockStorage, SCHEMA);
+ internalTbl = new DummyInternalTableImpl(replicaService, mockStorage, SCHEMA, txConfiguration);
lenient().when(readOnlyTx.isReadOnly()).thenReturn(true);
lenient().when(readOnlyTx.readTimestamp()).thenReturn(CLOCK.now());
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
index 1505b62415..4f102c2eca 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
@@ -82,6 +82,7 @@ public class ItTxDistributedCleanupRecoveryTest extends ItTxDistributedTestSingl
txTestCluster = new ItTxTestCluster(
testInfo,
raftConfiguration,
+ txConfiguration,
workDir,
nodes(),
replicas(),
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 10805afb90..28bb29fdba 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NodeFinder;
@@ -76,6 +77,9 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
@InjectConfiguration("mock: { fsync: false }")
protected static RaftConfiguration raftConfiguration;
+ @InjectConfiguration
+ protected static TransactionConfiguration txConfiguration;
+
/**
* Returns a count of nodes.
*
@@ -124,6 +128,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
txTestCluster = new ItTxTestCluster(
testInfo,
raftConfiguration,
+ txConfiguration,
workDir,
nodes(),
replicas(),
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index 813cef5c9d..e0fcc50880 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
@@ -51,6 +52,7 @@ import org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSour
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -73,6 +75,9 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut
/** A list of background cleanup futures. */
private final List<CompletableFuture<?>> cleanupFutures = new CopyOnWriteArrayList<>();
+ @InjectConfiguration
+ private TransactionConfiguration txConfiguration;
+
/**
* The constructor.
*
@@ -88,6 +93,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut
txTestCluster = new ItTxTestCluster(
testInfo,
raftConfiguration,
+ txConfiguration,
workDir,
nodes(),
replicas(),
@@ -104,6 +110,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut
PlacementDriver placementDriver
) {
return new TxManagerImpl(
+ txConfiguration,
clusterService,
replicaSvc,
new HeapLockManager(),
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
index dbb0ca7c23..b525669d8d 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.network.ClusterNode;
@@ -62,6 +63,9 @@ public class ItTxStateLocalMapTest extends IgniteAbstractTest {
@InjectConfiguration("mock: { fsync: false }")
private static RaftConfiguration raftConfig;
+ @InjectConfiguration
+ private static TransactionConfiguration txConfiguration;
+
private final TestInfo testInfo;
private ItTxTestCluster testCluster;
@@ -88,6 +92,7 @@ public class ItTxStateLocalMapTest extends IgniteAbstractTest {
testCluster = new ItTxTestCluster(
testInfo,
raftConfig,
+ txConfiguration,
workDir,
NODES,
NODES,
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 4b631aae3e..f78aef081f 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -167,6 +167,7 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -198,8 +199,7 @@ import org.mockito.Mockito;
/**
* Test suite for rebalance process, when replicas' number changed.
*/
-@ExtendWith(WorkDirectoryExtension.class)
-@ExtendWith(ConfigurationExtension.class)
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
@Timeout(120)
public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
private static final IgniteLogger LOG = Loggers.forClass(ItRebalanceDistributedTest.class);
@@ -220,6 +220,9 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
private static final int NODE_COUNT = 3;
+ @InjectConfiguration
+ private static TransactionConfiguration txConfiguration;
+
@InjectConfiguration
private static RaftConfiguration raftConfiguration;
@@ -942,6 +945,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
);
txManager = new TxManagerImpl(
+ txConfiguration,
clusterService,
replicaSvc,
lockManager,
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 315ebdc105..90b1839ea1 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -57,6 +57,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Command;
@@ -89,6 +91,7 @@ import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -104,6 +107,7 @@ import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -111,6 +115,7 @@ import org.junit.jupiter.params.provider.MethodSource;
/**
* Tests for data colocation.
*/
+@ExtendWith(ConfigurationExtension.class)
public class ItColocationTest extends BaseIgniteAbstractTest {
/** Partitions count. */
private static final int PARTS = 32;
@@ -131,6 +136,9 @@ public class ItColocationTest extends BaseIgniteAbstractTest {
/** Message factory to create messages - RAFT commands. */
private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory();
+ @InjectConfiguration
+ private static TransactionConfiguration txConfiguration;
+
private SchemaDescriptor schema;
private SchemaRegistry schemaRegistry;
@@ -150,6 +158,7 @@ public class ItColocationTest extends BaseIgniteAbstractTest {
ReplicaService replicaService = mock(ReplicaService.class, RETURNS_DEEP_STUBS);
txManager = new TxManagerImpl(
+ txConfiguration,
clusterService,
replicaService,
new HeapLockManager(),
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java
index f354fb0239..7a67343096 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java
@@ -40,6 +40,8 @@ import java.util.BitSet;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -50,6 +52,7 @@ import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.type.NativeType;
import org.apache.ignite.internal.type.NativeTypes;
@@ -60,7 +63,9 @@ import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
/**
* Tests for different access methods:
@@ -68,29 +73,34 @@ import org.junit.jupiter.api.Test;
* 2) Write using different API's into it (row 1 - with all values, row 2 - with nulls).
* 3) Read data back through all possible APIs and validate it.
*/
+@ExtendWith(ConfigurationExtension.class)
public class InteropOperationsTest extends BaseIgniteAbstractTest {
/** Test schema. */
- private static final SchemaDescriptor SCHEMA;
+ private static SchemaDescriptor schema;
/** Table for tests. */
- private static final TableViewInternal TABLE;
+ private static TableViewInternal table;
/** Dummy internal table for tests. */
- private static final DummyInternalTableImpl INT_TABLE;
+ private static DummyInternalTableImpl intTable;
/** Key value binary view for test. */
- private static final KeyValueView<Tuple, Tuple> KV_BIN_VIEW;
+ private static KeyValueView<Tuple, Tuple> kvBinView;
/** Key value view for test. */
- private static final KeyValueView<Long, Value> KV_VIEW;
+ private static KeyValueView<Long, Value> kvView;
/** Record view for test. */
- private static final RecordView<Row> R_VIEW;
+ private static RecordView<Row> rView;
/** Record binary view for test. */
- private static final RecordView<Tuple> R_BIN_VIEW;
+ private static RecordView<Tuple> rBinView;
- static {
+ @InjectConfiguration
+ private static TransactionConfiguration txConfiguration;
+
+ @BeforeAll
+ static void beforeAll() {
NativeType[] types = {
NativeTypes.BOOLEAN,
NativeTypes.INT8, NativeTypes.INT16, NativeTypes.INT32, NativeTypes.INT64,
@@ -110,7 +120,7 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest {
int schemaVersion = 1;
- SCHEMA = new SchemaDescriptor(schemaVersion,
+ schema = new SchemaDescriptor(schemaVersion,
new Column[]{new Column("ID", NativeTypes.INT64, false)},
valueCols.toArray(Column[]::new)
);
@@ -118,27 +128,27 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest {
ClusterService clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
when(clusterService.topologyService().localMember().address()).thenReturn(DummyInternalTableImpl.ADDR);
- INT_TABLE = new DummyInternalTableImpl(mock(ReplicaService.class, RETURNS_DEEP_STUBS), SCHEMA);
+ intTable = new DummyInternalTableImpl(mock(ReplicaService.class, RETURNS_DEEP_STUBS), schema, txConfiguration);
- SchemaRegistry schemaRegistry = new DummySchemaManagerImpl(SCHEMA);
+ SchemaRegistry schemaRegistry = new DummySchemaManagerImpl(schema);
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class, RETURNS_DEEP_STUBS));
SchemaVersions schemaVersions = new ConstantSchemaVersions(schemaVersion);
- TABLE = new TableImpl(INT_TABLE, schemaRegistry, new HeapLockManager(), schemaVersions);
- KV_BIN_VIEW = new KeyValueBinaryViewImpl(INT_TABLE, schemaRegistry, schemaVersions);
+ table = new TableImpl(intTable, schemaRegistry, new HeapLockManager(), schemaVersions);
+ kvBinView = new KeyValueBinaryViewImpl(intTable, schemaRegistry, schemaVersions);
- KV_VIEW = new KeyValueViewImpl<>(
- INT_TABLE,
+ kvView = new KeyValueViewImpl<>(
+ intTable,
schemaRegistry,
schemaVersions,
Mapper.of(Long.class, "id"),
Mapper.of(Value.class)
);
- R_BIN_VIEW = TABLE.recordView();
- R_VIEW = TABLE.recordView(Mapper.of(Row.class));
+ rBinView = table.recordView();
+ rView = table.recordView(Mapper.of(Row.class));
}
/**
@@ -146,13 +156,13 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest {
*/
@Test
public void ensureAllTypesTested() {
- SchemaTestUtils.ensureAllTypesChecked(Arrays.stream(SCHEMA.valueColumns().columns()));
+ SchemaTestUtils.ensureAllTypesChecked(Arrays.stream(schema.valueColumns().columns()));
}
@AfterEach
public void clearTable() {
- TABLE.recordView().delete(null, Tuple.create().set("id", 1L));
- TABLE.recordView().delete(null, Tuple.create().set("id", 2L));
+ table.recordView().delete(null, Tuple.create().set("id", 1L));
+ table.recordView().delete(null, Tuple.create().set("id", 2L));
}
/**
@@ -230,7 +240,7 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest {
Tuple k = Tuple.create().set("id", (long) id);
Tuple v = createTuple(id, nulls);
- KV_BIN_VIEW.put(null, k, v);
+ kvBinView.put(null, k, v);
}
/**
@@ -242,8 +252,8 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest {
private boolean readKeyValueBinary(int id, boolean nulls) {
Tuple k = Tuple.create().set("id", (long) id);
- Tuple v = KV_BIN_VIEW.get(null, k);
- boolean contains = KV_BIN_VIEW.contains(null, k);
+ Tuple v = kvBinView.get(null, k);
+ boolean contains = kvBinView.contains(null, k);
assertEquals((v != null), contains);
@@ -267,7 +277,7 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest {
private void writeRecord(int id, boolean nulls) {
Row r1 = new Row(id, nulls);
- assertTrue(R_VIEW.insert(null, r1));
+ assertTrue(rView.insert(null, r1));
}
/**
@@ -280,7 +290,7 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest {
private boolean readRecord(int id, boolean nulls) {
Row expected = new Row(id, nulls);
- Row actual = R_VIEW.get(null, expected);
+ Row actual = rView.get(null, expected);
if (actual == null) {
return false;
@@ -301,7 +311,7 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest {
Tuple t1 = createTuple(id, nulls);
t1.set("id", (long) id);
- assertTrue(R_BIN_VIEW.insert(null, t1));
+ assertTrue(rBinView.insert(null, t1));
}
/**
@@ -314,7 +324,7 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest {
private boolean readRecordBinary(int id, boolean nulls) {
Tuple k = Tuple.create().set("id", (long) id);
- Tuple res = R_BIN_VIEW.get(null, k);
+ Tuple res = rBinView.get(null, k);
if (res == null) {
return false;
@@ -332,7 +342,7 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest {
* @param nulls If {@code true} - nullable fields will be filled, if {@code false} - otherwise.
*/
private void writeKewVal(int id, boolean nulls) {
- KV_VIEW.put(null, (long) id, new Value(id, nulls));
+ kvView.put(null, (long) id, new Value(id, nulls));
}
/**
@@ -343,7 +353,7 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest {
* @return {@code true} if read successfully, {@code false} - otherwise.
*/
private boolean readKeyValue(int id, boolean nulls) {
- Value res = KV_VIEW.get(null, Long.valueOf(id));
+ Value res = kvView.get(null, Long.valueOf(id));
if (res == null) {
return false;
@@ -366,7 +376,7 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest {
private Tuple createTuple(int id, boolean nulls) {
Tuple res = Tuple.create();
- for (Column col : SCHEMA.valueColumns().columns()) {
+ for (Column col : schema.valueColumns().columns()) {
if (!nulls && col.nullable()) {
continue;
}
@@ -433,7 +443,7 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest {
Tuple expected = createTuple(id, nulls);
expected.set("id", (long) id);
- for (Column col : SCHEMA.valueColumns().columns()) {
+ for (Column col : schema.valueColumns().columns()) {
if (!nulls && col.nullable()) {
continue;
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableKvOperationsTestBase.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableKvOperationsTestBase.java
index c9a85a5e11..1a343dbd66 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableKvOperationsTestBase.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableKvOperationsTestBase.java
@@ -21,6 +21,8 @@ import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
@@ -28,6 +30,7 @@ import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
@@ -38,11 +41,14 @@ import org.mockito.junit.jupiter.MockitoExtension;
/**
* Basic table operations test.
*/
-@ExtendWith(MockitoExtension.class)
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
abstract class TableKvOperationsTestBase extends BaseIgniteAbstractTest {
@Mock(answer = RETURNS_DEEP_STUBS)
protected ReplicaService replicaService;
+ @InjectConfiguration
+ private TransactionConfiguration txConfiguration;
+
protected static final int SCHEMA_VERSION = 1;
protected final SchemaVersions schemaVersions = new ConstantSchemaVersions(SCHEMA_VERSION);
@@ -65,6 +71,6 @@ abstract class TableKvOperationsTestBase extends BaseIgniteAbstractTest {
}
protected final DummyInternalTableImpl createInternalTable(SchemaDescriptor schema) {
- return new DummyInternalTableImpl(replicaService, schema);
+ return new DummyInternalTableImpl(replicaService, schema, txConfiguration);
}
}
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 02b0b9594f..e54eedd99e 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -119,6 +119,7 @@ import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
@@ -153,6 +154,8 @@ public class ItTxTestCluster {
private final RaftConfiguration raftConfig;
+ private final TransactionConfiguration txConfiguration;
+
private final Path workDir;
private final int nodes;
@@ -242,6 +245,7 @@ public class ItTxTestCluster {
public ItTxTestCluster(
TestInfo testInfo,
RaftConfiguration raftConfig,
+ TransactionConfiguration txConfiguration,
Path workDir,
int nodes,
int replicas,
@@ -249,6 +253,7 @@ public class ItTxTestCluster {
HybridTimestampTracker timestampTracker
) {
this.raftConfig = raftConfig;
+ this.txConfiguration = txConfiguration;
this.workDir = workDir;
this.nodes = nodes;
this.replicas = replicas;
@@ -389,6 +394,7 @@ public class ItTxTestCluster {
PlacementDriver placementDriver
) {
return new TxManagerImpl(
+ txConfiguration,
clusterService,
replicaSvc,
new HeapLockManager(),
@@ -819,6 +825,7 @@ public class ItTxTestCluster {
private void initializeClientTxComponents() {
clientTxManager = new TxManagerImpl(
+ txConfiguration,
client,
clientReplicaSvc,
new HeapLockManager(),
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 6b7b348cd8..ec5112cd8f 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -85,6 +85,7 @@ import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -144,23 +145,15 @@ public class DummyInternalTableImpl extends InternalTableImpl {
private static final AtomicInteger nextTableId = new AtomicInteger(10_001);
- /**
- * Creates a new local table.
- *
- * @param replicaSvc Replica service.
- */
- public DummyInternalTableImpl(ReplicaService replicaSvc) {
- this(replicaSvc, SCHEMA);
- }
-
/**
* Creates a new local table.
*
* @param replicaSvc Replica service.
* @param schema Schema.
+ * @param txConfiguration Transaction configuration.
*/
- public DummyInternalTableImpl(ReplicaService replicaSvc, SchemaDescriptor schema) {
- this(replicaSvc, new TestMvPartitionStorage(0), schema);
+ public DummyInternalTableImpl(ReplicaService replicaSvc, SchemaDescriptor schema, TransactionConfiguration txConfiguration) {
+ this(replicaSvc, new TestMvPartitionStorage(0), schema, txConfiguration);
}
/**
@@ -191,16 +184,18 @@ public class DummyInternalTableImpl extends InternalTableImpl {
* @param replicaSvc Replica service.
* @param mvPartStorage Multi version partition storage.
* @param schema Schema descriptor.
+ * @param txConfiguration Transaction configuration.
*/
public DummyInternalTableImpl(
ReplicaService replicaSvc,
MvPartitionStorage mvPartStorage,
- SchemaDescriptor schema
+ SchemaDescriptor schema,
+ TransactionConfiguration txConfiguration
) {
this(
replicaSvc,
mvPartStorage,
- txManager(replicaSvc),
+ txManager(replicaSvc, txConfiguration),
false,
null,
schema,
@@ -423,8 +418,9 @@ public class DummyInternalTableImpl extends InternalTableImpl {
* Creates a {@link TxManager}.
*
* @param replicaSvc Replica service to use.
+ * @param txConfiguration Transaction configuration.
*/
- public static TxManagerImpl txManager(ReplicaService replicaSvc) {
+ public static TxManagerImpl txManager(ReplicaService replicaSvc, TransactionConfiguration txConfiguration) {
TopologyService topologyService = mock(TopologyService.class);
when(topologyService.localMember()).thenReturn(LOCAL_NODE);
@@ -433,6 +429,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
when(clusterService.topologyService()).thenReturn(topologyService);
var txManager = new TxManagerImpl(
+ txConfiguration,
clusterService,
replicaSvc,
new HeapLockManager(),
diff --git a/modules/transactions/build.gradle b/modules/transactions/build.gradle
index 05cc72ecc8..ef0f34056c 100644
--- a/modules/transactions/build.gradle
+++ b/modules/transactions/build.gradle
@@ -23,6 +23,9 @@ apply from: "$rootDir/buildscripts/java-integration-test.gradle"
dependencies {
annotationProcessor project(":ignite-network-annotation-processor")
+ annotationProcessor project(":ignite-configuration-annotation-processor")
+ annotationProcessor libs.auto.service
+
implementation project(':ignite-api')
implementation project(':ignite-core')
implementation project(':ignite-network-api')
@@ -34,6 +37,7 @@ dependencies {
implementation project(':ignite-distribution-zones')
implementation project(':ignite-configuration-api')
implementation project(':ignite-placement-driver-api')
+ implementation libs.auto.service.annotations
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 1b0faeb812..fc6a98888f 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -81,7 +81,7 @@ public interface TxManager extends IgniteComponent {
* @deprecated Use lockManager directly.
*/
@Deprecated
- public LockManager lockManager();
+ LockManager lockManager();
/**
* Execute transaction cleanup asynchronously.
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
index 8a18e61639..ab141c2a09 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
@@ -35,9 +35,9 @@ public enum TxState {
{ false, true, false, true, true, true },
{ false, true, true, true, true, true },
{ false, false, false, true, true, true },
- { false, false, false, true, false, true },
- { false, false, false, false, true, true },
- { true, true, true, true, true, true }
+ { false, false, false, true, false, false },
+ { false, false, false, false, true, false },
+ { false, false, true, true, true, true }
};
/**
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index b0241b32fd..0990bbfb59 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -17,6 +17,10 @@
package org.apache.ignite.internal.tx;
+import static org.apache.ignite.internal.tx.TxState.ABANDONED;
+import static org.apache.ignite.internal.tx.TxState.FINISHING;
+import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
+
import java.util.Objects;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -51,6 +55,25 @@ public class TxStateMeta implements TransactionMeta {
String txCoordinatorId,
@Nullable TablePartitionId commitPartitionId,
@Nullable HybridTimestamp commitTimestamp
+ ) {
+ this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, 0);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param txState Transaction state.
+ * @param txCoordinatorId Transaction coordinator id.
+ * @param commitPartitionId Commit partition replication group id.
+ * @param commitTimestamp Commit timestamp.
+ * @param lastAbandonedMarkerTs Timestamp indicates when the transaction is marked as abandoned.
+ */
+ private TxStateMeta(
+ TxState txState,
+ String txCoordinatorId,
+ TablePartitionId commitPartitionId,
+ HybridTimestamp commitTimestamp,
+ long lastAbandonedMarkerTs
) {
this.txState = txState;
this.txCoordinatorId = txCoordinatorId;
@@ -58,6 +81,28 @@ public class TxStateMeta implements TransactionMeta {
this.commitTimestamp = commitTimestamp;
}
+ /**
+ * Creates a transaction state for the same transaction, but this one is marked abandoned.
+ *
+ * @return Transaction state meta.
+ */
+ public TxStateMetaAbandoned abandoned() {
+ assert checkTransitionCorrectness(txState, ABANDONED) : "Transaction state is incorrect [txState=" + txState + "].";
+
+ return new TxStateMetaAbandoned(txCoordinatorId, commitPartitionId);
+ }
+
+ /**
+ * Creates a transaction state for the same transaction, but this one is marked finishing.
+ *
+ * @return Transaction state meta.
+ */
+ public TxStateMetaFinishing finishing() {
+ assert checkTransitionCorrectness(txState, FINISHING) : "Transaction state is incorrect [txState=" + txState + "].";
+
+ return new TxStateMetaFinishing(txCoordinatorId, commitPartitionId);
+ }
+
@Override
public TxState txState() {
return txState;
@@ -84,12 +129,12 @@ public class TxStateMeta implements TransactionMeta {
if (o == null || getClass() != o.getClass()) {
return false;
}
-
TxStateMeta that = (TxStateMeta) o;
if (txState != that.txState) {
return false;
}
+
if (txCoordinatorId != null ? !txCoordinatorId.equals(that.txCoordinatorId) : that.txCoordinatorId != null) {
return false;
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java
new file mode 100644
index 0000000000..674283240b
--- /dev/null
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static org.apache.ignite.internal.tx.TxState.ABANDONED;
+
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.FastTimestamps;
+
+/**
+ * Abandoned transaction state meta.
+ */
+public class TxStateMetaAbandoned extends TxStateMeta {
+ private static final long serialVersionUID = 8521181896862227127L;
+
+ /** Timestamp when the latest {@code ABANDONED} state set. */
+ private final long lastAbandonedMarkerTs;
+
+ /**
+ * Constructor.
+ *
+ * @param txCoordinatorId Transaction coordinator id.
+ * @param commitPartitionId Commit partition replication group id.
+ */
+ TxStateMetaAbandoned(
+ String txCoordinatorId,
+ TablePartitionId commitPartitionId
+ ) {
+ super(ABANDONED, txCoordinatorId, commitPartitionId, null);
+
+ this.lastAbandonedMarkerTs = FastTimestamps.coarseCurrentTimeMillis();
+ }
+
+ /**
+ * The last timestamp when the transaction was marked as abandoned.
+ *
+ * @return Timestamp or {@code 0} if the transaction is in not abandoned.
+ */
+ public long lastAbandonedMarkerTs() {
+ return lastAbandonedMarkerTs;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+
+ TxStateMetaAbandoned that = (TxStateMetaAbandoned) o;
+
+ return lastAbandonedMarkerTs == that.lastAbandonedMarkerTs;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+
+ result = 31 * result + (int) (lastAbandonedMarkerTs ^ (lastAbandonedMarkerTs >>> 32));
+
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(TxStateMetaAbandoned.class, this);
+ }
+}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
index bc53742775..b08b2f57e0 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
@@ -55,4 +55,30 @@ public class TxStateMetaFinishing extends TxStateMeta {
public @Nullable HybridTimestamp commitTimestamp() {
throw new UnsupportedOperationException("Can't get commit timestamp from FINISHING transaction state meta.");
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+
+ TxStateMetaFinishing that = (TxStateMetaFinishing) o;
+
+ return txFinishFuture.equals(that.txFinishFuture);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+
+ result = 31 * result + txFinishFuture.hashCode();
+
+ return result;
+ }
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationModule.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationModule.java
new file mode 100644
index 0000000000..c6928ccabf
--- /dev/null
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationModule.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.configuration;
+
+import com.google.auto.service.AutoService;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.ignite.configuration.ConfigurationModule;
+import org.apache.ignite.configuration.RootKey;
+import org.apache.ignite.configuration.annotation.ConfigurationType;
+
+/**
+ * {@link ConfigurationModule} for distributed (cluster-wide) configuration of Transactions.
+ */
+@AutoService(ConfigurationModule.class)
+public class TransactionConfigurationModule implements ConfigurationModule {
+ @Override
+ public ConfigurationType type() {
+ return ConfigurationType.DISTRIBUTED;
+ }
+
+ @Override
+ public Collection<RootKey<?, ?>> rootKeys() {
+ return Set.of(TransactionConfiguration.KEY);
+ }
+}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
new file mode 100644
index 0000000000..57ee2c977b
--- /dev/null
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.configuration;
+
+import org.apache.ignite.configuration.annotation.ConfigurationRoot;
+import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Range;
+
+/**
+ * Configuration schema for transactions.
+ */
+@ConfigurationRoot(rootName = "transaction", type = ConfigurationType.DISTRIBUTED)
+public class TransactionConfigurationSchema {
+ /** Default checking transaction interval. */
+ public static final long DEFAULT_ABANDONED_CHECK_TS = 5_000;
+
+ /** Checking transaction interval. */
+ @Range(min = 0)
+ @Value(hasDefault = true)
+ public final long abandonedCheckTs = DEFAULT_ABANDONED_CHECK_TS;
+}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
index 76ddbf6231..ebf740228c 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
@@ -17,14 +17,18 @@
package org.apache.ignite.internal.tx.impl;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.tx.TxState.ABANDONED;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
+import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -32,9 +36,10 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.tx.LockManager;
-import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.TxStateMetaAbandoned;
import org.apache.ignite.internal.tx.event.LockEvent;
import org.apache.ignite.internal.tx.event.LockEventParameters;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
@@ -42,6 +47,7 @@ import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.tx.TransactionException;
/**
* The class detects transactions that are left without a coordinator but still hold locks. For that orphan transaction, the recovery
@@ -77,8 +83,14 @@ public class OrphanDetector {
/** Hybrid clock. */
private final HybridClock clock;
+ /**
+ * The time interval in milliseconds in which the orphan resolution sends the recovery message again, in case the transaction is still
+ * not finalized.
+ */
+ private long checkTxStateInterval;
+
/** Local transaction state storage. */
- private Function<UUID, TxStateMeta> txLocalStateStorage;
+ private VolatileTxStateMetaStorage txLocalStateStorage;
/**
* The constructor.
@@ -94,7 +106,8 @@ public class OrphanDetector {
ReplicaService replicaService,
PlacementDriver placementDriver,
LockManager lockManager,
- HybridClock clock) {
+ HybridClock clock
+ ) {
this.topologyService = topologyService;
this.replicaService = replicaService;
this.placementDriver = placementDriver;
@@ -106,9 +119,17 @@ public class OrphanDetector {
* Starts the detector.
*
* @param txLocalStateStorage Local transaction state storage.
+ * @param checkTxStateIntervalProvider Global provider of configuration check state interval.
*/
- public void start(Function<UUID, TxStateMeta> txLocalStateStorage) {
+ public void start(VolatileTxStateMetaStorage txLocalStateStorage, ConfigurationValue<Long> checkTxStateIntervalProvider) {
this.txLocalStateStorage = txLocalStateStorage;
+ this.checkTxStateInterval = checkTxStateIntervalProvider.value();
+
+ checkTxStateIntervalProvider.listen(ctx -> {
+ this.checkTxStateInterval = ctx.newValue();
+
+ return nullCompletedFuture();
+ });
lockManager.listen(LockEvent.LOCK_CONFLICT, lockConflictListener);
}
@@ -123,26 +144,30 @@ public class OrphanDetector {
}
private CompletableFuture<Boolean> lockConflictListener(LockEventParameters params, Throwable e) {
- return handleLockHolder(params.lockHolderTx())
- .thenApply(v -> false);
+ try {
+ handleLockHolder(params.lockHolderTx());
+ } catch (NodeStoppingException ex) {
+ return failedFuture(ex);
+ }
+
+ return completedFuture(false);
}
/**
* Sends {@link TxRecoveryMessage} if the transaction is orphaned.
*
* @param txId Transaction id that holds a lock.
- * @return Future to complete.
*/
- private CompletableFuture<Void> handleLockHolder(UUID txId) {
+ private Void handleLockHolder(UUID txId) throws NodeStoppingException {
if (busyLock.enterBusy()) {
try {
- return handleLockHolderInternal(txId);
+ handleLockHolderInternal(txId);
} finally {
busyLock.leaveBusy();
}
}
- return failedFuture(new NodeStoppingException());
+ throw new NodeStoppingException();
}
/**
@@ -151,26 +176,37 @@ public class OrphanDetector {
* @param txId Transaction id that holds a lock.
* @return Future to complete.
*/
- private CompletableFuture<Void> handleLockHolderInternal(UUID txId) {
- TxStateMeta txState = txLocalStateStorage.apply(txId);
+ private void handleLockHolderInternal(UUID txId) {
+ TxStateMeta txState = txLocalStateStorage.state(txId);
// Transaction state for full transactions is not stored in the local map, so it can be null.
- if (txState == null
- || txState.txState() == TxState.ABANDONED
- || isFinalState(txState.txState())
- || topologyService.getById(txState.txCoordinatorId()) != null) {
- return nullCompletedFuture();
+ if (txState == null || isFinalState(txState.txState()) || isTxCoordinatorAlive(txState)) {
+ return;
}
- LOG.info(
- "Conflict was found, and the coordinator of the transaction that holds a lock is not available "
- + "[txId={}, txCrd={}].",
- txId,
- txState.txCoordinatorId()
- );
+ if (isRecoveryNeeded(txId, txState)) {
+ LOG.info(
+ "Conflict was found, and the coordinator of the transaction that holds a lock is not available "
+ + "[txId={}, txCrd={}].",
+ txId,
+ txState.txCoordinatorId()
+ );
- return placementDriver.awaitPrimaryReplica(
- txState.commitPartitionId(),
+ sentTxRecoveryMessage(txState.commitPartitionId(), txId);
+ }
+
+ throw new TransactionException(ACQUIRE_LOCK_ERR, "The lock is held by the abandoned transaction [abandonedTxId=" + txId + "].");
+ }
+
+ /**
+ * Sends transaction recovery message to commit partition for particular transaction.
+ *
+ * @param cmpPartGrp Replication group of commit partition.
+ * @param txId Transaction id.
+ */
+ private void sentTxRecoveryMessage(ReplicationGroupId cmpPartGrp, UUID txId) {
+ placementDriver.awaitPrimaryReplica(
+ cmpPartGrp,
clock.now(),
AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC,
SECONDS
@@ -180,7 +216,7 @@ public class OrphanDetector {
if (commitPartPrimaryNode == null) {
LOG.warn(
"The primary replica of the commit partition is not available [commitPartGrp={}, tx={}]",
- txState.commitPartitionId(),
+ cmpPartGrp,
txId
);
@@ -188,10 +224,73 @@ public class OrphanDetector {
}
return replicaService.invoke(commitPartPrimaryNode, FACTORY.txRecoveryMessage()
- .groupId(txState.commitPartitionId())
+ .groupId(cmpPartGrp)
.enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
.txId(txId)
.build());
+ }).exceptionally(throwable -> {
+ if (throwable != null) {
+ LOG.warn("A recovery message for the transaction was handled with the error [tx={}].", throwable, txId);
+ }
+
+ return null;
+ });
+ }
+
+ /**
+ * Does a life check for the transaction coordinator.
+ *
+ * @param txState Transaction state meta.
+ * @return True when the transaction coordinator is alive, false otherwise.
+ */
+ private boolean isTxCoordinatorAlive(TxStateMeta txState) {
+ return topologyService.getById(txState.txCoordinatorId()) != null;
+ }
+
+ /**
+ * Checks whether the recovery transaction message is required to be sent or not.
+ *
+ * @param txId Transaction id.
+ * @param txState Transaction meta state.
+ * @return True when transaction recovery is needed, false otherwise.
+ */
+ private boolean isRecoveryNeeded(UUID txId, TxStateMeta txState) {
+ if (txState == null
+ || isFinalState(txState.txState())
+ || isTxAbandonedNotLong(txState)) {
+ return false;
+ }
+
+ TxStateMetaAbandoned txAbandonedState = txState.abandoned();
+
+ TxStateMeta updatedTxState = txLocalStateStorage.updateMeta(txId, txStateMeta -> {
+ if (txStateMeta != null
+ && !isFinalState(txStateMeta.txState())
+ && (txStateMeta.txState() != ABANDONED || isTxAbandonedNotLong(txStateMeta))) {
+ return txAbandonedState;
+ }
+
+ return txStateMeta;
});
+
+ return txAbandonedState == updatedTxState;
+ }
+
+ /**
+ * Checks whether the transaction state is recently marked as abandoned or not.
+ *
+ * @param txState Transaction state metadata.
+ * @return True if the state recently updated to {@link org.apache.ignite.internal.tx.TxState#ABANDONED}.
+ */
+ private boolean isTxAbandonedNotLong(TxStateMeta txState) {
+ if (txState.txState() != ABANDONED) {
+ return false;
+ }
+
+ assert txState instanceof TxStateMetaAbandoned : "The transaction state does not match the metadata [mata=" + txState + "].";
+
+ var txStateAbandoned = (TxStateMetaAbandoned) txState;
+
+ return txStateAbandoned.lastAbandonedMarkerTs() + checkTxStateInterval >= coarseCurrentTimeMillis();
}
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index dc03c46e0e..67ad05a466 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -82,6 +82,7 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.TxStateMetaFinishing;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.util.CompletableFutures;
@@ -110,6 +111,9 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
/** Tx messages factory. */
private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
+ /** Transaction configuration. */
+ private final TransactionConfiguration txConfig;
+
private final ReplicaService replicaService;
/** Lock manager. */
@@ -124,8 +128,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
/** Generates transaction IDs. */
private final TransactionIdGenerator transactionIdGenerator;
- /** The local map for tx states. */
- private final ConcurrentHashMap<UUID, TxStateMeta> txStateMap = new ConcurrentHashMap<>();
+ /** The local state storage. */
+ private final VolatileTxStateMetaStorage txStateVolatileStorage = new VolatileTxStateMetaStorage();
/** Txn contexts. */
private final ConcurrentHashMap<UUID, TxContext> txCtxMap = new ConcurrentHashMap<>(MAX_CONCURRENT_TXNS);
@@ -163,6 +167,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
/**
* The constructor.
*
+ * @param txConfig Transaction configuration.
* @param clusterService Cluster service.
* @param replicaService Replica service.
* @param lockManager Lock manager.
@@ -172,6 +177,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
* @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe time propagation period in ms.
*/
public TxManagerImpl(
+ TransactionConfiguration txConfig,
ClusterService clusterService,
ReplicaService replicaService,
LockManager lockManager,
@@ -180,6 +186,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
PlacementDriver placementDriver,
LongSupplier idleSafeTimePropagationPeriodMsSupplier
) {
+ this.txConfig = txConfig;
this.replicaService = replicaService;
this.lockManager = lockManager;
this.clock = clock;
@@ -269,12 +276,12 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
@Override
public TxStateMeta stateMeta(UUID txId) {
- return txStateMap.get(txId);
+ return txStateVolatileStorage.state(txId);
}
@Override
public TxStateMeta updateTxMeta(UUID txId, Function<TxStateMeta, TxStateMeta> updater) {
- return txStateMap.compute(txId, (k, oldMeta) -> {
+ return txStateVolatileStorage.updateMeta(txId, oldMeta -> {
TxStateMeta newMeta = updater.apply(oldMeta);
if (newMeta == null) {
@@ -329,8 +336,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
// than all the read timestamps processed before.
// Every concurrent operation will now use a finish future from the finishing state meta and get only final transaction
// state after the transaction is finished.
- TxStateMetaFinishing finishingStateMeta = (TxStateMetaFinishing) updateTxMeta(txId, old ->
- new TxStateMetaFinishing(localNodeId, old == null ? null : old.commitPartitionId()));
+ TxStateMetaFinishing finishingStateMeta = (TxStateMetaFinishing) updateTxMeta(txId, TxStateMeta::finishing);
TxContext tuple = txCtxMap.compute(txId, (uuid, tuple0) -> {
if (tuple0 == null) {
@@ -550,15 +556,15 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
@Override
public int finished() {
- return (int) txStateMap.entrySet().stream()
- .filter(e -> isFinalState(e.getValue().txState()))
+ return (int) txStateVolatileStorage.states().stream()
+ .filter(e -> isFinalState(e.txState()))
.count();
}
@Override
public int pending() {
- return (int) txStateMap.entrySet().stream()
- .filter(e -> e.getValue().txState() == PENDING)
+ return (int) txStateVolatileStorage.states().stream()
+ .filter(e -> e.txState() == PENDING)
.count();
}
@@ -566,12 +572,16 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
public void start() {
localNodeId = clusterService.topologyService().localMember().id();
clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class, this);
- orphanDetector.start(txStateMap::get);
+
+ txStateVolatileStorage.start();
+
+ orphanDetector.start(txStateVolatileStorage, txConfig.abandonedCheckTs());
}
@Override
public void beforeNodeStop() {
orphanDetector.stop();
+ txStateVolatileStorage.stop();
}
@Override
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
new file mode 100644
index 0000000000..15e3215ead
--- /dev/null
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/**
+ * The class represents volatile transaction state storage that stores a transaction state meta until the node stops.
+ */
+public class VolatileTxStateMetaStorage {
+ /** Busy lock to stop synchronously. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** The local map for tx states. */
+ private ConcurrentHashMap<UUID, TxStateMeta> txStateMap;
+
+ /**
+ * Starts the storage.
+ */
+ public void start() {
+ txStateMap = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Stops the detector.
+ */
+ public void stop() {
+ busyLock.block();
+
+ // TODO: IGNITE-21024 There is no possibility to clean this map because it can be used after the node has been stopped.
+ // txStateMap.clear();
+ }
+
+ /**
+ * Atomically changes the state meta of a transaction.
+ *
+ * @param txId Transaction id.
+ * @param updater Transaction meta updater.
+ * @return Updated transaction state.
+ */
+ public TxStateMeta updateMeta(UUID txId, Function<TxStateMeta, TxStateMeta> updater) {
+ return updateMetaInternal(txId, updater);
+
+ // TODO: IGNITE-21024 Public methods of transaction manager do not have NodeStoppingException in definition.
+ // if (busyLock.enterBusy()) {
+ // try {
+ // return updateMetaInternal(txId, updater);
+ // } finally {
+ // busyLock.leaveBusy();
+ // }
+ // }
+ // throw new NodeStoppingException();
+ }
+
+ /**
+ * The internal method for atomically changing the state meta of a transaction.
+ *
+ * @param txId Transaction id.
+ * @param updater Transaction meta updater.
+ * @return Updated transaction state.
+ */
+ private TxStateMeta updateMetaInternal(UUID txId, Function<TxStateMeta, TxStateMeta> updater) {
+ return txStateMap.compute(txId, (k, oldMeta) -> {
+ TxStateMeta newMeta = updater.apply(oldMeta);
+
+ if (newMeta == null) {
+ return null;
+ }
+
+ TxState oldState = oldMeta == null ? null : oldMeta.txState();
+
+ return checkTransitionCorrectness(oldState, newMeta.txState()) ? newMeta : oldMeta;
+ });
+ }
+
+ /**
+ * Returns a transaction state meta.
+ *
+ * @param txId Transaction id.
+ * @return The state meta or null if the state is unknown.
+ */
+ public TxStateMeta state(UUID txId) {
+ return txStateMap.get(txId);
+
+ // TODO: IGNITE-21024 Public methods of transaction manager do not have NodeStoppingException in definition.
+ // if (busyLock.enterBusy()) {
+ // try {
+ // return txStateMap.get(txId);
+ // } finally {
+ // busyLock.leaveBusy();
+ // }
+ // }
+ // throw new NodeStoppingException();
+ }
+
+ /**
+ * Gets all defined transactions meta states.
+ *
+ * @return Collection of transaction meta states.
+ */
+ public Collection<TxStateMeta> states() {
+ return txStateMap.values();
+
+ // TODO: IGNITE-21024 Public methods of transaction manager do not have NodeStoppingException in definition.
+ // if (busyLock.enterBusy()) {
+ // try {
+ // return txStateMap.values();
+ // } finally {
+ // busyLock.leaveBusy();
+ // }
+ // }
+ // throw new NodeStoppingException();
+ }
+}
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 14cc7ac856..0cc5b0dedb 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -46,6 +46,8 @@ import static org.mockito.Mockito.when;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongSupplier;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -56,6 +58,7 @@ import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.PrimaryReplicaExpiredException;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
@@ -80,7 +83,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
/**
* Basic tests for a transaction manager.
*/
-@ExtendWith(MockitoExtension.class)
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
public class TxManagerTest extends IgniteAbstractTest {
private static final ClusterNode LOCAL_NODE = new ClusterNodeImpl("local_id", "local", new NetworkAddress("127.0.0.1", 2004), null);
@@ -102,6 +105,9 @@ public class TxManagerTest extends IgniteAbstractTest {
@Mock
private PlacementDriver placementDriver;
+ @InjectConfiguration
+ private TransactionConfiguration txConfiguration;
+
/** Init test callback. */
@BeforeEach
public void setup() {
@@ -116,6 +122,7 @@ public class TxManagerTest extends IgniteAbstractTest {
when(replicaService.invoke(anyString(), any())).thenReturn(nullCompletedFuture());
txManager = new TxManagerImpl(
+ txConfiguration,
clusterService,
replicaService,
new HeapLockManager(),
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxStateTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxStateTest.java
index 35dacd26bd..22d04806cf 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxStateTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxStateTest.java
@@ -96,7 +96,7 @@ public class TxStateTest {
// Allowed.
assertTrue(TxState.checkTransitionCorrectness(TxState.ABORTED, TxState.ABORTED));
- assertTrue(TxState.checkTransitionCorrectness(TxState.ABORTED, TxState.ABANDONED));
+ assertFalse(TxState.checkTransitionCorrectness(TxState.ABORTED, TxState.ABANDONED));
}
@Test
@@ -108,7 +108,7 @@ public class TxStateTest {
// Allowed.
assertTrue(TxState.checkTransitionCorrectness(TxState.COMMITED, TxState.COMMITED));
- assertTrue(TxState.checkTransitionCorrectness(TxState.COMMITED, TxState.ABANDONED));
+ assertFalse(TxState.checkTransitionCorrectness(TxState.COMMITED, TxState.ABANDONED));
}
/**
@@ -117,7 +117,7 @@ public class TxStateTest {
@Test
void testTransitionsFromAbandoned() {
// Allowed.
- assertTrue(TxState.checkTransitionCorrectness(TxState.ABANDONED, TxState.PENDING));
+ assertFalse(TxState.checkTransitionCorrectness(TxState.ABANDONED, TxState.PENDING));
assertTrue(TxState.checkTransitionCorrectness(TxState.ABANDONED, TxState.FINISHING));
assertTrue(TxState.checkTransitionCorrectness(TxState.ABANDONED, TxState.ABORTED));
assertTrue(TxState.checkTransitionCorrectness(TxState.ABANDONED, TxState.COMMITED));