You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/11/15 12:01:11 UTC

[ignite-3] branch main updated: IGNITE-15492 Check schema availability on local node. Fixes #344

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1957cf3  IGNITE-15492 Check schema availability on local node. Fixes #344
1957cf3 is described below

commit 1957cf3101b37750293ccf5b940e18b1099a38bf
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Mon Nov 15 15:00:48 2021 +0300

    IGNITE-15492 Check schema availability on local node. Fixes #344
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../query/calcite/StopCalciteModuleTest.java       |   9 +-
 .../ignite/client/fakes/FakeSchemaRegistry.java    |  13 +
 .../apache/ignite/internal/util/IgniteUtils.java   |   7 +-
 .../internal/testframework/IgniteTestUtils.java    |  63 +++++
 .../metastorage/watch/WatchAggregator.java         |  22 +-
 .../raft/jraft/rpc/impl/RaftGroupServiceImpl.java  |  43 ++--
 modules/runner/pom.xml                             |   6 +
 .../internal/runner/app/ItDataSchemaSyncTest.java  | 261 +++++++++++++++++++++
 .../internal/runner/app/ItTablesApiTest.java       | 122 +---------
 .../internal/test/WatchListenerInhibitor.java      | 149 ++++++++++++
 .../ignite/internal/schema/SchemaRegistry.java     |  16 ++
 .../schema/registry/SchemaRegistryImpl.java        |  68 +++++-
 .../schema/registry/SchemaRegistryImplTest.java    |  16 +-
 .../schema/registry/UpgradingRowAdapterTest.java   |   5 +-
 .../ignite/distributed/ItDistributedTableTest.java |  11 +-
 .../schema/marshaller/TupleMarshallerImpl.java     |  46 ++--
 .../internal/table/RecordBinaryViewImpl.java       |  13 +-
 .../internal/table/distributed/TableManager.java   | 157 ++++++++++++-
 .../TupleMarshallerFixlenOnlyBenchmark.java        |   4 +-
 .../TupleMarshallerVarlenOnlyBenchmark.java        |   3 +-
 .../ignite/internal/table/TableManagerTest.java    |   4 +-
 .../table/impl/DummySchemaManagerImpl.java         |  19 +-
 22 files changed, 849 insertions(+), 208 deletions(-)

diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/StopCalciteModuleTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/StopCalciteModuleTest.java
index bd37d73..22237c3 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/StopCalciteModuleTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/StopCalciteModuleTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite;
 
+import static org.apache.ignite.internal.schema.registry.SchemaRegistryImpl.INITIAL_SCHEMA_VERSION;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -114,11 +115,11 @@ public class StopCalciteModuleTest {
                 new Column[]{new Column("ID", NativeTypes.INT32, false)},
                 new Column[]{new Column("VAL", NativeTypes.INT32, false)}
         );
-
-        schemaReg = new SchemaRegistryImpl(0, (v) -> schemaDesc);
-
+    
+        schemaReg = new SchemaRegistryImpl(0, (v) -> schemaDesc, () -> INITIAL_SCHEMA_VERSION);
+    
         when(tbl.tableName()).thenReturn("PUBLIC.TEST");
-
+    
         // Mock create table (notify on register listener).
         doAnswer(invocation -> {
             EventListener<TableEventParameters> clo = (EventListener<TableEventParameters>) invocation.getArguments()[1];
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java
index 52cbad5..6f2eb3c 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.client.fakes;
 
+import java.util.Collection;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -91,6 +93,11 @@ public class FakeSchemaRegistry implements SchemaRegistry {
     }
 
     /** {@inheritDoc} */
+    @Override public SchemaDescriptor waitLatestSchema() {
+        return schema();
+    }
+
+    /** {@inheritDoc} */
     @Override
     public int lastSchemaVersion() {
         return lastVer;
@@ -101,4 +108,10 @@ public class FakeSchemaRegistry implements SchemaRegistry {
     public Row resolve(BinaryRow row) {
         return new Row(schema(row.schemaVersion()), row);
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public Collection<Row> resolve(Collection<BinaryRow> rows) {
+        return rows.stream().map(this::resolve).collect(Collectors.toList());
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index e462585..53b84e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.LoggerMessageHelper;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -518,10 +519,10 @@ public class IgniteUtils {
      * @param msg Message to print with the stack.
      * @deprecated Calls to this method should never be committed to master.
      */
-    public static void dumpStack(IgniteLogger log, String msg) {
+    public static void dumpStack(IgniteLogger log, String msg, Object... params) {
         String reason = "Dumping stack.";
-        
-        var err = new Exception(msg);
+    
+        var err = new Exception(LoggerMessageHelper.format(msg, params));
     
         if (log != null) {
             log.error(reason, err);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 54ed33e..ad48b1c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -25,7 +25,10 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.util.BitSet;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.LoggerMessageHelper;
 import org.jetbrains.annotations.NotNull;
@@ -195,6 +198,66 @@ public final class IgniteTestUtils {
     }
 
     /**
+     * Runs runnable task asynchronously.
+     *
+     * @param task Runnable.
+     * @return Future with task result.
+     */
+    public static CompletableFuture runAsync(final Runnable task) {
+        return runAsync(task, "async-runnable-runner");
+    }
+
+    /**
+     * Runs runnable task asynchronously.
+     *
+     * @param task Runnable.
+     * @return Future with task result.
+     */
+    public static CompletableFuture runAsync(final Runnable task, String threadName) {
+        return runAsync(() -> {
+            task.run();
+
+            return null;
+        }, threadName);
+    }
+
+    /**
+     * Runs callable task asynchronously.
+     *
+     * @param task Callable.
+     * @return Future with task result.
+     */
+    public static <T> CompletableFuture<T> runAsync(final Callable<T> task) {
+        return runAsync(task, "async-callable-runner");
+    }
+
+    /**
+     * Runs callable task asynchronously.
+     *
+     * @param task Callable.
+     * @param threadName Thread name.
+     * @return Future with task result.
+     */
+    public static <T> CompletableFuture<T> runAsync(final Callable<T> task, String threadName) {
+        final NamedThreadFactory thrFactory = new NamedThreadFactory(threadName);
+
+        final CompletableFuture<T> fut = new CompletableFuture<T>();
+
+        thrFactory.newThread(() -> {
+            try {
+                // Execute task.
+                T res = task.call();
+
+                fut.complete(res);
+            } catch (Throwable e) {
+                fut.completeExceptionally(e);
+            }
+        }).start();
+
+        return fut;
+    }
+
+    /**
      * Waits for the condition.
      *
      * @param cond          Condition.
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/WatchAggregator.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/WatchAggregator.java
index 34ebd58..46706e7 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/WatchAggregator.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/WatchAggregator.java
@@ -166,15 +166,29 @@ public class WatchAggregator {
         final LinkedHashMap<Long, Watch> cpWatches = new LinkedHashMap<>(watches);
 
         return new WatchListener() {
-
+    
             @Override
             public boolean onUpdate(@NotNull WatchEvent evt) {
+                //TODO: IGNITE-15858 Fixing stopWatch may solve the issue.
+                synchronized (watches) {
+                    processWatchEvents(evt);
+                }
+        
+                return true;
+            }
+
+            /**
+             * Process watch events synchronously.
+             *
+             * @param evt Watch event.
+             */
+            private void processWatchEvents(@NotNull WatchEvent evt) {
                 var watchIt = cpWatches.entrySet().iterator();
                 Collection<Long> toCancel = new ArrayList<>();
 
                 while (watchIt.hasNext()) {
-                    Map.Entry<Long, WatchAggregator.Watch> entry = watchIt.next();
-                    WatchAggregator.Watch watch = entry.getValue();
+                    Map.Entry<Long, Watch> entry = watchIt.next();
+                    Watch watch = entry.getValue();
                     var filteredEvts = new ArrayList<EntryEvent>();
 
                     for (EntryEvent entryEvt : evt.entryEvents()) {
@@ -207,8 +221,6 @@ public class WatchAggregator {
                 }
 
                 storeRevision.accept(entries, revision);
-
-                return true;
             }
 
             @Override
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index 0270ed1..10f719f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -17,6 +17,26 @@
 
 package org.apache.ignite.raft.jraft.rpc.impl;
 
+import static java.lang.System.currentTimeMillis;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.ThreadLocalRandom.current;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -44,26 +64,6 @@ import org.apache.ignite.raft.jraft.rpc.ActionResponse;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests;
 import org.jetbrains.annotations.NotNull;
 
-import static java.lang.System.currentTimeMillis;
-import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.ThreadLocalRandom.current;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
-
 /**
  * The implementation of {@link RaftGroupService}
  */
@@ -492,7 +492,8 @@ public class RaftGroupServiceImpl implements RaftGroupService {
 
         CompletableFuture<?> fut0 = cluster.messagingService().invoke(peer.address(), (NetworkMessage) req, networkTimeout);
 
-        fut0.whenComplete(new BiConsumer<Object, Throwable>() {
+        //TODO: IGNITE-15389 org.apache.ignite.internal.metastorage.client.CursorImpl has potential deadlock inside
+        fut0.whenCompleteAsync(new BiConsumer<Object, Throwable>() {
             @Override public void accept(Object resp, Throwable err) {
                 if (err != null) {
                     if (recoverable(err)) {
diff --git a/modules/runner/pom.xml b/modules/runner/pom.xml
index 5bcfea1..41eeeab 100644
--- a/modules/runner/pom.xml
+++ b/modules/runner/pom.xml
@@ -164,6 +164,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-metastorage-server</artifactId>
             <type>test-jar</type>
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
new file mode 100644
index 0000000..71bdb41
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.schema.SchemaBuilders;
+import org.apache.ignite.schema.definition.ColumnType;
+import org.apache.ignite.schema.definition.TableDefinition;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests table schema synchronization.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItDataSchemaSyncTest extends IgniteAbstractTest {
+    /**
+     * Schema name.
+     */
+    public static final String SCHEMA = "PUBLIC";
+
+    /**
+     * Short table name.
+     */
+    public static final String SHORT_TABLE_NAME = "tbl1";
+
+    /**
+     * Table name.
+     */
+    public static final String TABLE_NAME = SCHEMA + "." + SHORT_TABLE_NAME;
+
+    /**
+     * Nodes bootstrap configuration.
+     */
+    private static final Map<String, String> nodesBootstrapCfg = new LinkedHashMap<>() {
+        {
+            put("node0", "{\n"
+                    + "  \"node\": {\n"
+                    + "    \"metastorageNodes\":[ \"node0\" ]\n"
+                    + "  },\n"
+                    + "  \"network\": {\n"
+                    + "    \"port\":3344,\n"
+                    + "    \"nodeFinder\": {\n"
+                    + "      \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+                    + "    }\n"
+                    + "  }\n"
+                    + "}");
+        
+            put("node1", "{\n"
+                    + "  \"node\": {\n"
+                    + "    \"metastorageNodes\":[ \"node0\" ]\n"
+                    + "  },\n"
+                    + "  \"network\": {\n"
+                    + "    \"port\":3345,\n"
+                    + "    \"nodeFinder\": {\n"
+                    + "      \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+                    + "    }\n"
+                    + "  }\n"
+                    + "}");
+        
+            put("node2", "{\n"
+                    + "  \"node\": {\n"
+                    + "    \"metastorageNodes\":[ \"node0\" ]\n"
+                    + "  },\n"
+                    + "  \"network\": {\n"
+                    + "    \"port\":3346,\n"
+                    + "    \"nodeFinder\": {\n"
+                    + "      \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+                    + "    }\n"
+                    + "  }\n"
+                    + "}");
+        }
+    };
+
+    /**
+     * Cluster nodes.
+     */
+    private final List<Ignite> clusterNodes = new ArrayList<>();
+
+    /**
+     * Starts a cluster before each test.
+     */
+    @BeforeEach
+    void beforeEach() throws Exception {
+        nodesBootstrapCfg.forEach((nodeName, configStr) ->
+                clusterNodes.add(IgnitionManager.start(nodeName, configStr, workDir.resolve(nodeName)))
+        );
+    }
+
+    /**
+     * Stops the cluster after each test.
+     */
+    @AfterEach
+    void afterEach() throws Exception {
+        IgniteUtils.closeAll(Lists.reverse(clusterNodes));
+    }
+
+    /**
+     * The test executes various operations on a lagging node.
+     * The operations can complete only after the node catches up with the distributed cluster state.
+     */
+    @Test
+    public void test() throws Exception {
+        Ignite ignite0 = clusterNodes.get(0);
+        final IgniteImpl ignite1 = (IgniteImpl) clusterNodes.get(1);
+    
+        createTable(ignite0, SCHEMA, SHORT_TABLE_NAME);
+    
+        TableImpl table = (TableImpl) ignite0.tables().table(TABLE_NAME);
+    
+        assertEquals(1, table.schemaView().schema().version());
+    
+        for (int i = 0; i < 10; i++) {
+            table.recordView().insert(Tuple.create()
+                    .set("key", Long.valueOf(i))
+                    .set("valInt", i)
+                    .set("valStr", "str_" + i)
+            );
+        }
+
+        WatchListenerInhibitor listenerInhibitor = WatchListenerInhibitor.metastorageEventsInhibitor(ignite1);
+
+        listenerInhibitor.startInhibit();
+    
+        ignite0.tables().alterTable(TABLE_NAME,
+                tblChanger -> tblChanger.changeColumns(cols -> {
+                    int colIdx = tblChanger.columns().namedListKeys().stream()
+                            .mapToInt(Integer::parseInt).max().getAsInt() + 1;
+                
+                    cols.create(String.valueOf(colIdx),
+                            colChg -> convert(SchemaBuilders.column("valStr2", ColumnType.string())
+                                    .withDefaultValueExpression("default").build(), colChg));
+                })
+        );
+
+        for (Ignite node : clusterNodes) {
+            if (node == ignite1) {
+                continue;
+            }
+
+            TableImpl tableOnNode = (TableImpl) node.tables().table(TABLE_NAME);
+
+            IgniteTestUtils.waitForCondition(() -> tableOnNode.schemaView().lastSchemaVersion() == 2, 10_000);
+        }
+
+        TableImpl table1 = (TableImpl) ignite1.tables().table(TABLE_NAME);
+
+        for (int i = 10; i < 20; i++) {
+            table.recordView().insert(Tuple.create()
+                    .set("key", Long.valueOf(i))
+                    .set("valInt", i)
+                    .set("valStr", "str_" + i)
+                    .set("valStr2", "str2_" + i)
+            );
+        }
+
+        final CompletableFuture insertFut = IgniteTestUtils.runAsync(() ->
+                table1.recordView().insert(Tuple.create()
+                        .set("key", Long.valueOf(0))
+                        .set("valInt", 0)
+                        .set("valStr", "str_" + 0)
+                        .set("valStr2", "str2_" + 0)
+            ));
+    
+        final CompletableFuture getFut = IgniteTestUtils.runAsync(() -> {
+            table1.recordView().get(Tuple.create().set("key", Long.valueOf(10)));
+        });
+    
+        final CompletableFuture checkDefaultFut = IgniteTestUtils.runAsync(() -> {
+            assertEquals("default",
+                    table1.recordView().get(Tuple.create().set("key", Long.valueOf(0)))
+                            .value("valStr2"));
+        });
+
+        assertEquals(1, table1.schemaView().lastSchemaVersion());
+
+        assertFalse(getFut.isDone());
+        assertFalse(insertFut.isDone());
+        assertFalse(checkDefaultFut.isDone());
+
+        listenerInhibitor.stopInhibit();
+
+        getFut.get(10, TimeUnit.SECONDS);
+        insertFut.get(10, TimeUnit.SECONDS);
+        checkDefaultFut.get(10, TimeUnit.SECONDS);
+
+        for (Ignite node : clusterNodes) {
+            Table tableOnNode = node.tables().table(TABLE_NAME);
+
+            for (int i = 0; i < 20; i++) {
+                Tuple row = tableOnNode.recordView().get(Tuple.create().set("key", Long.valueOf(i)));
+
+                assertNotNull(row);
+
+                assertEquals(i, row.intValue("valInt"));
+                assertEquals("str_" + i, row.value("valStr"));
+                assertEquals(i < 10 ? "default" : ("str2_" + i), row.value("valStr2"));
+            }
+        }
+    }
+
+    /**
+     * Creates a table with the passed name on the specific schema.
+     *
+     * @param node           Cluster node.
+     * @param schemaName     Schema name.
+     * @param shortTableName Table name.
+     */
+    protected void createTable(Ignite node, String schemaName, String shortTableName) {
+        // Create the table on the given node.
+        TableDefinition schTbl1 = SchemaBuilders.tableBuilder(schemaName, shortTableName).columns(
+                SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
+                SchemaBuilders.column("valInt", ColumnType.INT32).asNullable().build(),
+                SchemaBuilders.column("valStr", ColumnType.string()).withDefaultValueExpression("default").build()
+        ).withPrimaryKey("key").build();
+    
+        node.tables().createTable(
+                schTbl1.canonicalName(),
+                tblCh -> convert(schTbl1, tblCh).changeReplicas(2).changePartitions(10)
+        );
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index 516d5e7..56ba56f 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -21,14 +21,9 @@ import static org.apache.ignite.internal.schema.configuration.SchemaConfiguratio
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -37,14 +32,9 @@ import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
 import org.apache.ignite.internal.ItUtils;
-import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.client.WatchEvent;
-import org.apache.ignite.internal.metastorage.client.WatchListener;
-import org.apache.ignite.internal.metastorage.watch.AggregatedWatch;
-import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -57,8 +47,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
-import org.junit.platform.commons.util.ReflectionUtils;
-import org.mockito.Mockito;
 
 /**
  * Integration tests to check consistent of java API on different nodes.
@@ -154,7 +142,7 @@ public class ItTablesApiTest extends IgniteAbstractTest {
 
         Ignite ignite1 = clusterNodes.get(1);
 
-        WatchListenerInhibitor ignite1Inhibitor = metastorageEventsInhibitor(ignite1);
+        WatchListenerInhibitor ignite1Inhibitor = WatchListenerInhibitor.metastorageEventsInhibitor(ignite1);
 
         ignite1Inhibitor.startInhibit();
 
@@ -208,112 +196,6 @@ public class ItTablesApiTest extends IgniteAbstractTest {
     }
 
     /**
-     * Creates the specific listener which can inhibit events for real metastorage listener.
-     *
-     * @param ignite Ignite.
-     * @return Listener inhibitor.
-     * @throws Exception If something wrong when creating the listener inhibitor.
-     */
-    private WatchListenerInhibitor metastorageEventsInhibitor(Ignite ignite) throws Exception {
-        //TODO: IGNITE-15723 After a component factory will be implemented, need to got rid of reflection here.
-        MetaStorageManager metaMngr = (MetaStorageManager) ReflectionUtils.tryToReadFieldValue(
-                IgniteImpl.class,
-                "metaStorageMgr",
-                (IgniteImpl) ignite
-        ).get();
-
-        assertNotNull(metaMngr);
-
-        WatchAggregator aggregator = (WatchAggregator) ReflectionUtils.tryToReadFieldValue(
-                MetaStorageManager.class,
-                "watchAggregator",
-                metaMngr
-        ).get();
-
-        assertNotNull(aggregator);
-
-        WatchAggregator aggregatorSpy = Mockito.spy(aggregator);
-
-        WatchListenerInhibitor inhibitor = new WatchListenerInhibitor();
-
-        doAnswer(mock -> {
-            Optional<AggregatedWatch> op = (Optional<AggregatedWatch>) mock.callRealMethod();
-
-            assertTrue(op.isPresent());
-
-            inhibitor.setRealListener(op.get().listener());
-
-            return Optional.of(new AggregatedWatch(op.get().keyCriterion(), op.get().revision(), inhibitor));
-        }).when(aggregatorSpy).watch(anyLong(), any());
-
-        IgniteTestUtils.setFieldValue(metaMngr, "watchAggregator", aggregatorSpy);
-
-        // Redeploy metastorage watch. The Watch inhibitor will be used after.
-        metaMngr.unregisterWatch(-1);
-
-        return inhibitor;
-    }
-
-    /**
-     * Listener which wraps another one to inhibit events.
-     */
-    private static class WatchListenerInhibitor implements WatchListener {
-        /** Inhibited events. */
-        private final ArrayList<WatchEvent> inhibitEvents = new ArrayList<>();
-
-        /** Inhibit flag. */
-        private boolean inhibit = false;
-
-        /** Wrapped listener. */
-        private WatchListener realListener;
-
-        /**
-         * Sets a wrapped listener.
-         *
-         * @param realListener Listener to wrap.
-         */
-        public void setRealListener(WatchListener realListener) {
-            this.realListener = realListener;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public synchronized boolean onUpdate(WatchEvent evt) {
-            if (!inhibit) {
-                return realListener.onUpdate(evt);
-            }
-
-            return inhibitEvents.add(evt);
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public synchronized void onError(Throwable e) {
-            realListener.onError(e);
-        }
-
-        /**
-         * Starts inhibit events.
-         */
-        synchronized void startInhibit() {
-            inhibit = true;
-        }
-
-        /**
-         * Stops inhibit events.
-         */
-        synchronized void stopInhibit() {
-            inhibit = false;
-
-            for (WatchEvent evt : inhibitEvents) {
-                realListener.onUpdate(evt);
-            }
-
-            inhibitEvents.clear();
-        }
-    }
-
-    /**
      * Creates table.
      *
      * @param node           Cluster node.
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
new file mode 100644
index 0000000..ccc13fe
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+
+import java.util.ArrayList;
+import java.util.Optional;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.metastorage.watch.AggregatedWatch;
+import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.junit.platform.commons.util.ReflectionUtils;
+import org.mockito.Mockito;
+
+/**
+ * Listener which wraps another one to inhibit events.
+ */
+public class WatchListenerInhibitor implements WatchListener {
+    /** Inhibited events. */
+    private final ArrayList<WatchEvent> inhibitEvents = new ArrayList<>();
+
+    /** Inhibit flag. */
+    private boolean inhibit = false;
+
+    /** Wrapped listener. */
+    private WatchListener realListener;
+
+    /**
+     * Creates a listener that can inhibit events destined for the real metastorage listener.
+     *
+     * @param ignite Ignite.
+     * @return Listener inhibitor.
+     * @throws Exception If something goes wrong while creating the listener inhibitor.
+     */
+    public static WatchListenerInhibitor metastorageEventsInhibitor(Ignite ignite)
+            throws Exception {
+        // TODO: IGNITE-15723 Once a component factory is implemented, get rid of the reflection here.
+        MetaStorageManager metaMngr = (MetaStorageManager) ReflectionUtils.tryToReadFieldValue(
+                IgniteImpl.class,
+                "metaStorageMgr",
+                (IgniteImpl) ignite
+        ).get();
+    
+        assertNotNull(metaMngr);
+    
+        WatchAggregator aggregator = (WatchAggregator) ReflectionUtils.tryToReadFieldValue(
+                MetaStorageManager.class,
+                "watchAggregator",
+                metaMngr
+        ).get();
+
+        assertNotNull(aggregator);
+
+        WatchAggregator aggregatorSpy = Mockito.spy(aggregator);
+
+        WatchListenerInhibitor inhibitor = new WatchListenerInhibitor();
+    
+        doAnswer(mock -> {
+            Optional<AggregatedWatch> op = (Optional<AggregatedWatch>) mock.callRealMethod();
+        
+            assertTrue(op.isPresent());
+        
+            inhibitor.setRealListener(op.get().listener());
+        
+            return Optional.of(new AggregatedWatch(op.get().keyCriterion(), op.get().revision(),
+                    inhibitor));
+        }).when(aggregatorSpy).watch(anyLong(), any());
+
+        IgniteTestUtils.setFieldValue(metaMngr, "watchAggregator", aggregatorSpy);
+
+        // Redeploy the metastorage watch. The watch inhibitor will be used from now on.
+        metaMngr.unregisterWatch(-1);
+
+        return inhibitor;
+    }
+
+    /**
+     * Default constructor.
+     */
+    private WatchListenerInhibitor() {
+    }
+
+    /**
+     * Sets a wrapped listener.
+     *
+     * @param realListener Listener to wrap.
+     */
+    private void setRealListener(WatchListener realListener) {
+        this.realListener = realListener;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean onUpdate(WatchEvent evt) {
+        if (!inhibit) {
+            return realListener.onUpdate(evt);
+        }
+
+        return inhibitEvents.add(evt);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void onError(Throwable e) {
+        realListener.onError(e);
+    }
+
+    /**
+     * Starts inhibiting events.
+     */
+    public synchronized void startInhibit() {
+        inhibit = true;
+    }
+
+    /**
+     * Stops inhibiting events and passes the accumulated events to the wrapped listener.
+     */
+    public synchronized void stopInhibit() {
+        inhibit = false;
+    
+        for (WatchEvent evt : inhibitEvents) {
+            realListener.onUpdate(evt);
+        }
+
+        inhibitEvents.clear();
+    }
+}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
index a56b439..d59e641 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.Collection;
 import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
 import org.apache.ignite.internal.schema.row.Row;
 import org.jetbrains.annotations.NotNull;
@@ -50,6 +51,13 @@ public interface SchemaRegistry {
      * @throws SchemaRegistryException If no schema found for given version.
      */
     @NotNull SchemaDescriptor schema(int ver) throws SchemaRegistryException;
+    
+    /**
+     * Gets schema descriptor for the latest version in cluster.
+     *
+     * @return Schema descriptor if initialized, {@code null} otherwise.
+     */
+    SchemaDescriptor waitLatestSchema();
 
     /**
      * Get last registereg schema version.
@@ -64,4 +72,12 @@ public interface SchemaRegistry {
      * @return Schema-aware row.
      */
     Row resolve(BinaryRow row);
+
+    /**
+     * Resolves a schema for batch operation.
+     *
+     * @param rows Binary rows.
+     * @return Schema-aware rows.
+     */
+    Collection<Row> resolve(Collection<BinaryRow> rows);
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
index bdeb9d2..55f4701 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -17,17 +17,21 @@
 
 package org.apache.ignite.internal.schema.registry;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.Function;
+import java.util.function.IntSupplier;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.mapping.ColumnMapper;
 import org.apache.ignite.internal.schema.mapping.ColumnMapping;
 import org.apache.ignite.internal.schema.row.Row;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -49,25 +53,30 @@ public class SchemaRegistryImpl implements SchemaRegistry {
     /** Schema store. */
     private final Function<Integer, SchemaDescriptor> history;
 
+    /** The method to provide the latest schema version on cluster. */
+    private final IntSupplier latestVersionStore;
+
     /**
      * Default constructor.
      *
      * @param history Schema history.
+     * @param latestVersionStore The method to provide the latest version of the schema.
      */
-    public SchemaRegistryImpl(Function<Integer, SchemaDescriptor> history) {
-        lastVer = INITIAL_SCHEMA_VERSION;
-        this.history = history;
+    public SchemaRegistryImpl(Function<Integer, SchemaDescriptor> history, IntSupplier latestVersionStore) {
+        this(INITIAL_SCHEMA_VERSION, history, latestVersionStore);
     }
 
     /**
      * Constructor.
      *
      * @param initialVer Initial version.
-     * @param history    Schema history.
+     * @param history Schema history.
+     * @param latestVersionStore The method to provide the latest version of the schema.
      */
-    public SchemaRegistryImpl(int initialVer, Function<Integer, SchemaDescriptor> history) {
+    public SchemaRegistryImpl(int initialVer, Function<Integer, SchemaDescriptor> history, IntSupplier latestVersionStore) {
         lastVer = initialVer;
         this.history = history;
+        this.latestVersionStore = latestVersionStore;
     }
 
     /** {@inheritDoc} */
@@ -107,6 +116,20 @@ public class SchemaRegistryImpl implements SchemaRegistry {
     }
 
     /** {@inheritDoc} */
+    @Override public SchemaDescriptor waitLatestSchema() {
+        int lastVer0 = latestVersionStore.getAsInt();
+    
+        if (lastVer0 == INITIAL_SCHEMA_VERSION) {
+            return schema();
+        }
+    
+        assert lastVer <= lastVer0 : "Cached schema is earlier than consensus [lastVer=" + lastVer
+                + ", consLastVer=" + lastVer0 + ']';
+
+        return schema(lastVer0);
+    }
+
+    /** {@inheritDoc} */
     @Override
     public int lastSchemaVersion() {
         return lastVer;
@@ -115,15 +138,38 @@ public class SchemaRegistryImpl implements SchemaRegistry {
     /** {@inheritDoc} */
     @Override
     public Row resolve(BinaryRow row) {
-        final SchemaDescriptor rowSchema = schema(row.schemaVersion());
-        final SchemaDescriptor curSchema = schema();
+        final SchemaDescriptor curSchema = waitLatestSchema();
+    
+        return resolveInternal(row, curSchema);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Collection<Row> resolve(Collection<BinaryRow> rows) {
+        final SchemaDescriptor curSchema = waitLatestSchema();
+    
+        return rows.stream().map(row -> resolveInternal(row, curSchema))
+                .collect(Collectors.toList());
+    }
 
+    /**
+     * Resolves a schema for row.
+     * The method is preferable when the latest schema has already been obtained.
+     *
+     * @param row Binary row.
+     * @param curSchema The latest available local schema.
+     * @return Schema-aware row.
+     */
+    @NotNull
+    private Row resolveInternal(BinaryRow row, SchemaDescriptor curSchema) {
+        final SchemaDescriptor rowSchema = schema(row.schemaVersion());
+    
         if (curSchema.version() == rowSchema.version()) {
             return new Row(rowSchema, row);
         }
-
+    
         ColumnMapper mapping = resolveMapping(curSchema, rowSchema);
-
+    
         return new UpgradingRowAdapter(curSchema, rowSchema, row, mapping);
     }
 
@@ -182,9 +228,9 @@ public class SchemaRegistryImpl implements SchemaRegistry {
             throw new SchemaRegistryException("Try to register schema of wrong version: ver=" + desc.version() + ", lastVer=" + lastVer);
         }
 
-        schemaCache.put(desc.version(), desc);
-
         lastVer = desc.version();
+
+        schemaCache.put(desc.version(), desc);
     }
 
     /**
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
index 06e6c2b..0754875 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
@@ -58,7 +58,7 @@ public class SchemaRegistryImplTest {
                 new Column[]{new Column("keyLongCol", INT64, true)},
                 new Column[]{new Column("valBytesCol", BYTES, true)});
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> INITIAL_SCHEMA_VERSION);
 
         assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
         assertNull(reg.schema());
@@ -110,7 +110,7 @@ public class SchemaRegistryImplTest {
                         new Column("valStringCol", STRING, true)
                 });
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> INITIAL_SCHEMA_VERSION);
 
         assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
         assertNull(reg.schema());
@@ -164,7 +164,7 @@ public class SchemaRegistryImplTest {
                         new Column("valStringCol", STRING, true)
                 });
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> INITIAL_SCHEMA_VERSION);
 
         assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
 
@@ -232,7 +232,7 @@ public class SchemaRegistryImplTest {
                         new Column("valStringCol", STRING, true)
                 });
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> INITIAL_SCHEMA_VERSION);
 
         assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
 
@@ -348,7 +348,7 @@ public class SchemaRegistryImplTest {
 
         Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV1, schemaV2);
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(2, history::get);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(2, history::get, () -> INITIAL_SCHEMA_VERSION);
 
         assertEquals(2, reg.lastSchemaVersion());
         assertSameSchema(schemaV2, reg.schema());
@@ -414,7 +414,7 @@ public class SchemaRegistryImplTest {
 
         Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV2, schemaV3);
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(3, history::get);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(3, history::get, () -> INITIAL_SCHEMA_VERSION);
 
         assertEquals(3, reg.lastSchemaVersion());
         assertSameSchema(schemaV3, reg.schema());
@@ -480,7 +480,7 @@ public class SchemaRegistryImplTest {
 
         Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV2, schemaV3, schemaV4);
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(4, history::get);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(4, history::get, () -> INITIAL_SCHEMA_VERSION);
 
         assertEquals(4, reg.lastSchemaVersion());
         assertSameSchema(schemaV4, reg.schema());
@@ -546,7 +546,7 @@ public class SchemaRegistryImplTest {
 
         schemaV4.columnMapping(createMapper(schemaV4).add(schemaV4.column("valBytesCol")));
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> INITIAL_SCHEMA_VERSION);
 
         final Map<Long, ColumnMapper> cache = reg.mappingCache();
 
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
index 1f99a21..4d54319 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
@@ -29,6 +29,7 @@ import static org.apache.ignite.internal.schema.NativeTypes.STRING;
 import static org.apache.ignite.internal.schema.NativeTypes.datetime;
 import static org.apache.ignite.internal.schema.NativeTypes.time;
 import static org.apache.ignite.internal.schema.NativeTypes.timestamp;
+import static org.apache.ignite.internal.schema.registry.SchemaRegistryImpl.INITIAL_SCHEMA_VERSION;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -150,12 +151,12 @@ public class UpgradingRowAdapterTest {
         ByteBufferRow row = new ByteBufferRow(serializeValuesToRow(schema, values));
 
         // Validate row.
-        validateRow(values, new SchemaRegistryImpl(1, v -> v == 1 ? schema : schema2), row);
+        validateRow(values, new SchemaRegistryImpl(1, v -> v == 1 ? schema : schema2, () -> INITIAL_SCHEMA_VERSION), row);
 
         // Validate upgraded row.
         values.add(addedColumnIndex, null);
 
-        validateRow(values, new SchemaRegistryImpl(2, v -> v == 1 ? schema : schema2), row);
+        validateRow(values, new SchemaRegistryImpl(2, v -> v == 1 ? schema : schema2, () -> INITIAL_SCHEMA_VERSION), row);
     }
 
     private void validateRow(List<Object> values, SchemaRegistryImpl schemaRegistry, ByteBufferRow binaryRow) {
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItDistributedTableTest.java
index f35534d..8f7f04b 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItDistributedTableTest.java
@@ -327,7 +327,12 @@ public class ItDistributedTableTest {
             public SchemaDescriptor schema(int ver) {
                 return SCHEMA;
             }
-
+            
+            @Override
+            public SchemaDescriptor waitLatestSchema() {
+                return schema();
+            }
+    
             @Override
             public int lastSchemaVersion() {
                 return SCHEMA.version();
@@ -337,6 +342,10 @@ public class ItDistributedTableTest {
             public Row resolve(BinaryRow row) {
                 return new Row(SCHEMA, row);
             }
+
+            @Override public Collection<Row> resolve(Collection<BinaryRow> rows) {
+                return rows.stream().map(this::resolve).collect(Collectors.toList());
+            }
         }, null);
 
         partitionedTableRecordView(tbl.recordView(), PARTS * 10);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshallerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshallerImpl.java
index 67842e7..7cac642 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshallerImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshallerImpl.java
@@ -82,22 +82,28 @@ public class TupleMarshallerImpl implements TupleMarshaller {
             
             InternalTuple keyTuple0 = toInternalTuple(schema, tuple, true);
             InternalTuple valTuple0 = toInternalTuple(schema, tuple, false);
-            
+    
             while (valTuple0.knownColumns() + keyTuple0.knownColumns() != tuple.columnCount()) {
                 if (tbl.schemaMode() == SchemaManagementMode.STRICT) {
-                    throw new SchemaMismatchException("Value doesn't match schema.");
+                    SchemaDescriptor newSchema = schemaReg.waitLatestSchema();
+            
+                    if (newSchema.version() == schema.version()) {
+                        throw new SchemaMismatchException("Value doesn't match schema.");
+                    }
+            
+                    return marshal(tuple);
                 }
-                
+        
                 createColumns(extractColumnsType(tuple, extraColumnNames(tuple, schema)));
-                
+        
                 assert schemaReg.lastSchemaVersion() > schema.version();
-                
+        
                 schema = schemaReg.schema();
-                
+        
                 keyTuple0 = toInternalTuple(schema, tuple, true);
                 valTuple0 = toInternalTuple(schema, tuple, false);
             }
-            
+    
             return buildRow(schema, keyTuple0, valTuple0);
         } catch (Exception ex) {
             throw new TupleMarshallerException("Failed to marshal tuple.", ex);
@@ -112,26 +118,34 @@ public class TupleMarshallerImpl implements TupleMarshaller {
             
             InternalTuple keyTuple0 = toInternalTuple(schema, keyTuple, true);
             InternalTuple valTuple0 = toInternalTuple(schema, valTuple, false);
-            
+    
             while (true) {
                 if (keyTuple0.knownColumns() < keyTuple.columnCount()) {
-                    throw new SchemaMismatchException("Key tuple contains extra columns: " + extraColumnNames(keyTuple, true, schema));
+                    throw new SchemaMismatchException(
+                            "Key tuple contains extra columns: " + extraColumnNames(keyTuple, true,
+                                    schema));
                 }
-    
+        
                 if (valTuple == null || valTuple0.knownColumns() == valTuple.columnCount()) {
                     break; // Nothing to do.
                 }
-    
+        
                 if (tbl.schemaMode() == SchemaManagementMode.STRICT) {
-                    throw new SchemaMismatchException("Value doesn't match schema.");
+                    SchemaDescriptor newSchema = schemaReg.waitLatestSchema();
+            
+                    if (newSchema.version() == schema.version()) {
+                        throw new SchemaMismatchException("Value doesn't match schema.");
+                    }
+            
+                    return marshal(keyTuple, valTuple);
                 }
-                
+        
                 createColumns(extractColumnsType(valTuple, extraColumnNames(valTuple, false, schema)));
-                
+        
                 assert schemaReg.lastSchemaVersion() > schema.version();
-                
+        
                 schema = schemaReg.schema();
-                
+        
                 keyTuple0 = toInternalTuple(schema, keyTuple, true);
                 valTuple0 = toInternalTuple(schema, valTuple, false);
             }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index 667f8ef..e3d60d0 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
 
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
@@ -424,7 +425,15 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie
         if (rows == null) {
             return null;
         }
-        
-        return rows.stream().filter(Objects::nonNull).map(this::wrap).collect(Collectors.toSet());
+    
+        Collection<BinaryRow> nonEmptyRows = rows.stream().filter(Objects::nonNull)
+                .collect(Collectors.toList());
+    
+        if (nonEmptyRows.isEmpty()) {
+            return Collections.EMPTY_LIST;
+        }
+    
+        return schemaReg.resolve(nonEmptyRows).stream().map(TableRow::tuple)
+                .collect(Collectors.toList());
     }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 92b79c7..eceb762 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -32,6 +32,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.configuration.schema.ExtendedTableChange;
 import org.apache.ignite.internal.configuration.schema.ExtendedTableConfiguration;
 import org.apache.ignite.internal.configuration.schema.ExtendedTableView;
+import org.apache.ignite.internal.configuration.schema.SchemaConfiguration;
 import org.apache.ignite.internal.configuration.schema.SchemaView;
 import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
 import org.apache.ignite.internal.manager.EventListener;
@@ -60,6 +62,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.manager.Producer;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaException;
 import org.apache.ignite.internal.schema.SchemaUtils;
 import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
 import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
@@ -182,7 +185,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     @Override
     public void start() {
         tablesCfg.tables()
-                .listenElements(new ConfigurationNamedListListener<>() {
+                .listenElements(new ConfigurationNamedListListener<TableView>() {
                     @Override
                     public @NotNull CompletableFuture<?> onCreate(@NotNull ConfigurationNotificationEvent<TableView> ctx) {
                         if (!busyLock.enterBusy()) {
@@ -503,11 +506,31 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                     partitionMap.put(p, service);
                 }
-
-                InternalTableImpl internalTable = new InternalTableImpl(name, tblId, partitionMap, partitions, netAddrResolver,
-                        tableStorage);
-
-                var schemaRegistry = new SchemaRegistryImpl(v -> schemaDesc);
+    
+                InternalTableImpl internalTable = new InternalTableImpl(name, tblId, partitionMap,
+                        partitions, netAddrResolver, tableStorage);
+    
+                var schemaRegistry = new SchemaRegistryImpl(v -> {
+                    if (!busyLock.enterBusy()) {
+                        throw new IgniteException(new NodeStoppingException());
+                    }
+        
+                    try {
+                        return tableSchema(tblId, v);
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                }, () -> {
+                    if (!busyLock.enterBusy()) {
+                        throw new IgniteException(new NodeStoppingException());
+                    }
+        
+                    try {
+                        return latestSchemaVersion(tblId);
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                });
 
                 schemaRegistry.onSchemaRegistered(schemaDesc);
 
@@ -524,7 +547,76 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             } catch (Exception e) {
                 fireEvent(TableEvent.CREATE, new TableEventParameters(tblId, name), e);
             }
-        });
+        }).join();
+    }
+
+    /**
+     * Returns the table schema of the given version from history.
+     *
+     * @param tblId Table id.
+     * @param schemaVer Schema version.
+     * @return Schema descriptor.
+     */
+    private SchemaDescriptor tableSchema(IgniteUuid tblId, int schemaVer) {
+        try {
+            TableImpl table = tablesById.get(tblId);
+
+            assert table != null : "Table is undefined [tblId=" + tblId + ']';
+
+            ExtendedTableConfiguration tblCfg = ((ExtendedTableConfiguration) tablesCfg.tables().get(table.tableName()));
+
+            if (schemaVer <= table.schemaView().lastSchemaVersion()) {
+                return getSchemaDescriptorLocally(schemaVer, tblCfg);
+            }
+
+            CompletableFuture<SchemaDescriptor> fur = new CompletableFuture<>();
+
+            var clo = new EventListener<TableEventParameters>() {
+                @Override
+                public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) {
+                    if (tblId.equals(parameters.tableId()) && schemaVer <= parameters.table().schemaView().lastSchemaVersion()) {
+                        fur.complete(getSchemaDescriptorLocally(schemaVer, tblCfg));
+
+                        return true;
+                    }
+
+                    return false;
+                }
+
+                @Override public void remove(@NotNull Throwable exception) {
+                    fur.completeExceptionally(exception);
+                }
+            };
+
+            listen(TableEvent.ALTER, clo);
+
+            if (schemaVer <= table.schemaView().lastSchemaVersion()) {
+                fur.complete(getSchemaDescriptorLocally(schemaVer, tblCfg));
+            }
+
+            if (!isSchemaExists(tblId, schemaVer) && fur.complete(null)) {
+                removeListener(TableEvent.ALTER, clo);
+            }
+
+            return fur.get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new SchemaException("Can't read schema from vault: ver=" + schemaVer, e);
+        }
+    }
+
+    /**
+     * Gets a schema descriptor from the local node configuration storage.
+     *
+     * @param schemaVer Schema version.
+     * @param tblCfg Table configuration.
+     * @return Schema descriptor.
+     */
+    @NotNull private SchemaDescriptor getSchemaDescriptorLocally(int schemaVer, ExtendedTableConfiguration tblCfg) {
+        SchemaConfiguration schemaCfg = tblCfg.schemas().get(String.valueOf(schemaVer));
+
+        assert schemaCfg != null;
+
+        return SchemaSerializerImpl.INSTANCE.deserialize(schemaCfg.schema().value());
     }
 
     /**
@@ -979,7 +1071,56 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     private List<String> tableNamesConfigured() {
         return ConfigurationUtil.directValue(tablesCfg.tables()).namedListKeys();
     }
-
+    
+    /**
+     * Tells whether the requested schema version is already configured for the table in the Metastorage.
+     *
+     * @param tblId     Id of the table to check.
+     * @param schemaVer Schema version being looked for.
+     * @return {@code true} if the latest configured schema version is not lower than {@code schemaVer}, {@code false} otherwise.
+     */
+    // TODO: IGNITE-15412 Configuration manager will be used to retrieve distributed values
+    private boolean isSchemaExists(IgniteUuid tblId, int schemaVer) {
+        return schemaVer <= latestSchemaVersion(tblId);
+    }
+    
+    /**
+     * Gets the latest version of the table schema which is available in the Metastore.
+     *
+     * @param tblId Table id.
+     * @return The latest schema version, never lower than {@code INITIAL_SCHEMA_VERSION}.
+     * @throws IllegalStateException If no configured table has the given id.
+     */
+    private int latestSchemaVersion(IgniteUuid tblId) {
+        NamedListView<TableView> directCfg = ((DirectConfigurationProperty<NamedListView<TableView>>) tablesCfg.tables()).directValue();
+
+        ExtendedTableView viewForId = null;
+
+        // TODO: IGNITE-15721 Need to review this approach after the ticket would be fixed.
+        // Probably, it won't be required to get the configuration of all tables from the Metastore.
+        for (String name : directCfg.namedListKeys()) {
+            ExtendedTableView tblView = (ExtendedTableView) directCfg.get(name);
+
+            if (tblView != null && tblId.equals(IgniteUuid.fromString(tblView.id()))) {
+                viewForId = tblView;
+
+                break;
+            }
+        }
+
+        if (viewForId == null) {
+            throw new IllegalStateException("Table configuration is not found: tblId=" + tblId);
+        }
+
+        int lastVer = INITIAL_SCHEMA_VERSION;
+
+        for (String schemaVerAsStr : viewForId.schemas().namedListKeys()) {
+            lastVer = Math.max(lastVer, Integer.parseInt(schemaVerAsStr));
+        }
+
+        return lastVer;
+    }
+    
     /** {@inheritDoc} */
     @Override
     public Table table(String name) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
index b9dd1e7..a7cd3ec 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.benchmarks;
 
+import static org.apache.ignite.internal.schema.registry.SchemaRegistryImpl.INITIAL_SCHEMA_VERSION;
+
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
@@ -105,7 +107,7 @@ public class TupleMarshallerFixlenOnlyBenchmark {
                         .toArray(Column[]::new)
         );
 
-        marshaller = new TupleMarshallerImpl(null, null, new SchemaRegistryImpl(v -> null) {
+        marshaller = new TupleMarshallerImpl(null, null, new SchemaRegistryImpl(v -> null, () -> INITIAL_SCHEMA_VERSION) {
             @Override
             public SchemaDescriptor schema() {
                 return schema;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
index ec00aaf..d641d9b 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.benchmarks;
 import static org.apache.ignite.internal.schema.NativeTypes.BYTES;
 import static org.apache.ignite.internal.schema.NativeTypes.INT64;
 import static org.apache.ignite.internal.schema.NativeTypes.STRING;
+import static org.apache.ignite.internal.schema.registry.SchemaRegistryImpl.INITIAL_SCHEMA_VERSION;
 
 import java.io.Serializable;
 import java.util.Random;
@@ -119,7 +120,7 @@ public class TupleMarshallerVarlenOnlyBenchmark {
                         .toArray(Column[]::new)
         );
 
-        marshaller = new TupleMarshallerImpl(null, null, new SchemaRegistryImpl(v -> null) {
+        marshaller = new TupleMarshallerImpl(null, null, new SchemaRegistryImpl(v -> null, () -> INITIAL_SCHEMA_VERSION) {
             @Override
             public SchemaDescriptor schema() {
                 return schema;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index 0d4a80d..1548a57 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -440,11 +440,11 @@ public class TableManagerTest extends IgniteAbstractTest {
             if (!createTbl && !dropTbl) {
                 return CompletableFuture.completedFuture(null);
             }
-
+            
             if (phaser != null) {
                 phaser.arriveAndAwaitAdvance();
             }
-
+            
             return CompletableFuture.completedFuture(null);
         });
     
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
index 986bd80..ceff3d6 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table.impl;
 
+import java.util.Collection;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -46,16 +48,21 @@ public class DummySchemaManagerImpl implements SchemaRegistry {
     public SchemaDescriptor schema() {
         return schema;
     }
-
+    
     /** {@inheritDoc} */
     @Override
     public SchemaDescriptor schema(int ver) {
         assert ver >= 0;
-
+        
         assert schema.version() == ver;
-
+        
         return schema;
     }
+    
+    /** {@inheritDoc} This implementation delegates to {@link #schema()} and performs no waiting of its own. */
+    @Override public SchemaDescriptor waitLatestSchema() {
+        return schema();
+    }
 
     /** {@inheritDoc} */
     @Override
@@ -70,4 +77,10 @@ public class DummySchemaManagerImpl implements SchemaRegistry {
 
         return new Row(schema, row);
     }
+
+    /** {@inheritDoc} Every element is resolved on its own through {@link #resolve(BinaryRow)} and gathered into a list. */
+    @Override
+    public Collection<Row> resolve(Collection<BinaryRow> rows) {
+        return rows.stream().map(binaryRow -> resolve(binaryRow)).collect(Collectors.toList());
+    }
 }