You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2023/12/27 15:35:17 UTC

(ignite-3) branch cache_poc created (now 231cad4cd6)

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a change to branch cache_poc
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


      at 231cad4cd6 Jcache basic integration

This branch includes the following new commits:

     new 231cad4cd6 Jcache basic integration

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



(ignite-3) 01/01: Jcache basic integration

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch cache_poc
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 231cad4cd65758414d868ebfed1eec084d9a0ff5
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Wed Dec 27 18:35:06 2023 +0300

    Jcache basic integration
---
 gradle/libs.versions.toml                          |   3 +
 modules/api/build.gradle                           |   2 +
 .../main/java/org/apache/ignite/cache/Cache.java}  |  16 +-
 .../org/apache/ignite/cache/package-info.java}     |  18 +-
 .../apache/ignite/table/mapper/TypeConverter.java  |   4 +-
 .../java/org/apache/ignite/tx/Transaction.java     |   4 +-
 .../ignite/client/fakes/FakeInternalTable.java     |  15 +
 .../schema/marshaller/KvMarshallerTest.java        |  12 +
 .../org/apache/ignite/distributed/ItCacheTest.java | 248 +++++++++++++++
 .../ignite/internal/table/CacheEntryImpl.java      |  28 ++
 .../apache/ignite/internal/table/CacheImpl.java    | 351 +++++++++++++++++++++
 .../ignite/internal/table/InternalTable.java       |   6 +
 .../internal/table/KeyValueBinaryViewImpl.java     |  10 +-
 .../ignite/internal/table/KeyValueViewImpl.java    |   2 +-
 .../apache/ignite/internal/table/TableImpl.java    |  20 +-
 .../ignite/internal/table/TableViewInternal.java   |  23 ++
 .../distributed/storage/InternalTableImpl.java     |  63 ++++
 .../apache/ignite/distributed/ItTxTestCluster.java | 234 +++++++++++++-
 .../ignite/internal/tx/InternalTransaction.java    |   2 +
 .../apache/ignite/internal/tx/LockException.java   |   4 +-
 .../org/apache/ignite/internal/tx/TxManager.java   |   4 +-
 .../internal/tx/impl/ReadOnlyTransactionImpl.java  |   7 +
 .../internal/tx/impl/ReadWriteTransactionImpl.java |  40 ++-
 .../ignite/internal/tx/impl/TxManagerImpl.java     | 107 ++++++-
 24 files changed, 1156 insertions(+), 67 deletions(-)

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 71512f347a..37dffa6161 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -79,6 +79,7 @@ awaitility = "4.2.0"
 progressBar = "0.9.4"
 guava = "31.1-jre"
 jna = "5.13.0"
+jcache = "1.1.1"
 
 #Tools
 pmdTool = "6.55.0"
@@ -255,3 +256,5 @@ awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility"
 progressBar = { module = "me.tongfei:progressbar", version.ref = "progressBar" }
 
 jna = { module = "net.java.dev.jna:jna", version.ref = "jna"}
+
+jcache = { module = "javax.cache:cache-api", version.ref = "jcache"}
\ No newline at end of file
diff --git a/modules/api/build.gradle b/modules/api/build.gradle
index bb3ff39da3..48be779be5 100644
--- a/modules/api/build.gradle
+++ b/modules/api/build.gradle
@@ -22,6 +22,8 @@ apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
 
 
 dependencies {
+    api libs.jcache
+
     implementation libs.fastutil.core
     implementation libs.jetbrains.annotations
 
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java b/modules/api/src/main/java/org/apache/ignite/cache/Cache.java
similarity index 57%
copy from modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
copy to modules/api/src/main/java/org/apache/ignite/cache/Cache.java
index ab1919a537..60e6b41b6f 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
+++ b/modules/api/src/main/java/org/apache/ignite/cache/Cache.java
@@ -15,20 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.tx;
+package org.apache.ignite.cache;
 
 /**
- * This exception is thrown when a lock cannot be acquired, released or downgraded.
+ *
  */
-public class LockException extends TransactionInternalCheckedException {
-    /**
-     * Creates a new instance of LockException with the given message.
-     *
-     * @param code Full error code. {{@link org.apache.ignite.lang.ErrorGroups.Transactions#ACQUIRE_LOCK_ERR},
-     *     {@link org.apache.ignite.lang.ErrorGroups.Transactions#ACQUIRE_LOCK_TIMEOUT_ERR},
-     * @param msg The detail message.
-     */
-    public LockException(int code, String msg) {
-        super(code, msg);
-    }
+public interface Cache<K, V> extends javax.cache.Cache<K, V> {
 }
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java b/modules/api/src/main/java/org/apache/ignite/cache/package-info.java
similarity index 56%
copy from modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
copy to modules/api/src/main/java/org/apache/ignite/cache/package-info.java
index ab1919a537..b2de547924 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
+++ b/modules/api/src/main/java/org/apache/ignite/cache/package-info.java
@@ -15,20 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.tx;
-
 /**
- * This exception is thrown when a lock cannot be acquired, released or downgraded.
+ * Cache access API.
  */
-public class LockException extends TransactionInternalCheckedException {
-    /**
-     * Creates a new instance of LockException with the given message.
-     *
-     * @param code Full error code. {{@link org.apache.ignite.lang.ErrorGroups.Transactions#ACQUIRE_LOCK_ERR},
-     *     {@link org.apache.ignite.lang.ErrorGroups.Transactions#ACQUIRE_LOCK_TIMEOUT_ERR},
-     * @param msg The detail message.
-     */
-    public LockException(int code, String msg) {
-        super(code, msg);
-    }
-}
+
+package org.apache.ignite.cache;
diff --git a/modules/api/src/main/java/org/apache/ignite/table/mapper/TypeConverter.java b/modules/api/src/main/java/org/apache/ignite/table/mapper/TypeConverter.java
index f2c4e6f7cd..c460fb8197 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/mapper/TypeConverter.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/mapper/TypeConverter.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.table.mapper;
 
 /**
- * Type converter interface provides methods for additional data transformation of the field type to a type 
+ * Type converter interface provides methods for additional data transformation of the field type to a type
  * compatible with the column type, and vice versa.
  *
  * <p>The converter can be used to convert objects (or their fields) whose type is incompatible with the schema.
@@ -35,7 +35,7 @@ public interface TypeConverter<ObjectT, ColumnT> {
      * @return Object of column type.
      * @throws Exception If transformation failed.
      */
-    ColumnT toColumnType(ObjectT obj) throws Exception;
+    ColumnT toColumnType(ObjectT obj) throws Exception; // TODO specify excpection
 
     /**
      * Transforms to an object of the target type; called after the data is read from a column.
diff --git a/modules/api/src/main/java/org/apache/ignite/tx/Transaction.java b/modules/api/src/main/java/org/apache/ignite/tx/Transaction.java
index d2c0979de9..9e433560b5 100644
--- a/modules/api/src/main/java/org/apache/ignite/tx/Transaction.java
+++ b/modules/api/src/main/java/org/apache/ignite/tx/Transaction.java
@@ -56,9 +56,9 @@ public interface Transaction {
     CompletableFuture<Void> rollbackAsync();
 
     /**
-     * Returns {code true} if given transaction is a read-only, {@code false} otherwise.
+     * Returns {@code true} if given transaction is a read-only, {@code false} otherwise.
      *
-     * @return {code true} if given transaction is a read-only, {@code false} otherwise.
+     * @return {@code true} if given transaction is a read-only, {@code false} otherwise.
      */
     boolean isReadOnly();
 }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 770204fd04..bb6a18e243 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -475,4 +475,19 @@ public class FakeInternalTable implements InternalTable {
     public @Nullable PendingComparableValuesTracker<Long, Void> getPartitionStorageIndexTracker(int partitionId) {
         return null;
     }
+
+    @Override
+    public CompletableFuture<BinaryRow> getForCache(BinaryRowEx keyRow, @Nullable InternalTransaction tx) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> putForCache(BinaryRowEx row, @Nullable InternalTransaction tx) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> removeForCache(BinaryRowEx keyRow, InternalTransaction tx) {
+        return null;
+    }
 }
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
index 56e2abe045..d0ac713f10 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
@@ -440,6 +440,18 @@ public class KvMarshallerTest {
     public void pojoMapping(MarshallerFactory factory) throws MarshallerException, IOException {
         Assumptions.assumeFalse(factory instanceof AsmMarshallerGenerator, "Generated marshaller doesn't support column mapping.");
 
+        final SchemaDescriptor schema0 = new SchemaDescriptor(
+                1,
+                new Column[]{new Column("key", BYTES, false)},
+                new Column[]{new Column("val", BYTES, true),
+                });
+
+        final KvMarshaller<Object, Object> marshaller00 = factory.create(schema0,
+                Mapper.of(Object.class, "\"key\"", new SerializingConverter<>()),
+                Mapper.of(Object.class, "\"val\"", new SerializingConverter<>()));
+
+        BinaryRow binaryRow00 = marshaller00.marshal(new Object(), new Object());
+
         final SchemaDescriptor schema = new SchemaDescriptor(
                 1,
                 new Column[]{new Column("key", INT64, false)},
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItCacheTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItCacheTest.java
new file mode 100644
index 0000000000..315aa10736
--- /dev/null
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItCacheTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.cache.Cache.Entry;
+import javax.cache.integration.CacheLoader;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriter;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.cache.Cache;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.schema.configuration.GcConfiguration;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.tx.HybridTimestampTracker;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test cache.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class ItCacheTest extends IgniteAbstractTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItCacheTest.class);
+
+    private static final int CACHE_TABLE_ID = 2;
+
+    private static final String CACHE_NAME = "test";
+
+    private TableViewInternal testTable;
+
+    protected final TestInfo testInfo;
+
+    //TODO fsync can be turned on again after https://issues.apache.org/jira/browse/IGNITE-20195
+    @InjectConfiguration("mock: { fsync: false }")
+    private static RaftConfiguration raftConfiguration;
+
+    @InjectConfiguration
+    protected static GcConfiguration gcConfig;
+
+    @InjectConfiguration
+    protected static TransactionConfiguration txConfiguration;
+
+    private ItTxTestCluster txTestCluster;
+
+    private final HybridTimestampTracker timestampTracker = new HybridTimestampTracker();
+
+    /**
+     * The constructor.
+     *
+     * @param testInfo Test info.
+     */
+    public ItCacheTest(TestInfo testInfo) {
+        this.testInfo = testInfo;
+    }
+
+    @BeforeEach
+    public void before() throws Exception {
+        txTestCluster = new ItTxTestCluster(
+                testInfo,
+                raftConfiguration,
+                txConfiguration,
+                workDir,
+                1,
+                1,
+                false,
+                timestampTracker
+        );
+        txTestCluster.prepareCluster();
+
+        testTable = txTestCluster.startCache(CACHE_NAME, CACHE_TABLE_ID);
+
+        log.info("Caches have been started");
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        txTestCluster.shutdownCluster();
+    }
+
+    /**
+     * Test basic cache operations.
+     */
+    @Test
+    public void testBasic() {
+        try (Cache<Integer, Integer> view = testTable.cacheView(txTestCluster.igniteTransactions, null, null, null, null)) {
+            view.put(1, 1);
+
+            assertEquals(1, view.get(1));
+
+            assertTrue(view.remove(1));
+
+            assertNull(view.get(1));
+
+            view.put(1, 2);
+
+            assertEquals(2, view.get(1));
+
+            assertTrue(view.remove(1));
+
+            assertNull(view.get(1));
+        }
+    }
+
+    /**
+     * Test basic cache operations.
+     */
+    @Test
+    public void testReadWriteThrough() {
+        Map<Integer, Integer> testStore = new HashMap<>();
+
+        TestCacheLoader loader = new TestCacheLoader(testStore);
+        TestCacheWriter writer = new TestCacheWriter(testStore);
+        try (Cache<Integer, Integer> view = testTable.cacheView(
+                txTestCluster.igniteTransactions,
+                loader,
+                writer,
+                null,
+                null)
+        ) {
+            assertNull(view.get(1));
+            validate(testStore, loader, writer, 0, 1, 0, 0);
+
+            assertNull(view.get(1)); // Repeating get should fetch tombstone and avoid loading from store.
+            validate(testStore, loader, writer, 0, 1, 0, 0);
+
+            view.put(1, 1);
+            validate(testStore, loader, writer, 1, 1, 1, 0);
+
+            assertEquals(1, view.get(1));
+            validate(testStore, loader, writer, 1, 1, 1, 0);
+
+            assertTrue(view.remove(1));
+            validate(testStore, loader, writer, 0, 1, 1, 1);
+
+            assertNull(view.get(1));
+            validate(testStore, loader, writer, 0, 2, 1, 1);
+        }
+    }
+
+    private static void validate(
+            Map<Integer, Integer> testStore, TestCacheLoader loader,
+            TestCacheWriter writer,
+            int expSize,
+            int expRead,
+            int expWrite,
+            int expRemove
+    ) {
+        assertEquals(expSize, testStore.size());
+        assertEquals(expRead, loader.getCounter());
+        assertEquals(expWrite, writer.getWriteCounter());
+        assertEquals(expRemove, writer.getRemoveCounter());
+    }
+
+    private static class TestCacheLoader implements CacheLoader<Integer, Integer> {
+        private final Map<Integer, Integer> store;
+        private long counter;
+
+        public TestCacheLoader(Map<Integer, Integer> store) {
+            this.store = store;
+        }
+
+        @Override
+        public Integer load(Integer key) throws CacheLoaderException {
+            counter++;
+            return store.get(key);
+        }
+
+        @Override
+        public Map<Integer, Integer> loadAll(Iterable<? extends Integer> keys) throws CacheLoaderException {
+            return null;
+        }
+
+        public long getCounter() {
+            return counter;
+        }
+    }
+
+    private static class TestCacheWriter implements CacheWriter<Integer, Integer> {
+        private final Map<Integer, Integer> store;
+        private long writeCounter;
+        private long removeCounter;
+
+        public TestCacheWriter(Map<Integer, Integer> store) {
+            this.store = store;
+        }
+
+        @Override
+        public void write(Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException {
+            writeCounter++;
+            store.put(entry.getKey(), entry.getValue());
+        }
+
+        @Override
+        public void writeAll(Collection<Entry<? extends Integer, ? extends Integer>> entries) throws CacheWriterException {
+
+        }
+
+        @Override
+        public void delete(Object key) throws CacheWriterException {
+            removeCounter++;
+            store.remove(key);
+        }
+
+        @Override
+        public void deleteAll(Collection<?> keys) throws CacheWriterException {
+
+        }
+
+        public long getWriteCounter() {
+            return writeCounter;
+        }
+
+        public long getRemoveCounter() {
+            return removeCounter;
+        }
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/CacheEntryImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/CacheEntryImpl.java
new file mode 100644
index 0000000000..9fb90f4277
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/CacheEntryImpl.java
@@ -0,0 +1,28 @@
+package org.apache.ignite.internal.table;
+
+import javax.cache.Cache;
+
+public class CacheEntryImpl<K, V> implements Cache.Entry<K, V> {
+    private final K key;
+    private final V value;
+
+    public CacheEntryImpl(K key, V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    @Override
+    public K getKey() {
+        return key;
+    }
+
+    @Override
+    public V getValue() {
+        return value;
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> clazz) {
+        return null;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/CacheImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/CacheImpl.java
new file mode 100644
index 0000000000..0d60f248eb
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/CacheImpl.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.cache.CacheManager;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Configuration;
+import javax.cache.integration.CacheLoader;
+import javax.cache.integration.CacheWriter;
+import javax.cache.integration.CompletionListener;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import org.apache.ignite.cache.Cache;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.TypeConverter;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Key-value view implementation.
+ */
+public class CacheImpl<K, V> extends KeyValueBinaryViewImpl implements Cache<K, V> {
+    private static final byte[] TOMBSTONE = new byte[0];
+
+    public static final String KEY_COL = "KEY";
+    public static final String VAL_COL = "VALUE";
+    public static final String TTL_COL = "TTL";
+
+    private final TypeConverter<K, byte[]> keySerializer;
+    private final TypeConverter<V, byte[]> valueSerializer;
+
+    private final CacheLoader<K, V> loader;
+    private final CacheWriter<K, V> writer;
+
+    private final IgniteTransactions transactions;
+    private final int schemaVersion;
+
+    public CacheImpl(
+            InternalTable tbl,
+            SchemaVersions schemaVersions,
+            SchemaRegistry schemaReg,
+            IgniteTransactions transactions,
+            @Nullable CacheLoader<K, V> loader,
+            @Nullable CacheWriter<K, V> writer,
+            @Nullable TypeConverter<K, byte[]> keyConverter,
+            @Nullable TypeConverter<V, byte[]> valueConverter
+    ) {
+        super(tbl, schemaReg, schemaVersions);
+
+        // Schema is immutable for cache.
+        this.schemaVersion = rowConverter.registry().lastKnownSchemaVersion();
+
+        this.transactions = transactions;
+        this.loader = loader;
+        this.writer = writer;
+        this.keySerializer = keyConverter == null ? new SerializingConverter<>() : keyConverter;
+        this.valueSerializer = valueConverter == null ? new SerializingConverter<>() : valueConverter;
+    }
+
+    @Override
+    public @Nullable V get(K key) {
+        InternalTransaction tx = (InternalTransaction) transactions.begin();
+
+        try {
+            Tuple keyTup = Tuple.create().set(KEY_COL, keySerializer.toColumnType(key));
+
+            Row keyRow = marshal(keyTup, null, schemaVersion);
+
+            Tuple valTup = tbl.getForCache(keyRow, tx).thenApply(row -> unmarshalValue(row, schemaVersion)).join();
+
+            if (valTup == null) {
+                if (loader != null) {
+                    V val = loader.load(key);
+                    if (val == null) {
+                        valTup = Tuple.create().set(VAL_COL, TOMBSTONE).set(TTL_COL, 0L); // TODO ttl from policy
+                    } else {
+                        valTup = Tuple.create().set(VAL_COL, valueSerializer.toColumnType(val)).set(TTL_COL, 0L);
+                    }
+
+                    Row row = marshal(keyTup, valTup, schemaVersion);
+                    tbl.putForCache(row, tx).join();
+                }
+
+                return null;
+            }
+
+            // TODO use readResolve for tombstone.
+            byte[] val = valTup.value(VAL_COL);
+
+            assert val != null;
+
+            // Tombstone.
+            if (val.length == 0) {
+                return null;
+            }
+
+            return (V) keySerializer.toObjectType(val);
+        } catch (TransactionException e) {
+            tx.safeCleanup(false);
+
+            throw e;
+        } catch (Exception e) {
+            tx.safeCleanup(false);
+
+            // TODO mapper error has no codes. Need to refactor converter exceptions.
+            throw new TransactionException(e);
+        } finally {
+            tx.safeCleanup(true);
+        }
+    }
+
+    @Override
+    public Map<K, V> getAll(Set<? extends K> keys) {
+        return null;
+    }
+
+    @Override
+    public boolean containsKey(K key) {
+        return false;
+    }
+
+    @Override
+    public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionListener) {
+
+    }
+
+    @Override
+    public void put(K key, V value) {
+        InternalTransaction tx = (InternalTransaction) transactions.begin();
+
+        Objects.requireNonNull(key);
+        Objects.requireNonNull(value);
+
+        try {
+            Tuple keyTup = Tuple.create().set(KEY_COL, keySerializer.toColumnType(key));
+            Tuple valTup = Tuple.create().set(VAL_COL, valueSerializer.toColumnType(value)).set(TTL_COL, 0L);
+
+            Row row = marshal(keyTup, valTup, schemaVersion);
+            tbl.putForCache(row, tx).join();
+
+            if (writer != null) {
+                writer.write(new CacheEntryImpl<>(key, value));
+            }
+
+            tx.safeCleanup(true);
+        } catch (TransactionException e) {
+            tx.safeCleanup(false); // Async cleanup.
+
+            throw e;
+        } catch (Exception e) {
+            tx.safeCleanup(false); // Async cleanup.
+
+            throw new TransactionException(e);
+        }
+    }
+
+    @Override
+    public V getAndPut(K key, V value) {
+        return null;
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> map) {
+
+    }
+
+    @Override
+    public boolean putIfAbsent(K key, V value) {
+        return false;
+    }
+
+    @Override
+    public boolean remove(K key) {
+        InternalTransaction tx = (InternalTransaction) transactions.begin();
+
+        Objects.requireNonNull(key);
+
+        try {
+            Tuple keyTup = Tuple.create().set(KEY_COL, keySerializer.toColumnType(key));
+
+            Row row = marshal(keyTup, null, schemaVersion);
+
+            boolean removed = tbl.removeForCache(row, tx).join();
+
+            // Always call external delete.
+            if (writer != null) {
+                writer.delete(key);
+            }
+
+            tx.safeCleanup(true); // Safe to call in final block.
+
+            return removed;
+        } catch (TransactionException e) {
+            tx.safeCleanup(false); // Async cleanup.
+
+            throw e;
+        } catch (Exception e) {
+            tx.safeCleanup(false); // Async cleanup.
+
+            throw new TransactionException(e);
+        }
+    }
+
+    @Override
+    public boolean remove(K key, V oldValue) {
+        return false;
+    }
+
+    @Override
+    public V getAndRemove(K key) {
+        return null;
+    }
+
+    @Override
+    public boolean replace(K key, V oldValue, V newValue) {
+        return false;
+    }
+
+    @Override
+    public boolean replace(K key, V value) {
+        return false;
+    }
+
+    @Override
+    public V getAndReplace(K key, V value) {
+        return null;
+    }
+
+    @Override
+    public void removeAll(Set<? extends K> keys) {
+
+    }
+
+    @Override
+    public void removeAll() {
+
+    }
+
+    @Override
+    public void clear() {
+
+    }
+
+    @Override
+    public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) {
+        return null;
+    }
+
+    @Override
+    public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments) throws EntryProcessorException {
+        return null;
+    }
+
+    @Override
+    public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor,
+            Object... arguments) {
+        return null;
+    }
+
+    @Override
+    public String getName() {
+        return null;
+    }
+
+    @Override
+    public CacheManager getCacheManager() {
+        return null;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public boolean isClosed() {
+        return false;
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> clazz) {
+        return null;
+    }
+
+    @Override
+    public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
+
+    }
+
+    @Override
+    public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
+
+    }
+
+    @Override
+    public Iterator<Entry<K, V>> iterator() {
+        return null;
+    }
+
+    private static class SerializingConverter<T> implements TypeConverter<T, byte[]> {
+        /** {@inheritDoc} */
+        @Override
+        public byte[] toColumnType(T obj) throws Exception {
+            ByteArrayOutputStream out = new ByteArrayOutputStream(512);
+
+            try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
+                oos.writeObject(obj);
+            }
+
+            return out.toByteArray();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public T toObjectType(byte[] data) throws Exception {
+            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
+                return (T) ois.readObject();
+            }
+        }
+    }
+}
+
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index c631642a69..77296def6e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -86,6 +86,8 @@ public interface InternalTable extends ManuallyCloseable {
      */
     CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable InternalTransaction tx);
 
+    CompletableFuture<BinaryRow> getForCache(BinaryRowEx keyRow, @Nullable InternalTransaction tx);
+
     /**
      * Asynchronously gets a row with same key columns values as given one from the table on a specific node for the proposed readTimestamp.
      *
@@ -138,6 +140,8 @@ public interface InternalTable extends ManuallyCloseable {
      */
     CompletableFuture<Void> upsert(BinaryRowEx row, @Nullable InternalTransaction tx);
 
+    CompletableFuture<Void> putForCache(BinaryRowEx row, @Nullable InternalTransaction tx);
+
     /**
      * Asynchronously inserts records into a table, if they do not exist, or replaces the existing ones.
      *
@@ -223,6 +227,8 @@ public interface InternalTable extends ManuallyCloseable {
      */
     CompletableFuture<Boolean> delete(BinaryRowEx keyRow, @Nullable InternalTransaction tx);
 
+    CompletableFuture<Boolean> removeForCache(BinaryRowEx keyRow, InternalTransaction tx);
+
     /**
      * Asynchronously deletes given row from the table.
      *
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
index da4d606b44..bcb4df9f5e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
@@ -446,7 +446,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu
      * @return Row.
      * @throws IgniteException If failed to marshal key and/or value.
      */
-    private Row marshal(Tuple key, @Nullable Tuple val, int schemaVersion) throws IgniteException {
+    protected Row marshal(Tuple key, @Nullable Tuple val, int schemaVersion) throws IgniteException {
         try {
             return marshallerCache.marshaller(schemaVersion).marshal(key, val);
         } catch (TupleMarshallerException ex) {
@@ -461,7 +461,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu
      * @param schemaVersion The version to use when unmarshalling.
      * @return Value tuple.
      */
-    private @Nullable Tuple unmarshalValue(BinaryRow row, int schemaVersion) {
+    protected @Nullable Tuple unmarshalValue(BinaryRow row, int schemaVersion) {
         if (row == null) {
             return null;
         }
@@ -476,7 +476,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu
      * @param schemaVersion The version to use when unmarshalling.
      * @return Key-value pairs of tuples.
      */
-    private Map<Tuple, Tuple> unmarshalValues(Collection<BinaryRow> rows, int schemaVersion) {
+    protected Map<Tuple, Tuple> unmarshalValues(Collection<BinaryRow> rows, int schemaVersion) {
         Map<Tuple, Tuple> pairs = IgniteUtils.newHashMap(rows.size());
 
         for (Row row : rowConverter.resolveRows(rows, schemaVersion)) {
@@ -495,7 +495,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu
      * @param schemaVersion Schema version to use when marshalling.
      * @return Rows.
      */
-    private List<BinaryRowEx> marshalKeys(Collection<Tuple> keys, int schemaVersion) {
+    protected List<BinaryRowEx> marshalKeys(Collection<Tuple> keys, int schemaVersion) {
         if (keys.isEmpty()) {
             return Collections.emptyList();
         }
@@ -515,7 +515,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu
      * @param schemaVersion Schema version to use when marshalling.
      * @return Keys.
      */
-    private Collection<Tuple> unmarshalKeys(Collection<BinaryRow> rows, int schemaVersion) {
+    protected Collection<Tuple> unmarshalKeys(Collection<BinaryRow> rows, int schemaVersion) {
         if (rows.isEmpty()) {
             return Collections.emptyList();
         }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index 8f1179e251..afccee9d09 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -474,7 +474,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu
         try {
             return marsh.marshal(key);
         } catch (MarshallerException e) {
-            throw new IgniteException(e);
+            throw new org.apache.ignite.lang.MarshallerException(e);
         }
     }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index e816e2b57b..28084c3c9b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -27,6 +27,9 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
+import javax.cache.integration.CacheLoader;
+import javax.cache.integration.CacheWriter;
+import org.apache.ignite.cache.Cache;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.marshaller.MarshallerException;
 import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -52,6 +55,9 @@ import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.mapper.TypeConverter;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -122,7 +128,8 @@ public class TableImpl implements TableViewInternal {
         return tbl;
     }
 
-    @Override public String name() {
+    @Override
+    public String name() {
         return tbl.name();
     }
 
@@ -299,6 +306,17 @@ public class TableImpl implements TableViewInternal {
         // TODO: IGNITE-19150 Also need to destroy the index storages
     }
 
+    @Override
+    public <K, V> Cache<K, V> cacheView(
+            IgniteTransactions transactions,
+            @Nullable CacheLoader<K, V> loader,
+            @Nullable CacheWriter<K, V> writer,
+            @Nullable TypeConverter<K, byte[]> keyConverter,
+            @Nullable TypeConverter<V, byte[]> valueConverter
+    ) {
+        return new CacheImpl<>(tbl, schemaVersions, schemaReg, transactions, loader, writer, keyConverter, valueConverter);
+    }
+
     private void awaitIndexes() {
         List<CompletableFuture<?>> toWait = new ArrayList<>();
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
index 8b2d7fdea0..9a032f5c47 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.table;
 
+import javax.cache.integration.CacheLoader;
+import javax.cache.integration.CacheWriter;
+import org.apache.ignite.cache.Cache;
 import org.apache.ignite.internal.schema.ColumnsExtractor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
@@ -26,6 +29,9 @@ import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.mapper.TypeConverter;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.jetbrains.annotations.Nullable;
 
 /** Internal table view interface. */
 public interface TableViewInternal extends Table {
@@ -129,4 +135,21 @@ public interface TableViewInternal extends Table {
      * @param indexId An index id to unregister.
      */
     void unregisterIndex(int indexId);
+
+    /**
+     * @param transactions Transactions facade.
+     * @param loader Cache loader.
+     * @param writer Cache writer.
+     * @param keyConverter Key converter.
+     * @param valueConverter Key converter.
+     * @param <K> Key type.
+     * @param <V> Value type.
+     * @return The instance.
+     */
+    <K, V> Cache<K, V> cacheView(
+            IgniteTransactions transactions,
+            @Nullable CacheLoader<K, V> loader,
+            @Nullable CacheWriter<K, V> writer,
+            @Nullable TypeConverter<K, byte[]> keyConverter,
+            @Nullable TypeConverter<V, byte[]> valueConverter);
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 0030060085..1d58f447e2 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -785,6 +785,27 @@ public class InternalTableImpl implements InternalTable {
         );
     }
 
+    @Override
+    public CompletableFuture<BinaryRow> getForCache(BinaryRowEx keyRow, InternalTransaction tx) {
+        return enlistInTx(
+                keyRow,
+                tx,
+                (txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
+                        .groupId(groupId)
+                        .schemaVersion(keyRow.schemaVersion())
+                        .primaryKey(keyRow.tupleSlice())
+                        .commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
+                        .transactionId(txo.id())
+                        .enlistmentConsistencyToken(term)
+                        .requestType(RW_GET)
+                        .timestampLong(clock.nowLong())
+                        .full(tx == null)
+                        .build(),
+                (res, req) -> false,
+                true
+        );
+    }
+
     @Override
     public CompletableFuture<BinaryRow> get(
             BinaryRowEx keyRow,
@@ -981,6 +1002,27 @@ public class InternalTableImpl implements InternalTable {
         );
     }
 
+    @Override
+    public CompletableFuture<Void> putForCache(BinaryRowEx row, @Nullable InternalTransaction tx) {
+        return enlistInTx(
+                row,
+                tx,
+                (txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
+                        .groupId(groupId)
+                        .commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
+                        .schemaVersion(row.schemaVersion())
+                        .binaryTuple(row.tupleSlice())
+                        .transactionId(txo.id())
+                        .enlistmentConsistencyToken(term)
+                        .requestType(RequestType.RW_UPSERT)
+                        .timestampLong(clock.nowLong())
+                        .full(tx == null)
+                        .build(),
+                (res, req) -> false,
+                true
+        );
+    }
+
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, InternalTransaction tx) {
@@ -1196,6 +1238,27 @@ public class InternalTableImpl implements InternalTable {
         );
     }
 
+    @Override
+    public CompletableFuture<Boolean> removeForCache(BinaryRowEx keyRow, InternalTransaction tx) {
+        return enlistInTx(
+                keyRow,
+                tx,
+                (txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
+                        .groupId(groupId)
+                        .commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
+                        .schemaVersion(keyRow.schemaVersion())
+                        .primaryKey(keyRow.tupleSlice())
+                        .transactionId(txo.id())
+                        .enlistmentConsistencyToken(term)
+                        .requestType(RequestType.RW_DELETE)
+                        .timestampLong(clock.nowLong())
+                        .full(tx == null)
+                        .build(),
+                (res, req) -> !res,
+                true
+        );
+    }
+
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> deleteExact(BinaryRowEx oldRow, InternalTransaction tx) {
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index b6b433bd91..7817ec916a 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -86,6 +86,7 @@ import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.ColumnsExtractor;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -127,6 +128,7 @@ import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
@@ -610,7 +612,7 @@ public class ItTxTestCluster {
             }
         }
 
-        CompletableFuture.allOf(partitionReadyFutures.toArray(new CompletableFuture[0])).join();
+        allOf(partitionReadyFutures.toArray(new CompletableFuture[0])).join();
 
         raftClients.computeIfAbsent(tableName, t -> new ArrayList<>()).addAll(clients.values());
 
@@ -635,6 +637,234 @@ public class ItTxTestCluster {
         );
     }
 
+    /**
+     * Starts a cache.
+     *
+     * @param cacheName Cache name.
+     * @param cacheId Cache id.
+     * @return Groups map.
+     */
+    public TableViewInternal startCache(String cacheName, int cacheId) throws Exception {
+        SchemaDescriptor schemaDescriptor = new SchemaDescriptor(
+                1,
+                new Column[]{new Column("key".toUpperCase(), NativeTypes.BYTES, false)},
+                new Column[]{
+                        new Column("ttl".toUpperCase(), NativeTypes.INT64, false),
+                        new Column("value".toUpperCase(), NativeTypes.BYTES, false)
+                }
+        );
+
+        CatalogService catalogService = mock(CatalogService.class);
+
+        CatalogTableDescriptor tableDescriptor = mock(CatalogTableDescriptor.class);
+        when(tableDescriptor.tableVersion()).thenReturn(SCHEMA_VERSION);
+
+        lenient().when(catalogService.table(anyInt(), anyLong())).thenReturn(tableDescriptor);
+
+        List<Set<Assignment>> calculatedAssignments = AffinityUtils.calculateAssignments(
+                cluster.stream().map(node -> node.topologyService().localMember().name()).collect(toList()),
+                1,
+                replicas
+        );
+
+        List<Set<String>> assignments = calculatedAssignments.stream()
+                .map(a -> a.stream().map(Assignment::consistentId).collect(toSet()))
+                .collect(toList());
+
+        List<TablePartitionId> grpIds = IntStream.range(0, assignments.size())
+                .mapToObj(i -> new TablePartitionId(cacheId, i))
+                .collect(toList());
+
+        Int2ObjectOpenHashMap<RaftGroupService> clients = new Int2ObjectOpenHashMap<>();
+
+        List<CompletableFuture<Void>> partitionReadyFutures = new ArrayList<>();
+
+        int globalIndexId = 1;
+
+        ThreadLocalPartitionCommandsMarshaller commandsMarshaller =
+                new ThreadLocalPartitionCommandsMarshaller(cluster.get(0).serializationRegistry());
+
+        for (int p = 0; p < assignments.size(); p++) {
+            Set<String> partAssignments = assignments.get(p);
+
+            TablePartitionId grpId = grpIds.get(p);
+
+            for (String assignment : partAssignments) {
+                int partId = p;
+
+                var mvPartStorage = new TestMvPartitionStorage(partId);
+                var txStateStorage = txStateStorages.get(assignment);
+                var transactionStateResolver = new TransactionStateResolver(
+                        replicaServices.get(assignment),
+                        txManagers.get(assignment),
+                        clocks.get(assignment),
+                        nodeResolver,
+                        clusterServices.get(assignment).messagingService(),
+                        placementDriver
+                );
+                transactionStateResolver.start();
+
+                int indexId = globalIndexId++;
+
+                ColumnsExtractor row2Tuple = BinaryRowConverter.keyExtractor(schemaDescriptor);
+
+                StorageHashIndexDescriptor pkIndexDescriptor = mock(StorageHashIndexDescriptor.class);
+
+                when(pkIndexDescriptor.columns()).then(invocation -> Collections.nCopies(
+                        schemaDescriptor.keyColumns().columns().length,
+                        mock(StorageHashIndexColumnDescriptor.class)
+                ));
+
+                Lazy<TableSchemaAwareIndexStorage> pkStorage = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
+                        indexId,
+                        new TestHashIndexStorage(partId, pkIndexDescriptor),
+                        row2Tuple
+                ));
+
+                IndexLocker pkLocker = new HashIndexLocker(indexId, true, txManagers.get(assignment).lockManager(), row2Tuple);
+
+                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(partAssignments);
+
+                PendingComparableValuesTracker<HybridTimestamp, Void> safeTime =
+                        new PendingComparableValuesTracker<>(clocks.get(assignment).now());
+                PendingComparableValuesTracker<Long, Void> storageIndexTracker = new PendingComparableValuesTracker<>(0L);
+
+                PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(cacheId, partId, mvPartStorage);
+
+                IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(
+                        DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get()))
+                );
+
+                StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
+                        partId,
+                        partitionDataStorage,
+                        indexUpdateHandler
+                );
+
+                TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
+                        clusterServices.get(assignment),
+                        logicalTopologyService(clusterServices.get(assignment)),
+                        Loza.FACTORY,
+                        new RaftGroupEventsClientListener()
+                );
+
+                PartitionListener partitionListener = new PartitionListener(
+                        txManagers.get(assignment),
+                        partitionDataStorage,
+                        storageUpdateHandler,
+                        txStateStorage,
+                        safeTime,
+                        storageIndexTracker
+                );
+
+                CompletableFuture<Void> partitionReadyFuture = raftServers.get(assignment).startRaftGroupNode(
+                        new RaftNodeId(grpId, configuration.peer(assignment)),
+                        configuration,
+                        partitionListener,
+                        RaftGroupEventsListener.noopLsnr,
+                        topologyAwareRaftGroupServiceFactory
+                ).thenAccept(
+                        raftSvc -> {
+                            try {
+                                DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schemaDescriptor);
+
+                                PartitionReplicaListener listener = newReplicaListener(
+                                        mvPartStorage,
+                                        raftSvc,
+                                        txManagers.get(assignment),
+                                        Runnable::run,
+                                        partId,
+                                        cacheId,
+                                        () -> Map.of(pkLocker.id(), pkLocker),
+                                        pkStorage,
+                                        Map::of,
+                                        clocks.get(assignment),
+                                        safeTime,
+                                        txStateStorage,
+                                        transactionStateResolver,
+                                        storageUpdateHandler,
+                                        new DummyValidationSchemasSource(schemaManager),
+                                        nodeResolver.getByConsistentId(assignment),
+                                        new AlwaysSyncedSchemaSyncService(),
+                                        catalogService,
+                                        placementDriver,
+                                        clusterServices.get(assignment).topologyService()
+                                );
+
+                                replicaManagers.get(assignment).startReplica(
+                                        new TablePartitionId(cacheId, partId),
+                                        nullCompletedFuture(),
+                                        listener,
+                                        raftSvc,
+                                        storageIndexTracker
+                                );
+                            } catch (NodeStoppingException e) {
+                                fail("Unexpected node stopping", e);
+                            }
+                        }
+                );
+
+                partitionReadyFutures.add(partitionReadyFuture);
+            }
+
+            PeersAndLearners membersConf = PeersAndLearners.fromConsistentIds(partAssignments);
+
+            if (startClient) {
+                RaftGroupService service = RaftGroupServiceImpl
+                        .start(grpId, client, FACTORY, raftConfig, membersConf, true, executor, commandsMarshaller)
+                        .get(5, TimeUnit.SECONDS);
+
+                clients.put(p, service);
+            } else {
+                // Create temporary client to find a leader address.
+                ClusterService tmpSvc = cluster.get(0);
+
+                RaftGroupService service = RaftGroupServiceImpl
+                        .start(grpId, tmpSvc, FACTORY, raftConfig, membersConf, true, executor, commandsMarshaller)
+                        .get(5, TimeUnit.SECONDS);
+
+                Peer leader = service.leader();
+
+                service.shutdown();
+
+                ClusterService leaderSrv = cluster.stream()
+                        .filter(cluster -> cluster.topologyService().localMember().name().equals(leader.consistentId()))
+                        .findAny()
+                        .orElseThrow();
+
+                RaftGroupService leaderClusterSvc = RaftGroupServiceImpl
+                        .start(grpId, leaderSrv, FACTORY, raftConfig, membersConf, true, executor, commandsMarshaller)
+                        .get(5, TimeUnit.SECONDS);
+
+                clients.put(p, leaderClusterSvc);
+            }
+        }
+
+        allOf(partitionReadyFutures.toArray(new CompletableFuture[0])).join();
+
+        raftClients.computeIfAbsent(cacheName, t -> new ArrayList<>()).addAll(clients.values());
+
+        return new TableImpl(
+                new InternalTableImpl(
+                        cacheName,
+                        cacheId,
+                        clients,
+                        1,
+                        nodeResolver,
+                        clientTxManager,
+                        mock(MvTableStorage.class),
+                        mock(TxStateTableStorage.class),
+                        startClient ? clientReplicaSvc : replicaServices.get(localNodeName),
+                        startClient ? clientClock : clocks.get(localNodeName),
+                        timestampTracker,
+                        placementDriver
+                ),
+                new DummySchemaManagerImpl(schemaDescriptor),
+                clientTxManager.lockManager(),
+                new ConstantSchemaVersions(SCHEMA_VERSION)
+        );
+    }
+
     protected PartitionReplicaListener newReplicaListener(
             MvPartitionStorage mvDataStorage,
             RaftGroupService raftClient,
@@ -682,7 +912,7 @@ public class ItTxTestCluster {
         );
     }
 
-    private LogicalTopologyService logicalTopologyService(ClusterService clusterService) {
+    private static LogicalTopologyService logicalTopologyService(ClusterService clusterService) {
         return new LogicalTopologyService() {
             @Override
             public void addEventListener(LogicalTopologyEventListener listener) {
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
index def619b54b..81d3840dfe 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
@@ -101,4 +101,6 @@ public interface InternalTransaction extends Transaction {
     default CompletableFuture<Void> finish(boolean commit, HybridTimestamp executionTimestamp) {
         return commit ? commitAsync() : rollbackAsync();
     }
+
+    CompletableFuture<Void> safeCleanup(boolean commit);
 }
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
index ab1919a537..fb74783170 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.internal.tx;
 
+import org.apache.ignite.internal.replicator.exception.ExpectedReplicationException;
+
 /**
  * This exception is thrown when a lock cannot be acquired, released or downgraded.
  */
-public class LockException extends TransactionInternalCheckedException {
+public class LockException extends TransactionInternalCheckedException implements ExpectedReplicationException {
     /**
      * Creates a new instance of LockException with the given message.
      *
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index b1f3e2a246..45658891cc 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -117,14 +117,14 @@ public interface TxManager extends IgniteComponent {
      * @param timestampTracker Observable timestamp tracker is used to track a timestamp for either read-write or read-only
      *         transaction execution. The tracker is also used to determine the read timestamp for read-only transactions. Each client
      *         should pass its own tracker to provide linearizability between read-write and read-only transactions started by this client.
-     * @param commitPartition Partition to store a transaction state.
+     * @param commitPartition Partition to store a transaction state. Null to skip commit partition step - txn will be committed externally.
      * @param commit {@code true} if a commit requested.
      * @param enlistedGroups Enlisted partition groups with consistency token.
      * @param txId Transaction id.
      */
     CompletableFuture<Void> finish(
             HybridTimestampTracker timestampTracker,
-            TablePartitionId commitPartition,
+            @Nullable TablePartitionId commitPartition,
             boolean commit,
             Map<TablePartitionId, Long> enlistedGroups,
             UUID txId
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 3e0e6c4dbb..a87ae18ca8 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -119,4 +119,11 @@ class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl {
                         old -> new TxStateMeta(COMMITTED, old.txCoordinatorId(), old.commitPartitionId(), old.commitTimestamp())
                 ));
     }
+
+    @Override
+    public CompletableFuture<Void> safeCleanup(boolean commit) {
+        assert false;
+
+        return null;
+    }
 }
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index 3b021fd192..d6a8ce20d3 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -132,6 +132,17 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl {
     /** {@inheritDoc} */
     @Override
     protected CompletableFuture<Void> finish(boolean commit) {
+        return finishInternal(commit, false);
+    }
+
+    /**
+     * Internal method for finishing this transaction.
+     *
+     * @param commit {@code true} to commit, false to rollback.
+     * @param skipCommitPartition {@code true} to skip commit partition step (a txn will be committed externally).
+     * @return The future of transaction completion.
+     */
+    private CompletableFuture<Void> finishInternal(boolean commit, boolean skipCommitPartition) {
         if (hasTxFinalizationBegun()) {
             assert finishFuture != null : "Transaction is in final state but there is no finish future [id="
                     + id() + ", state=" + state() + "].";
@@ -145,29 +156,21 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl {
             if (!hasTxFinalizationBegun()) {
                 assert finishFuture == null : "Transaction is already finished [id=" + id() + ", state=" + state() + "].";
 
-                finishFuture = finishInternal(commit);
+                Map<TablePartitionId, Long> enlistedGroups = enlisted.entrySet().stream()
+                        .collect(Collectors.toMap(
+                                Entry::getKey,
+                                entry -> entry.getValue().get2()
+                        ));
+
+                finishFuture = txManager.finish(observableTsTracker, skipCommitPartition ? null : commitPart, commit, enlistedGroups, id());
             }
 
             return finishFuture;
         } finally {
             enlistPartitionLock.writeLock().unlock();
         }
-    }
 
-    /**
-     * Internal method for finishing this transaction.
-     *
-     * @param commit {@code true} to commit, false to rollback.
-     * @return The future of transaction completion.
-     */
-    private CompletableFuture<Void> finishInternal(boolean commit) {
-        Map<TablePartitionId, Long> enlistedGroups = enlisted.entrySet().stream()
-                .collect(Collectors.toMap(
-                        Entry::getKey,
-                        entry -> entry.getValue().get2()
-                ));
-
-        return txManager.finish(observableTsTracker, commitPart, commit, enlistedGroups, id());
+
     }
 
     /** {@inheritDoc} */
@@ -186,4 +189,9 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl {
     public HybridTimestamp startTimestamp() {
         return TransactionIds.beginTimestamp(id());
     }
+
+    @Override
+    public CompletableFuture<Void> safeCleanup(boolean commit) {
+        return finishInternal(commit, true);
+    }
 }
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index fcc14cb871..4cdabf7895 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -327,12 +327,12 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
     @Override
     public CompletableFuture<Void> finish(
             HybridTimestampTracker observableTimestampTracker,
-            TablePartitionId commitPartition,
+            @Nullable TablePartitionId commitPartition,
             boolean commit,
             Map<TablePartitionId, Long> enlistedGroups,
             UUID txId
     ) {
-        LOG.debug("Finish [commit={}, txId={}, groups={}].", commit, txId, enlistedGroups);
+        LOG.debug("Finish [commit={}, txId={}, groups={}, commitPartId={}].", commit, txId, enlistedGroups, commitPartition);
 
         assert enlistedGroups != null;
 
@@ -363,7 +363,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
             return tuple0;
         });
 
-        // Wait for commit acks first, then proceed with the finish request.
+        // Wait for write acks first, then proceed with the finish request.
         return tuple.performFinish(commit, ignored ->
                 prepareFinish(
                         observableTimestampTracker,
@@ -377,7 +377,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
 
     private CompletableFuture<Void> prepareFinish(
             HybridTimestampTracker observableTimestampTracker,
-            TablePartitionId commitPartition,
+            @Nullable TablePartitionId commitPartition,
             boolean commit,
             Map<TablePartitionId, Long> enlistedGroups,
             UUID txId,
@@ -394,6 +394,17 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
                         (unused, throwable) -> {
                             boolean verifiedCommit = throwable == null && commit;
 
+                            // A tx will be externally committed.
+                            if (commitPartition == null) {
+                                return cleanupOnly(
+                                        observableTimestampTracker,
+                                        commitTimestamp,
+                                        new HashSet<>(enlistedGroups.keySet()),
+                                        commit,
+                                        txId,
+                                        txFinishFuture);
+                            }
+
                             Collection<ReplicationGroupId> replicationGroupIds = new HashSet<>(enlistedGroups.keySet());
 
                             return durableFinish(
@@ -410,6 +421,86 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
                 .thenCompose(r -> verificationFuture);
     }
 
+    /**
+     * Cleanup tx state.
+     */
+    private CompletableFuture<Void> cleanupOnly(
+            HybridTimestampTracker observableTimestampTracker,
+            HybridTimestamp commitTimestamp,
+            Collection<TablePartitionId> replicationGroupIds,
+            boolean commit,
+            UUID txId,
+            CompletableFuture<TransactionMeta> txFinishFuture
+    ) {
+        return cleanup(replicationGroupIds, commit, commitTimestamp, txId)
+                .handle((res, ex) -> {
+                    if (ex != null) {
+                        Throwable cause = ExceptionUtils.unwrapCause(ex);
+
+                        if (cause instanceof TransactionException) {
+                            TransactionException transactionException = (TransactionException) cause;
+
+                            if (transactionException.code() == TX_WAS_ABORTED_ERR) {
+                                updateTxMeta(txId, old -> {
+                                    TxStateMeta txStateMeta = new TxStateMeta(ABORTED, old.txCoordinatorId(), null, null);
+
+                                    txFinishFuture.complete(txStateMeta);
+
+                                    return txStateMeta;
+                                });
+
+                                return CompletableFuture.<Void>failedFuture(cause);
+                            }
+                        }
+
+                        if (TransactionFailureHandler.isRecoverable(cause)) {
+                            LOG.warn("Failed to cleanup Tx. The operation will be retried [txId={}].", ex, txId);
+
+                            // Safe to retry, cleanup is idempotent.
+                            return cleanupOnly(
+                                    observableTimestampTracker,
+                                    commitTimestamp,
+                                    replicationGroupIds,
+                                    commit,
+                                    txId,
+                                    txFinishFuture
+                            );
+                        } else {
+                            LOG.warn("Failed to cleanup Tx [txId={}].", ex, txId);
+
+                            return CompletableFuture.<Void>failedFuture(cause);
+                        }
+                    } else {
+                        updateTxMeta(txId, old -> {
+                            if (isFinalState(old.txState())) {
+                                txFinishFuture.complete(old);
+
+                                return old;
+                            }
+
+                            assert old instanceof TxStateMetaFinishing;
+
+                            TxStateMeta finalTxStateMeta = coordinatorFinalTxStateMeta(
+                                    commit,
+                                    old.commitPartitionId(),
+                                    commitTimestamp
+                            );
+
+                            txFinishFuture.complete(finalTxStateMeta);
+
+                            return finalTxStateMeta;
+                        });
+
+                        if (commit) {
+                            observableTimestampTracker.update(commitTimestamp);
+                        }
+                    }
+
+                    return CompletableFutures.<Void>nullCompletedFuture();
+                })
+                .thenCompose(Function.identity());
+    }
+
     /**
      * Durable finish request.
      */
@@ -422,9 +513,11 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
             HybridTimestamp commitTimestamp,
             CompletableFuture<TransactionMeta> txFinishFuture
     ) {
+        assert commitPartition != null;
+
         return inBusyLockAsync(busyLock, () -> placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(commitPartition)
                 .thenCompose(meta ->
-                        makeFinishRequest(
+                        sendFinishRequest(
                                 observableTimestampTracker,
                                 commitPartition,
                                 meta.getLeaseholder(),
@@ -479,7 +572,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
                 .thenCompose(Function.identity()));
     }
 
-    private CompletableFuture<Void> makeFinishRequest(
+    private CompletableFuture<Void> sendFinishRequest(
             HybridTimestampTracker observableTimestampTracker,
             TablePartitionId commitPartition,
             String primaryConsistentId,
@@ -690,7 +783,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
      */
     private TxStateMeta coordinatorFinalTxStateMeta(
             boolean commit,
-            TablePartitionId commitPartitionId,
+            @Nullable TablePartitionId commitPartitionId,
             @Nullable HybridTimestamp commitTimestamp
     ) {
         return new TxStateMeta(commit ? COMMITTED : ABORTED, localNodeId, commitPartitionId, commitTimestamp);