You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2022/10/28 15:22:42 UTC
[ignite-3] branch ignite-3.0.0-beta1 updated: IGNITE-17967 RO writeIntent resolution tests hang up in case of multi node cluster (#1255)
This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-3.0.0-beta1 by this push:
new 6ff8e032e9 IGNITE-17967 RO writeIntent resolution tests hang up in case of multi node cluster (#1255)
6ff8e032e9 is described below
commit 6ff8e032e9c80630e667f043805212e3299adcaf
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Fri Oct 28 17:30:21 2022 +0300
IGNITE-17967 RO writeIntent resolution tests hang up in case of multi node cluster (#1255)
(cherry picked from commit 7d4bf9747dace1471214aea56caa3148b76a20eb)
---
.../internal/sql/engine/SqlQueryProcessor.java | 3 +-
.../replicator/PartitionReplicaListener.java | 240 +++++++++++++--------
.../ignite/internal/table/TxAbstractTest.java | 10 +-
.../apache/ignite/internal/table/TxLocalTest.java | 15 +-
.../table/impl/DummyInternalTableImpl.java | 28 ++-
5 files changed, 183 insertions(+), 113 deletions(-)
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index b3fdf34019..8491b3ee99 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -141,7 +141,8 @@ public class SqlQueryProcessor implements QueryProcessor {
/** Transaction manager. */
private final TxManager txManager;
- private HybridClock clock;
+ /** Clock. */
+ private final HybridClock clock;
/** Constructor. */
public SqlQueryProcessor(
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 42334a4117..4953287a6a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -241,25 +241,33 @@ public class PartitionReplicaListener implements ReplicaListener {
} else if (request instanceof ReadWriteMultiRowReplicaRequest) {
return processMultiEntryAction((ReadWriteMultiRowReplicaRequest) request);
} else if (request instanceof ReadWriteSwapRowReplicaRequest) {
- return processTwoEntriesAction((ReadWriteSwapRowReplicaRequest) request);
+ return processTwoEntriesAction((ReadWriteSwapRowReplicaRequest) request)
+ .thenApply(Function.identity());
} else if (request instanceof ReadWriteScanRetrieveBatchReplicaRequest) {
- return processScanRetrieveBatchAction((ReadWriteScanRetrieveBatchReplicaRequest) request);
+ return processScanRetrieveBatchAction((ReadWriteScanRetrieveBatchReplicaRequest) request)
+ .thenApply(Function.identity());
} else if (request instanceof ReadWriteScanCloseReplicaRequest) {
processScanCloseAction((ReadWriteScanCloseReplicaRequest) request);
return completedFuture(null);
} else if (request instanceof TxFinishReplicaRequest) {
- return processTxFinishAction((TxFinishReplicaRequest) request);
+ return processTxFinishAction((TxFinishReplicaRequest) request)
+ .thenApply(Function.identity());
} else if (request instanceof TxCleanupReplicaRequest) {
- return processTxCleanupAction((TxCleanupReplicaRequest) request);
+ return processTxCleanupAction((TxCleanupReplicaRequest) request)
+ .thenApply(Function.identity());
} else if (request instanceof ReadOnlySingleRowReplicaRequest) {
- return processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request, isPrimary);
+ return processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request, isPrimary)
+ .thenApply(Function.identity());
} else if (request instanceof ReadOnlyMultiRowReplicaRequest) {
- return processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request, isPrimary);
+ return processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request, isPrimary)
+ .thenApply(Function.identity());
} else if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) {
- return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request, isPrimary);
+ return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request, isPrimary)
+ .thenApply(Function.identity());
} else if (request instanceof ReplicaSafeTimeSyncRequest) {
- return processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request);
+ return processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request)
+ .thenApply(Function.identity());
} else {
throw new UnsupportedReplicaRequestException(request.getClass());
}
@@ -327,7 +335,7 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param isPrimary Whether the given replica is primary.
* @return Result future.
*/
- private CompletableFuture<Object> processReadOnlyScanRetrieveBatchAction(
+ private CompletableFuture<ArrayList<BinaryRow>> processReadOnlyScanRetrieveBatchAction(
ReadOnlyScanRetrieveBatchReplicaRequest request,
Boolean isPrimary
) {
@@ -339,24 +347,59 @@ public class PartitionReplicaListener implements ReplicaListener {
IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
- ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
+ CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(readTimestamp);
+
+ return safeReadFuture.thenCompose(unused -> retrieveExactEntriesUntilCursorEmpty(readTimestamp, cursorId, batchCount));
+ }
+ /**
+ * Extracts exact amount of entries, or less if cursor is become empty, from a cursor on the specific time.
+ *
+ * @param readTimestamp Timestamp of the moment when that moment when the data will be extracted.
+ * @param cursorId Cursor id.
+ * @param count Amount of entries which sill be extracted.
+ * @return Result future.
+ */
+ private CompletableFuture<ArrayList<BinaryRow>> retrieveExactEntriesUntilCursorEmpty(
+ HybridTimestamp readTimestamp,
+ IgniteUuid cursorId,
+ int count
+ ) {
@SuppressWarnings("resource") PartitionTimestampCursor cursor = cursors.computeIfAbsent(cursorId,
id -> mvDataStorage.scan(HybridTimestamp.MAX_VALUE));
- CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(readTimestamp);
+ ArrayList<CompletableFuture<BinaryRow>> resolutionFuts = new ArrayList<>(count);
- // TODO https://issues.apache.org/jira/browse/IGNITE-17824 Dedicated thread pool should be used.
- return safeReadFuture.thenApplyAsync(ignored -> {
- while (batchRows.size() < batchCount && cursor.hasNext()) {
- BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), readTimestamp, () -> cursor.committed(readTimestamp));
+ while (resolutionFuts.size() < count && cursor.hasNext()) {
+ ReadResult readResult = cursor.next();
+ HybridTimestamp newestCommitTimestamp = readResult.newestCommitTimestamp();
+
+ BinaryRow candidate =
+ newestCommitTimestamp == null || !readResult.isWriteIntent() ? null : cursor.committed(newestCommitTimestamp);
+
+ resolutionFuts.add(resolveReadResult(readResult, readTimestamp, () -> candidate));
+ }
+
+ return allOf(resolutionFuts.toArray(new CompletableFuture[0])).thenCompose(unused -> {
+ ArrayList<BinaryRow> rows = new ArrayList<>(count);
+
+ for (CompletableFuture<BinaryRow> resolutionFut : resolutionFuts) {
+ BinaryRow resolvedReadResult = resolutionFut.join();
if (resolvedReadResult != null) {
- batchRows.add(resolvedReadResult);
+ rows.add(resolvedReadResult);
}
}
- return batchRows;
+ if (rows.size() < count && cursor.hasNext()) {
+ return retrieveExactEntriesUntilCursorEmpty(readTimestamp, cursorId, count - rows.size()).thenApply(binaryRows -> {
+ rows.addAll(binaryRows);
+
+ return rows;
+ });
+ } else {
+ return completedFuture(rows);
+ }
});
}
@@ -367,8 +410,8 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param isPrimary Whether the given replica is primary.
* @return Result future.
*/
- private CompletableFuture<Object> processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request, Boolean isPrimary) {
- BinaryRow tableRow = request.binaryRow();
+ private CompletableFuture<BinaryRow> processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request, Boolean isPrimary) {
+ BinaryRow searchRow = request.binaryRow();
HybridTimestamp readTimestamp = request.readTimestamp();
if (request.requestType() != RequestType.RO_GET) {
@@ -378,11 +421,7 @@ public class PartitionReplicaListener implements ReplicaListener {
CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(request.readTimestamp());
- // TODO https://issues.apache.org/jira/browse/IGNITE-17824 Dedicated thread pool should be used.
- return safeReadFuture.thenApplyAsync(ignored -> {
- //TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
- return resolveRowByPk(tableRow, readTimestamp, (rowId, binaryRow) -> binaryRow);
- });
+ return safeReadFuture.thenCompose(unused -> resolveRowByPk(searchRow, readTimestamp));
}
/**
@@ -392,7 +431,13 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param isPrimary Whether the given replica is primary.
* @return Result future.
*/
- private CompletableFuture<Object> processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request, Boolean isPrimary) {
+ private CompletableFuture<ArrayList<BinaryRow>> processReadOnlyMultiEntryAction(
+ ReadOnlyMultiRowReplicaRequest request,
+ Boolean isPrimary
+ ) {
+ Collection<BinaryRow> searchRows = request.binaryRows();
+ HybridTimestamp readTimestamp = request.readTimestamp();
+
if (request.requestType() != RequestType.RO_GET_ALL) {
throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
format("Unknown single request [actionType={}]", request.requestType()));
@@ -400,17 +445,28 @@ public class PartitionReplicaListener implements ReplicaListener {
CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(request.readTimestamp());
- // TODO https://issues.apache.org/jira/browse/IGNITE-17824 Dedicated thread pool should be used.
- return safeReadFuture.thenApplyAsync(ignored -> {
- ArrayList<BinaryRow> result = new ArrayList<>(request.binaryRows().size());
+ return safeReadFuture.thenCompose(unused -> {
+ ArrayList<CompletableFuture<BinaryRow>> resolutionFuts = new ArrayList<>(searchRows.size());
- for (BinaryRow searchRow : request.binaryRows()) {
- BinaryRow row = resolveRowByPk(searchRow, request.readTimestamp(), (rowId, binaryRow) -> binaryRow);
+ for (BinaryRow searchRow : searchRows) {
+ CompletableFuture<BinaryRow> fut = resolveRowByPk(searchRow, readTimestamp);
- result.add(row);
+ resolutionFuts.add(fut);
}
- return result;
+ return allOf(resolutionFuts.toArray(new CompletableFuture[0])).thenApply(unused1 -> {
+ ArrayList<BinaryRow> result = new ArrayList<>(resolutionFuts.size());
+
+ for (CompletableFuture<BinaryRow> resolutionFut : resolutionFuts) {
+ BinaryRow resolvedReadResult = resolutionFut.join();
+
+ if (resolvedReadResult != null) {
+ result.add(resolvedReadResult);
+ }
+ }
+
+ return result;
+ });
});
}
@@ -420,7 +476,7 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param request Request.
* @return Future.
*/
- private CompletionStage<Object> processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) {
+ private CompletionStage<Void> processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) {
return raftClient.run(new SafeTimeSyncCommand());
}
@@ -489,7 +545,7 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param request Scan retrieve batch request operation.
* @return Listener response.
*/
- private CompletableFuture<Object> processScanRetrieveBatchAction(ReadWriteScanRetrieveBatchReplicaRequest request) {
+ private CompletableFuture<ArrayList<BinaryRow>> processScanRetrieveBatchAction(ReadWriteScanRetrieveBatchReplicaRequest request) {
UUID txId = request.transactionId();
int batchCount = request.batchSize();
@@ -525,7 +581,7 @@ public class PartitionReplicaListener implements ReplicaListener {
* @return future result of the operation.
*/
// TODO: need to properly handle primary replica changes https://issues.apache.org/jira/browse/IGNITE-17615
- private CompletableFuture<Object> processTxFinishAction(TxFinishReplicaRequest request) {
+ private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest request) {
List<ReplicationGroupId> aggregatedGroupIds = request.groups().values().stream()
.flatMap(List::stream).map(IgniteBiTuple::get1).collect(Collectors.toList());
@@ -552,7 +608,7 @@ public class PartitionReplicaListener implements ReplicaListener {
)
);
- return allOf(cleanupFutures).thenApply(ignored -> null);
+ return allOf(cleanupFutures);
}
/**
@@ -601,7 +657,7 @@ public class PartitionReplicaListener implements ReplicaListener {
* @return CompletableFuture of void.
*/
// TODO: need to properly handle primary replica changes https://issues.apache.org/jira/browse/IGNITE-17615
- private CompletableFuture processTxCleanupAction(TxCleanupReplicaRequest request) {
+ private CompletableFuture<Void> processTxCleanupAction(TxCleanupReplicaRequest request) {
try {
closeAllTransactionCursors(request.txId());
} catch (Exception e) {
@@ -613,50 +669,6 @@ public class PartitionReplicaListener implements ReplicaListener {
.thenRun(() -> lockManager.locks(request.txId()).forEachRemaining(lockManager::release));
}
- /**
- * Finds the row and its identifier by given pk search row.
- *
- * @param tableRow A bytes representing a primary tableRow.
- * @param ts A timestamp regarding which we need to resolve the given row.
- * @param action An action to perform on a resolved row.
- * @param <T> A type of the value returned by action.
- * @return Result of the given action.
- */
- private <T> T resolveRowByPk(
- BinaryRow tableRow,
- HybridTimestamp ts,
- BiFunction<@Nullable RowId, @Nullable BinaryRow, T> action
- ) {
- try (Cursor<RowId> cursor = pkIndexStorage.get().get(tableRow)) {
- for (RowId rowId : cursor) {
- ReadResult readResult = mvDataStorage.read(rowId, ts);
-
- BinaryRow row = resolveReadResult(readResult, ts, () -> {
- if (readResult.newestCommitTimestamp() == null) {
- return null;
- }
-
- ReadResult committedReadResult = mvDataStorage.read(rowId, readResult.newestCommitTimestamp());
-
- assert !committedReadResult.isWriteIntent() :
- "The result is not committed [rowId=" + rowId + ", timestamp="
- + readResult.newestCommitTimestamp() + ']';
-
- return committedReadResult.binaryRow();
- });
-
- if (row != null && row.hasValue()) {
- return action.apply(rowId, row);
- }
- }
-
- return action.apply(null, null);
- } catch (Exception e) {
- throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
- format("Unable to close cursor [tableId={}]", tableId), e);
- }
- }
-
/**
* Finds the row and its identifier by given pk search row.
*
@@ -694,6 +706,42 @@ public class PartitionReplicaListener implements ReplicaListener {
});
}
+ /**
+ * Finds the row and its identifier by given pk search row.
+ *
+ * @param searchKey A bytes representing a primary key.
+ * @param ts A timestamp regarding which we need to resolve the given row.
+ * @return Result of the given action.
+ */
+ private CompletableFuture<BinaryRow> resolveRowByPk(BinaryRow searchKey, HybridTimestamp ts) {
+ try (Cursor<RowId> cursor = pkIndexStorage.get().get(searchKey)) {
+ for (RowId rowId : cursor) {
+ ReadResult readResult = mvDataStorage.read(rowId, ts);
+
+ return resolveReadResult(readResult, ts, () -> {
+ HybridTimestamp newestCommitTimestamp = readResult.newestCommitTimestamp();
+
+ if (newestCommitTimestamp == null) {
+ return null;
+ }
+
+ ReadResult committedReadResult = mvDataStorage.read(rowId, newestCommitTimestamp);
+
+ assert !committedReadResult.isWriteIntent() :
+ "The result is not committed [rowId=" + rowId + ", timestamp="
+ + newestCommitTimestamp + ']';
+
+ return committedReadResult.binaryRow();
+ });
+ }
+
+ return completedFuture(null);
+ } catch (Exception e) {
+ throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+ format("Unable to close cursor [tableId={}]", tableId), e);
+ }
+ }
+
/**
* Tests row values for equality.
*
@@ -1122,7 +1170,7 @@ public class PartitionReplicaListener implements ReplicaListener {
locks[idx++] = locker.locksForInsert(txId, tableRow, rowId);
}
- return CompletableFuture.allOf(locks);
+ return allOf(locks);
}
private CompletableFuture<?> takeRemoveLockOnIndexes(BinaryRow tableRow, RowId rowId, UUID txId) {
@@ -1139,7 +1187,7 @@ public class PartitionReplicaListener implements ReplicaListener {
locks[idx++] = locker.locksForRemove(txId, tableRow, rowId);
}
- return CompletableFuture.allOf(locks);
+ return allOf(locks);
}
/**
@@ -1193,7 +1241,7 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param request Two actions operation request.
* @return Listener response.
*/
- private CompletableFuture<Object> processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request) {
+ private CompletableFuture<Boolean> processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request) {
BinaryRow newRow = request.binaryRow();
BinaryRow expectedRow = request.oldBinaryRow();
TablePartitionId commitPartitionId = request.commitPartitionId();
@@ -1278,7 +1326,7 @@ public class PartitionReplicaListener implements ReplicaListener {
if (expectedTerm.equals(currentTerm)) {
return completedFuture(null);
} else {
- return CompletableFuture.failedFuture(new PrimaryReplicaMissException(expectedTerm, currentTerm));
+ return failedFuture(new PrimaryReplicaMissException(expectedTerm, currentTerm));
}
}
);
@@ -1297,7 +1345,8 @@ public class PartitionReplicaListener implements ReplicaListener {
* @return Resolved binary row.
*/
private BinaryRow resolveReadResult(ReadResult readResult, UUID txId) {
- return resolveReadResult(readResult, txId, null, null);
+ // Here is a safety join (waiting of the future result), because the resolution for RW transaction cannot lead to a network request.
+ return resolveReadResult(readResult, txId, null, null).join();
}
/**
@@ -1306,10 +1355,13 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param readResult Read result to resolve.
* @param timestamp Timestamp.
* @param lastCommitted Action to get the latest committed row.
- * @return Resolved binary row.
+ * @return Future to resolved binary row.
*/
- private BinaryRow resolveReadResult(ReadResult readResult, HybridTimestamp timestamp, Supplier<BinaryRow> lastCommitted) {
-
+ private CompletableFuture<BinaryRow> resolveReadResult(
+ ReadResult readResult,
+ HybridTimestamp timestamp,
+ Supplier<BinaryRow> lastCommitted
+ ) {
return resolveReadResult(readResult, null, timestamp, lastCommitted);
}
@@ -1327,9 +1379,9 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param txId Nullable transaction id, should be provided if resolution is performed within the context of RW transaction.
* @param timestamp Timestamp is used in RO transaction only.
* @param lastCommitted Action to get the latest committed row, it is used in RO transaction only.
- * @return Resolved binary row.
+ * @return Future to resolved binary row.
*/
- private BinaryRow resolveReadResult(
+ private CompletableFuture<BinaryRow> resolveReadResult(
ReadResult readResult,
@Nullable UUID txId,
@Nullable HybridTimestamp timestamp,
@@ -1344,7 +1396,7 @@ public class PartitionReplicaListener implements ReplicaListener {
if (retrievedResultTxId == null || txId.equals(retrievedResultTxId)) {
// Same transaction - return retrieved value. It may be both writeIntent or regular value.
- return readResult.binaryRow();
+ return completedFuture(readResult.binaryRow());
} else {
// Should never happen, currently, locks prevent reading another transaction intents during RW requests.
throw new AssertionError("Mismatched transaction id, expectedTxId={" + txId + "},"
@@ -1352,14 +1404,14 @@ public class PartitionReplicaListener implements ReplicaListener {
}
} else {
if (!readResult.isWriteIntent()) {
- return readResult.binaryRow();
+ return completedFuture(readResult.binaryRow());
}
CompletableFuture<BinaryRow> writeIntentResolutionFut = resolveWriteIntentAsync(
readResult, timestamp, lastCommitted);
// RO request.
- return writeIntentResolutionFut.join();
+ return writeIntentResolutionFut;
}
}
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 0d9cc2d77b..844dab2eb5 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -90,7 +90,7 @@ public abstract class TxAbstractTest extends IgniteAbstractTest {
);
/** Table ID test value. */
- public static final UUID tableId2 = java.util.UUID.randomUUID();
+ public static final UUID tableId2 = UUID.randomUUID();
protected static SchemaDescriptor CUSTOMERS_SCHEMA = new SchemaDescriptor(
1,
@@ -1798,7 +1798,6 @@ public abstract class TxAbstractTest extends IgniteAbstractTest {
assertEquals(100., accounts.recordView().get(readOnlyTx, makeKey(1)).doubleValue("balance"));
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17967")
@Test
public void testReadOnlyGetWriteIntentResolutionUpdate() {
accounts.recordView().upsert(null, makeValue(1, 100.));
@@ -1822,7 +1821,7 @@ public abstract class TxAbstractTest extends IgniteAbstractTest {
assertEquals(300., accounts.recordView().get(readOnlyTx2, makeKey(1)).doubleValue("balance"));
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17967, https://issues.apache.org/jira/browse/IGNITE-17968")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17968")
@Test
public void testReadOnlyGetWriteIntentResolutionRemove() {
accounts.recordView().upsert(null, makeValue(1, 100.));
@@ -1857,7 +1856,6 @@ public abstract class TxAbstractTest extends IgniteAbstractTest {
validateBalance(retrievedKeys, 100., 200.);
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17967")
// TODO: IGNITE-17968 Remove after fix.
@Test
public void testReadOnlyPendingWriteIntentSkipped() {
@@ -1876,14 +1874,14 @@ public abstract class TxAbstractTest extends IgniteAbstractTest {
tx.commit();
Collection<Tuple> retrievedKeys2 = accounts.recordView().getAll(readOnlyTx, List.of(makeKey(1), makeKey(2)));
- validateBalance(retrievedKeys2, 100., 300.);
+ validateBalance(retrievedKeys2, 100., 200.);
Transaction readOnlyTx2 = igniteTransactions.readOnly().begin();
Collection<Tuple> retrievedKeys3 = accounts.recordView().getAll(readOnlyTx2, List.of(makeKey(1), makeKey(2)));
validateBalance(retrievedKeys3, 100., 300.);
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17967, https://issues.apache.org/jira/browse/IGNITE-17968")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17968")
@Test
public void testReadOnlyPendingWriteIntentSkippedCombined() {
accounts.recordView().upsert(null, makeValue(1, 100.));
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index 0c6a57dce7..9809044912 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -31,6 +32,7 @@ import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.tx.LockManager;
@@ -38,6 +40,7 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.table.Table;
@@ -64,6 +67,7 @@ public class TxLocalTest extends TxAbstractTest {
lockManager = new HeapLockManager();
ReplicaService replicaSvc = mock(ReplicaService.class, RETURNS_DEEP_STUBS);
+ PlacementDriver placementDriver = mock(PlacementDriver.class, RETURNS_DEEP_STUBS);
Map<ReplicationGroupId, DummyInternalTableImpl> tables = new HashMap<>();
@@ -77,15 +81,22 @@ public class TxLocalTest extends TxAbstractTest {
}
).when(replicaSvc).invoke(any(), any());
+ doAnswer(invocationOnMock -> {
+ TxStateReplicaRequest request = invocationOnMock.getArgument(1);
+
+ return CompletableFuture.completedFuture(
+ tables.get(request.groupId()).txStateStorage().getTxStateStorage(0).get(request.txId()));
+ }).when(placementDriver).sendMetaRequest(any(), any());
+
txManager = new TxManagerImpl(replicaSvc, lockManager, new HybridClockImpl());
igniteTransactions = new IgniteTransactionsImpl(txManager);
- DummyInternalTableImpl table = new DummyInternalTableImpl(replicaSvc, txManager, true);
+ DummyInternalTableImpl table = new DummyInternalTableImpl(replicaSvc, txManager, true, placementDriver);
accounts = new TableImpl(table, new DummySchemaManagerImpl(ACCOUNTS_SCHEMA), lockManager);
- DummyInternalTableImpl table2 = new DummyInternalTableImpl(replicaSvc, txManager, true);
+ DummyInternalTableImpl table2 = new DummyInternalTableImpl(replicaSvc, txManager, true, placementDriver);
customers = new TableImpl(table2, new DummySchemaManagerImpl(CUSTOMERS_SCHEMA), lockManager);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 3f202bb5f0..4cff2aea18 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -50,14 +50,14 @@ import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
-import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateTableStorage;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -104,9 +104,15 @@ public class DummyInternalTableImpl extends InternalTableImpl {
* @param txManager Transaction manager.
* @param crossTableUsage If this dummy table is going to be used in cross-table tests, it won't mock the calls of ReplicaService
* by itself.
+ * @param placementDriver Placement driver.
*/
- public DummyInternalTableImpl(ReplicaService replicaSvc, TxManager txManager, boolean crossTableUsage) {
- this(replicaSvc, new TestMvPartitionStorage(0), txManager, crossTableUsage);
+ public DummyInternalTableImpl(
+ ReplicaService replicaSvc,
+ TxManager txManager,
+ boolean crossTableUsage,
+ PlacementDriver placementDriver
+ ) {
+ this(replicaSvc, new TestMvPartitionStorage(0), txManager, crossTableUsage, placementDriver);
}
/**
@@ -116,7 +122,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
* @param mvPartStorage Multi version partition storage.
*/
public DummyInternalTableImpl(ReplicaService replicaSvc, MvPartitionStorage mvPartStorage) {
- this(replicaSvc, mvPartStorage, null, false);
+ this(replicaSvc, mvPartStorage, null, false, null);
}
/**
@@ -127,12 +133,14 @@ public class DummyInternalTableImpl extends InternalTableImpl {
* @param txManager Transaction manager, if {@code null}, then default one will be created.
* @param crossTableUsage If this dummy table is going to be used in cross-table tests, it won't mock the calls of ReplicaService
* by itself.
+ * @param placementDriver Placement driver.
*/
public DummyInternalTableImpl(
ReplicaService replicaSvc,
MvPartitionStorage mvPartStorage,
@Nullable TxManager txManager,
- boolean crossTableUsage
+ boolean crossTableUsage,
+ PlacementDriver placementDriver
) {
super(
"test",
@@ -143,7 +151,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
addr -> Mockito.mock(ClusterNode.class),
txManager == null ? new TxManagerImpl(replicaSvc, new HeapLockManager(), new HybridClockImpl()) : txManager,
mock(MvTableStorage.class),
- mock(TxStateTableStorage.class),
+ new TestConcurrentHashMapTxStateTableStorage(),
replicaSvc,
new HybridClockImpl()
);
@@ -242,15 +250,15 @@ public class DummyInternalTableImpl extends InternalTableImpl {
pkStorage,
clock,
new PendingComparableValuesTracker<>(clock.now()),
+ txStateStorage().getOrCreateTxStateStorage(0),
null,
- null,
- null,
+ placementDriver,
peer -> true
);
partitionListener = new PartitionListener(
mvPartStorage,
- new TestConcurrentHashMapTxStateStorage(),
+ txStateStorage().getOrCreateTxStateStorage(0),
this.txManager,
() -> Map.of(pkStorage.get().id(), pkStorage.get())
);