You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/11/15 12:01:11 UTC
[ignite-3] branch main updated: IGNITE-15492 Check schema availability on local node. Fixes #344
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1957cf3 IGNITE-15492 Check schema availability on local node. Fixes #344
1957cf3 is described below
commit 1957cf3101b37750293ccf5b940e18b1099a38bf
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Mon Nov 15 15:00:48 2021 +0300
IGNITE-15492 Check schema availability on local node. Fixes #344
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../query/calcite/StopCalciteModuleTest.java | 9 +-
.../ignite/client/fakes/FakeSchemaRegistry.java | 13 +
.../apache/ignite/internal/util/IgniteUtils.java | 7 +-
.../internal/testframework/IgniteTestUtils.java | 63 +++++
.../metastorage/watch/WatchAggregator.java | 22 +-
.../raft/jraft/rpc/impl/RaftGroupServiceImpl.java | 43 ++--
modules/runner/pom.xml | 6 +
.../internal/runner/app/ItDataSchemaSyncTest.java | 261 +++++++++++++++++++++
.../internal/runner/app/ItTablesApiTest.java | 122 +---------
.../internal/test/WatchListenerInhibitor.java | 149 ++++++++++++
.../ignite/internal/schema/SchemaRegistry.java | 16 ++
.../schema/registry/SchemaRegistryImpl.java | 68 +++++-
.../schema/registry/SchemaRegistryImplTest.java | 16 +-
.../schema/registry/UpgradingRowAdapterTest.java | 5 +-
.../ignite/distributed/ItDistributedTableTest.java | 11 +-
.../schema/marshaller/TupleMarshallerImpl.java | 46 ++--
.../internal/table/RecordBinaryViewImpl.java | 13 +-
.../internal/table/distributed/TableManager.java | 157 ++++++++++++-
.../TupleMarshallerFixlenOnlyBenchmark.java | 4 +-
.../TupleMarshallerVarlenOnlyBenchmark.java | 3 +-
.../ignite/internal/table/TableManagerTest.java | 4 +-
.../table/impl/DummySchemaManagerImpl.java | 19 +-
22 files changed, 849 insertions(+), 208 deletions(-)
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/StopCalciteModuleTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/StopCalciteModuleTest.java
index bd37d73..22237c3 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/StopCalciteModuleTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/StopCalciteModuleTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite;
+import static org.apache.ignite.internal.schema.registry.SchemaRegistryImpl.INITIAL_SCHEMA_VERSION;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -114,11 +115,11 @@ public class StopCalciteModuleTest {
new Column[]{new Column("ID", NativeTypes.INT32, false)},
new Column[]{new Column("VAL", NativeTypes.INT32, false)}
);
-
- schemaReg = new SchemaRegistryImpl(0, (v) -> schemaDesc);
-
+
+ schemaReg = new SchemaRegistryImpl(0, (v) -> schemaDesc, () -> INITIAL_SCHEMA_VERSION);
+
when(tbl.tableName()).thenReturn("PUBLIC.TEST");
-
+
// Mock create table (notify on register listener).
doAnswer(invocation -> {
EventListener<TableEventParameters> clo = (EventListener<TableEventParameters>) invocation.getArguments()[1];
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java
index 52cbad5..6f2eb3c 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java
@@ -17,9 +17,11 @@
package org.apache.ignite.client.fakes;
+import java.util.Collection;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -91,6 +93,11 @@ public class FakeSchemaRegistry implements SchemaRegistry {
}
/** {@inheritDoc} */
+ @Override public SchemaDescriptor waitLatestSchema() {
+ return schema();
+ }
+
+ /** {@inheritDoc} */
@Override
public int lastSchemaVersion() {
return lastVer;
@@ -101,4 +108,10 @@ public class FakeSchemaRegistry implements SchemaRegistry {
public Row resolve(BinaryRow row) {
return new Row(schema(row.schemaVersion()), row);
}
+
+ /** {@inheritDoc} */
+ @Override
+ public Collection<Row> resolve(Collection<BinaryRow> rows) {
+ return rows.stream().map(this::resolve).collect(Collectors.toList());
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index e462585..53b84e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.LoggerMessageHelper;
import org.jetbrains.annotations.Nullable;
/**
@@ -518,10 +519,10 @@ public class IgniteUtils {
* @param msg Message to print with the stack.
* @deprecated Calls to this method should never be committed to master.
*/
- public static void dumpStack(IgniteLogger log, String msg) {
+ public static void dumpStack(IgniteLogger log, String msg, Object... params) {
String reason = "Dumping stack.";
-
- var err = new Exception(msg);
+
+ var err = new Exception(LoggerMessageHelper.format(msg, params));
if (log != null) {
log.error(reason, err);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 54ed33e..ad48b1c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -25,7 +25,10 @@ import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.BitSet;
import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.LoggerMessageHelper;
import org.jetbrains.annotations.NotNull;
@@ -195,6 +198,66 @@ public final class IgniteTestUtils {
}
/**
+ * Runs runnable task asynchronously.
+ *
+ * @param task Runnable.
+ * @return Future with task result.
+ */
+ public static CompletableFuture runAsync(final Runnable task) {
+ return runAsync(task, "async-runnable-runner");
+ }
+
+ /**
+ * Runs runnable task asynchronously.
+ *
+ * @param task Runnable.
+ * @return Future with task result.
+ */
+ public static CompletableFuture runAsync(final Runnable task, String threadName) {
+ return runAsync(() -> {
+ task.run();
+
+ return null;
+ }, threadName);
+ }
+
+ /**
+ * Runs callable task asynchronously.
+ *
+ * @param task Callable.
+ * @return Future with task result.
+ */
+ public static <T> CompletableFuture<T> runAsync(final Callable<T> task) {
+ return runAsync(task, "async-callable-runner");
+ }
+
+ /**
+ * Runs callable task asynchronously.
+ *
+ * @param task Callable.
+ * @param threadName Thread name.
+ * @return Future with task result.
+ */
+ public static <T> CompletableFuture<T> runAsync(final Callable<T> task, String threadName) {
+ final NamedThreadFactory thrFactory = new NamedThreadFactory(threadName);
+
+ final CompletableFuture<T> fut = new CompletableFuture<T>();
+
+ thrFactory.newThread(() -> {
+ try {
+ // Execute task.
+ T res = task.call();
+
+ fut.complete(res);
+ } catch (Throwable e) {
+ fut.completeExceptionally(e);
+ }
+ }).start();
+
+ return fut;
+ }
+
+ /**
* Waits for the condition.
*
* @param cond Condition.
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/WatchAggregator.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/WatchAggregator.java
index 34ebd58..46706e7 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/WatchAggregator.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/WatchAggregator.java
@@ -166,15 +166,29 @@ public class WatchAggregator {
final LinkedHashMap<Long, Watch> cpWatches = new LinkedHashMap<>(watches);
return new WatchListener() {
-
+
@Override
public boolean onUpdate(@NotNull WatchEvent evt) {
+ //TODO: IGNITE-15858 Fix stopWatch may solve the issue.
+ synchronized (watches) {
+ processWatchEvents(evt);
+ }
+
+ return true;
+ }
+
+ /**
+ * Process watch events synchronously.
+ *
+ * @param evt Watch event.
+ */
+ private void processWatchEvents(@NotNull WatchEvent evt) {
var watchIt = cpWatches.entrySet().iterator();
Collection<Long> toCancel = new ArrayList<>();
while (watchIt.hasNext()) {
- Map.Entry<Long, WatchAggregator.Watch> entry = watchIt.next();
- WatchAggregator.Watch watch = entry.getValue();
+ Map.Entry<Long, Watch> entry = watchIt.next();
+ Watch watch = entry.getValue();
var filteredEvts = new ArrayList<EntryEvent>();
for (EntryEvent entryEvt : evt.entryEvents()) {
@@ -207,8 +221,6 @@ public class WatchAggregator {
}
storeRevision.accept(entries, revision);
-
- return true;
}
@Override
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index 0270ed1..10f719f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -17,6 +17,26 @@
package org.apache.ignite.raft.jraft.rpc.impl;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.ThreadLocalRandom.current;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -44,26 +64,6 @@ import org.apache.ignite.raft.jraft.rpc.ActionResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.jetbrains.annotations.NotNull;
-import static java.lang.System.currentTimeMillis;
-import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.ThreadLocalRandom.current;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
-
/**
* The implementation of {@link RaftGroupService}
*/
@@ -492,7 +492,8 @@ public class RaftGroupServiceImpl implements RaftGroupService {
CompletableFuture<?> fut0 = cluster.messagingService().invoke(peer.address(), (NetworkMessage) req, networkTimeout);
- fut0.whenComplete(new BiConsumer<Object, Throwable>() {
+ //TODO: IGNITE-15389 org.apache.ignite.internal.metastorage.client.CursorImpl has potential deadlock inside
+ fut0.whenCompleteAsync(new BiConsumer<Object, Throwable>() {
@Override public void accept(Object resp, Throwable err) {
if (err != null) {
if (recoverable(err)) {
diff --git a/modules/runner/pom.xml b/modules/runner/pom.xml
index 5bcfea1..41eeeab 100644
--- a/modules/runner/pom.xml
+++ b/modules/runner/pom.xml
@@ -164,6 +164,12 @@
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-metastorage-server</artifactId>
<type>test-jar</type>
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
new file mode 100644
index 0000000..71bdb41
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.schema.SchemaBuilders;
+import org.apache.ignite.schema.definition.ColumnType;
+import org.apache.ignite.schema.definition.TableDefinition;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration test of table schema synchronization.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItDataSchemaSyncTest extends IgniteAbstractTest {
+ /**
+ * Schema name.
+ */
+ public static final String SCHEMA = "PUBLIC";
+
+ /**
+ * Short table name.
+ */
+ public static final String SHORT_TABLE_NAME = "tbl1";
+
+ /**
+ * Table name.
+ */
+ public static final String TABLE_NAME = SCHEMA + "." + SHORT_TABLE_NAME;
+
+ /**
+ * Nodes bootstrap configuration.
+ */
+ private static final Map<String, String> nodesBootstrapCfg = new LinkedHashMap<>() {
+ {
+ put("node0", "{\n"
+ + " \"node\": {\n"
+ + " \"metastorageNodes\":[ \"node0\" ]\n"
+ + " },\n"
+ + " \"network\": {\n"
+ + " \"port\":3344,\n"
+ + " \"nodeFinder\": {\n"
+ + " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+ + " }\n"
+ + " }\n"
+ + "}");
+
+ put("node1", "{\n"
+ + " \"node\": {\n"
+ + " \"metastorageNodes\":[ \"node0\" ]\n"
+ + " },\n"
+ + " \"network\": {\n"
+ + " \"port\":3345,\n"
+ + " \"nodeFinder\": {\n"
+ + " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+ + " }\n"
+ + " }\n"
+ + "}");
+
+ put("node2", "{\n"
+ + " \"node\": {\n"
+ + " \"metastorageNodes\":[ \"node0\" ]\n"
+ + " },\n"
+ + " \"network\": {\n"
+ + " \"port\":3346,\n"
+ + " \"nodeFinder\": {\n"
+ + " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+ + " }\n"
+ + " }\n"
+ + "}");
+ }
+ };
+
+ /**
+ * Cluster nodes.
+ */
+ private final List<Ignite> clusterNodes = new ArrayList<>();
+
+ /**
+ * Starts a cluster before every test started.
+ */
+ @BeforeEach
+ void beforeEach() throws Exception {
+ nodesBootstrapCfg.forEach((nodeName, configStr) ->
+ clusterNodes.add(IgnitionManager.start(nodeName, configStr, workDir.resolve(nodeName)))
+ );
+ }
+
+ /**
+ * Stops a cluster after every test finished.
+ */
+ @AfterEach
+ void afterEach() throws Exception {
+ IgniteUtils.closeAll(Lists.reverse(clusterNodes));
+ }
+
+ /**
+ * The test executes various operations on the lagging node.
+ * The operations can be executed only after the node catches up with the distributed cluster state.
+ */
+ @Test
+ public void test() throws Exception {
+ Ignite ignite0 = clusterNodes.get(0);
+ final IgniteImpl ignite1 = (IgniteImpl) clusterNodes.get(1);
+
+ createTable(ignite0, SCHEMA, SHORT_TABLE_NAME);
+
+ TableImpl table = (TableImpl) ignite0.tables().table(TABLE_NAME);
+
+ assertEquals(1, table.schemaView().schema().version());
+
+ for (int i = 0; i < 10; i++) {
+ table.recordView().insert(Tuple.create()
+ .set("key", Long.valueOf(i))
+ .set("valInt", i)
+ .set("valStr", "str_" + i)
+ );
+ }
+
+ WatchListenerInhibitor listenerInhibitor = WatchListenerInhibitor.metastorageEventsInhibitor(ignite1);
+
+ listenerInhibitor.startInhibit();
+
+ ignite0.tables().alterTable(TABLE_NAME,
+ tblChanger -> tblChanger.changeColumns(cols -> {
+ int colIdx = tblChanger.columns().namedListKeys().stream()
+ .mapToInt(Integer::parseInt).max().getAsInt() + 1;
+
+ cols.create(String.valueOf(colIdx),
+ colChg -> convert(SchemaBuilders.column("valStr2", ColumnType.string())
+ .withDefaultValueExpression("default").build(), colChg));
+ })
+ );
+
+ for (Ignite node : clusterNodes) {
+ if (node == ignite1) {
+ continue;
+ }
+
+ TableImpl tableOnNode = (TableImpl) node.tables().table(TABLE_NAME);
+
+ IgniteTestUtils.waitForCondition(() -> tableOnNode.schemaView().lastSchemaVersion() == 2, 10_000);
+ }
+
+ TableImpl table1 = (TableImpl) ignite1.tables().table(TABLE_NAME);
+
+ for (int i = 10; i < 20; i++) {
+ table.recordView().insert(Tuple.create()
+ .set("key", Long.valueOf(i))
+ .set("valInt", i)
+ .set("valStr", "str_" + i)
+ .set("valStr2", "str2_" + i)
+ );
+ }
+
+ final CompletableFuture insertFut = IgniteTestUtils.runAsync(() ->
+ table1.recordView().insert(Tuple.create()
+ .set("key", Long.valueOf(0))
+ .set("valInt", 0)
+ .set("valStr", "str_" + 0)
+ .set("valStr2", "str2_" + 0)
+ ));
+
+ final CompletableFuture getFut = IgniteTestUtils.runAsync(() -> {
+ table1.recordView().get(Tuple.create().set("key", Long.valueOf(10)));
+ });
+
+ final CompletableFuture checkDefaultFut = IgniteTestUtils.runAsync(() -> {
+ assertEquals("default",
+ table1.recordView().get(Tuple.create().set("key", Long.valueOf(0)))
+ .value("valStr2"));
+ });
+
+ assertEquals(1, table1.schemaView().lastSchemaVersion());
+
+ assertFalse(getFut.isDone());
+ assertFalse(insertFut.isDone());
+ assertFalse(checkDefaultFut.isDone());
+
+ listenerInhibitor.stopInhibit();
+
+ getFut.get(10, TimeUnit.SECONDS);
+ insertFut.get(10, TimeUnit.SECONDS);
+ checkDefaultFut.get(10, TimeUnit.SECONDS);
+
+ for (Ignite node : clusterNodes) {
+ Table tableOnNode = node.tables().table(TABLE_NAME);
+
+ for (int i = 0; i < 20; i++) {
+ Tuple row = tableOnNode.recordView().get(Tuple.create().set("key", Long.valueOf(i)));
+
+ assertNotNull(row);
+
+ assertEquals(i, row.intValue("valInt"));
+ assertEquals("str_" + i, row.value("valStr"));
+ assertEquals(i < 10 ? "default" : ("str2_" + i), row.value("valStr2"));
+ }
+ }
+ }
+
+ /**
+ * Creates a table with the passed name on the specific schema.
+ *
+ * @param node Cluster node.
+ * @param schemaName Schema name.
+ * @param shortTableName Table name.
+ */
+ protected void createTable(Ignite node, String schemaName, String shortTableName) {
+ // Create table on node 0.
+ TableDefinition schTbl1 = SchemaBuilders.tableBuilder(schemaName, shortTableName).columns(
+ SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
+ SchemaBuilders.column("valInt", ColumnType.INT32).asNullable().build(),
+ SchemaBuilders.column("valStr", ColumnType.string()).withDefaultValueExpression("default").build()
+ ).withPrimaryKey("key").build();
+
+ node.tables().createTable(
+ schTbl1.canonicalName(),
+ tblCh -> convert(schTbl1, tblCh).changeReplicas(2).changePartitions(10)
+ );
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index 516d5e7..56ba56f 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -21,14 +21,9 @@ import static org.apache.ignite.internal.schema.configuration.SchemaConfiguratio
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -37,14 +32,9 @@ import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.internal.ItUtils;
-import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.client.WatchEvent;
-import org.apache.ignite.internal.metastorage.client.WatchListener;
-import org.apache.ignite.internal.metastorage.watch.AggregatedWatch;
-import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -57,8 +47,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
-import org.junit.platform.commons.util.ReflectionUtils;
-import org.mockito.Mockito;
/**
* Integration tests to check consistent of java API on different nodes.
@@ -154,7 +142,7 @@ public class ItTablesApiTest extends IgniteAbstractTest {
Ignite ignite1 = clusterNodes.get(1);
- WatchListenerInhibitor ignite1Inhibitor = metastorageEventsInhibitor(ignite1);
+ WatchListenerInhibitor ignite1Inhibitor = WatchListenerInhibitor.metastorageEventsInhibitor(ignite1);
ignite1Inhibitor.startInhibit();
@@ -208,112 +196,6 @@ public class ItTablesApiTest extends IgniteAbstractTest {
}
/**
- * Creates the specific listener which can inhibit events for real metastorage listener.
- *
- * @param ignite Ignite.
- * @return Listener inhibitor.
- * @throws Exception If something wrong when creating the listener inhibitor.
- */
- private WatchListenerInhibitor metastorageEventsInhibitor(Ignite ignite) throws Exception {
- //TODO: IGNITE-15723 After a component factory will be implemented, need to got rid of reflection here.
- MetaStorageManager metaMngr = (MetaStorageManager) ReflectionUtils.tryToReadFieldValue(
- IgniteImpl.class,
- "metaStorageMgr",
- (IgniteImpl) ignite
- ).get();
-
- assertNotNull(metaMngr);
-
- WatchAggregator aggregator = (WatchAggregator) ReflectionUtils.tryToReadFieldValue(
- MetaStorageManager.class,
- "watchAggregator",
- metaMngr
- ).get();
-
- assertNotNull(aggregator);
-
- WatchAggregator aggregatorSpy = Mockito.spy(aggregator);
-
- WatchListenerInhibitor inhibitor = new WatchListenerInhibitor();
-
- doAnswer(mock -> {
- Optional<AggregatedWatch> op = (Optional<AggregatedWatch>) mock.callRealMethod();
-
- assertTrue(op.isPresent());
-
- inhibitor.setRealListener(op.get().listener());
-
- return Optional.of(new AggregatedWatch(op.get().keyCriterion(), op.get().revision(), inhibitor));
- }).when(aggregatorSpy).watch(anyLong(), any());
-
- IgniteTestUtils.setFieldValue(metaMngr, "watchAggregator", aggregatorSpy);
-
- // Redeploy metastorage watch. The Watch inhibitor will be used after.
- metaMngr.unregisterWatch(-1);
-
- return inhibitor;
- }
-
- /**
- * Listener which wraps another one to inhibit events.
- */
- private static class WatchListenerInhibitor implements WatchListener {
- /** Inhibited events. */
- private final ArrayList<WatchEvent> inhibitEvents = new ArrayList<>();
-
- /** Inhibit flag. */
- private boolean inhibit = false;
-
- /** Wrapped listener. */
- private WatchListener realListener;
-
- /**
- * Sets a wrapped listener.
- *
- * @param realListener Listener to wrap.
- */
- public void setRealListener(WatchListener realListener) {
- this.realListener = realListener;
- }
-
- /** {@inheritDoc} */
- @Override
- public synchronized boolean onUpdate(WatchEvent evt) {
- if (!inhibit) {
- return realListener.onUpdate(evt);
- }
-
- return inhibitEvents.add(evt);
- }
-
- /** {@inheritDoc} */
- @Override
- public synchronized void onError(Throwable e) {
- realListener.onError(e);
- }
-
- /**
- * Starts inhibit events.
- */
- synchronized void startInhibit() {
- inhibit = true;
- }
-
- /**
- * Stops inhibit events.
- */
- synchronized void stopInhibit() {
- inhibit = false;
-
- for (WatchEvent evt : inhibitEvents) {
- realListener.onUpdate(evt);
- }
-
- inhibitEvents.clear();
- }
- }
-
- /**
* Creates table.
*
* @param node Cluster node.
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
new file mode 100644
index 0000000..ccc13fe
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+
+import java.util.ArrayList;
+import java.util.Optional;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.metastorage.watch.AggregatedWatch;
+import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.junit.platform.commons.util.ReflectionUtils;
+import org.mockito.Mockito;
+
+/**
+ * Listener which wraps another one to inhibit events.
+ *
+ * <p>While inhibition is on, incoming updates are buffered in arrival order instead of being delivered.
+ * {@link #stopInhibit()} replays the buffered updates to the wrapped listener and clears the buffer.
+ * Errors are never buffered: they are forwarded to the wrapped listener right away.
+ */
+public class WatchListenerInhibitor implements WatchListener {
+ /** Events received while inhibition was on, in arrival order; replayed by {@link #stopInhibit()}. */
+ private final ArrayList<WatchEvent> inhibitEvents = new ArrayList<>();
+
+ /** Inhibit flag: when {@code true}, incoming updates are buffered instead of being delivered. */
+ private boolean inhibit = false;
+
+ /** Wrapped listener; assigned when the spied aggregator produces a watch. */
+ private WatchListener realListener;
+
+ /**
+ * Creates the specific listener which can inhibit events for real metastorage listener.
+ *
+ * <p>The metastorage manager of the node and its watch aggregator are read reflectively. The aggregator is
+ * replaced by a Mockito spy whose {@code watch(long, ...)} result carries the inhibitor in place of the
+ * original listener (the original one becomes the wrapped listener). Finally {@code unregisterWatch(-1)} is
+ * called so that the metastorage watch is redeployed through the spy.
+ *
+ * @param ignite Ignite.
+ * @return Listener inhibitor.
+ * @throws Exception If something wrong when creating the listener inhibitor.
+ */
+ public static WatchListenerInhibitor metastorageEventsInhibitor(Ignite ignite)
+ throws Exception {
+ //TODO: IGNITE-15723 After a component factory is implemented, get rid of reflection here.
+ MetaStorageManager metaMngr = (MetaStorageManager) ReflectionUtils.tryToReadFieldValue(
+ IgniteImpl.class,
+ "metaStorageMgr",
+ (IgniteImpl) ignite
+ ).get();
+
+ assertNotNull(metaMngr);
+
+ WatchAggregator aggregator = (WatchAggregator) ReflectionUtils.tryToReadFieldValue(
+ MetaStorageManager.class,
+ "watchAggregator",
+ metaMngr
+ ).get();
+
+ assertNotNull(aggregator);
+
+ WatchAggregator aggregatorSpy = Mockito.spy(aggregator);
+
+ WatchListenerInhibitor inhibitor = new WatchListenerInhibitor();
+
+ doAnswer(mock -> {
+ Optional<AggregatedWatch> op = (Optional<AggregatedWatch>) mock.callRealMethod();
+
+ assertTrue(op.isPresent());
+
+ inhibitor.setRealListener(op.get().listener());
+
+ return Optional.of(new AggregatedWatch(op.get().keyCriterion(), op.get().revision(),
+ inhibitor));
+ }).when(aggregatorSpy).watch(anyLong(), any());
+
+ IgniteTestUtils.setFieldValue(metaMngr, "watchAggregator", aggregatorSpy);
+
+ // Redeploy metastorage watch. The Watch inhibitor will be used after.
+ metaMngr.unregisterWatch(-1);
+
+ return inhibitor;
+ }
+
+ /**
+ * Default constructor.
+ * Private: instances are created only through {@link #metastorageEventsInhibitor(Ignite)}.
+ */
+ private WatchListenerInhibitor() {
+ }
+
+ /**
+ * Sets a wrapped listener.
+ *
+ * @param realListener Listener to wrap.
+ */
+ private void setRealListener(WatchListener realListener) {
+ this.realListener = realListener;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>When inhibition is off, the event is passed to the wrapped listener and its result is returned.
+ * When inhibition is on, the event is appended to the buffer and the result of that append is returned.
+ */
+ @Override public synchronized boolean onUpdate(WatchEvent evt) {
+ if (!inhibit) {
+ return realListener.onUpdate(evt);
+ }
+
+ return inhibitEvents.add(evt);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Errors are forwarded to the wrapped listener regardless of the inhibit flag.
+ */
+ @Override public synchronized void onError(Throwable e) {
+ realListener.onError(e);
+ }
+
+ /**
+ * Starts inhibit events.
+ * Updates received from now on are buffered until {@link #stopInhibit()} is called.
+ */
+ public synchronized void startInhibit() {
+ inhibit = true;
+ }
+
+ /**
+ * Stops inhibit events.
+ * Buffered updates are replayed to the wrapped listener in arrival order and the buffer is cleared;
+ * the values returned by the replayed {@code onUpdate} calls are ignored.
+ */
+ public synchronized void stopInhibit() {
+ inhibit = false;
+
+ for (WatchEvent evt : inhibitEvents) {
+ realListener.onUpdate(evt);
+ }
+
+ inhibitEvents.clear();
+ }
+}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
index a56b439..d59e641 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.schema;
+import java.util.Collection;
import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
import org.apache.ignite.internal.schema.row.Row;
import org.jetbrains.annotations.NotNull;
@@ -50,6 +51,13 @@ public interface SchemaRegistry {
* @throws SchemaRegistryException If no schema found for given version.
*/
@NotNull SchemaDescriptor schema(int ver) throws SchemaRegistryException;
+
+ /**
+ * Gets schema descriptor for the latest version in cluster.
+ *
+ * @return Schema descriptor if initialized, {@code null} otherwise.
+ */
+ SchemaDescriptor waitLatestSchema();
/**
* Get last registered schema version.
@@ -64,4 +72,12 @@ public interface SchemaRegistry {
* @return Schema-aware row.
*/
Row resolve(BinaryRow row);
+
+ /**
+ * Resolves a schema for batch operation.
+ *
+ * @param rows Binary rows.
+ * @return Schema-aware rows.
+ */
+ Collection<Row> resolve(Collection<BinaryRow> rows);
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
index bdeb9d2..55f4701 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -17,17 +17,21 @@
package org.apache.ignite.internal.schema.registry;
+import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;
+import java.util.function.IntSupplier;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.mapping.ColumnMapper;
import org.apache.ignite.internal.schema.mapping.ColumnMapping;
import org.apache.ignite.internal.schema.row.Row;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -49,25 +53,30 @@ public class SchemaRegistryImpl implements SchemaRegistry {
/** Schema store. */
private final Function<Integer, SchemaDescriptor> history;
+ /** The method to provide the latest schema version on cluster. */
+ private final IntSupplier latestVersionStore;
+
/**
* Default constructor.
*
* @param history Schema history.
+ * @param latestVersionStore The method to provide the latest version of the schema.
*/
- public SchemaRegistryImpl(Function<Integer, SchemaDescriptor> history) {
- lastVer = INITIAL_SCHEMA_VERSION;
- this.history = history;
+ public SchemaRegistryImpl(Function<Integer, SchemaDescriptor> history, IntSupplier latestVersionStore) {
+ this(INITIAL_SCHEMA_VERSION, history, latestVersionStore);
}
/**
* Constructor.
*
* @param initialVer Initial version.
- * @param history Schema history.
+ * @param history Schema history.
+ * @param latestVersionStore The method to provide the latest version of the schema.
*/
- public SchemaRegistryImpl(int initialVer, Function<Integer, SchemaDescriptor> history) {
+ public SchemaRegistryImpl(int initialVer, Function<Integer, SchemaDescriptor> history, IntSupplier latestVersionStore) {
lastVer = initialVer;
this.history = history;
+ this.latestVersionStore = latestVersionStore;
}
/** {@inheritDoc} */
@@ -107,6 +116,20 @@ public class SchemaRegistryImpl implements SchemaRegistry {
}
/** {@inheritDoc} */
+ @Override public SchemaDescriptor waitLatestSchema() {
+     final int consensusVer = latestVersionStore.getAsInt();
+
+     // Only a non-initial version reported by the store is resolved explicitly.
+     if (consensusVer != INITIAL_SCHEMA_VERSION) {
+         assert lastVer <= consensusVer : "Cached schema is earlier than consensus [lastVer=" + lastVer
+                 + ", consLastVer=" + consensusVer + ']';
+
+         return schema(consensusVer);
+     }
+
+     // The store reported the initial version: fall back to schema().
+     return schema();
+ }
+
+ /** {@inheritDoc} */
@Override
public int lastSchemaVersion() {
return lastVer;
@@ -115,15 +138,38 @@ public class SchemaRegistryImpl implements SchemaRegistry {
/** {@inheritDoc} */
@Override
public Row resolve(BinaryRow row) {
- final SchemaDescriptor rowSchema = schema(row.schemaVersion());
- final SchemaDescriptor curSchema = schema();
+ final SchemaDescriptor curSchema = waitLatestSchema();
+
+ return resolveInternal(row, curSchema);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Collection<Row> resolve(Collection<BinaryRow> rows) {
+ final SchemaDescriptor curSchema = waitLatestSchema();
+
+ return rows.stream().map(row -> resolveInternal(row, curSchema))
+ .collect(Collectors.toList());
+ }
+ /**
+ * Resolves a schema for row.
+ * This is the efficient path when the latest schema has already been obtained.
+ *
+ * @param row Binary row.
+ * @param curSchema The latest available local schema.
+ * @return Schema-aware rows.
+ */
+ @NotNull
+ private Row resolveInternal(BinaryRow row, SchemaDescriptor curSchema) {
+ final SchemaDescriptor rowSchema = schema(row.schemaVersion());
+
if (curSchema.version() == rowSchema.version()) {
return new Row(rowSchema, row);
}
-
+
ColumnMapper mapping = resolveMapping(curSchema, rowSchema);
-
+
return new UpgradingRowAdapter(curSchema, rowSchema, row, mapping);
}
@@ -182,9 +228,9 @@ public class SchemaRegistryImpl implements SchemaRegistry {
throw new SchemaRegistryException("Try to register schema of wrong version: ver=" + desc.version() + ", lastVer=" + lastVer);
}
- schemaCache.put(desc.version(), desc);
-
lastVer = desc.version();
+
+ schemaCache.put(desc.version(), desc);
}
/**
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
index 06e6c2b..0754875 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
@@ -58,7 +58,7 @@ public class SchemaRegistryImplTest {
new Column[]{new Column("keyLongCol", INT64, true)},
new Column[]{new Column("valBytesCol", BYTES, true)});
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> INITIAL_SCHEMA_VERSION);
assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
assertNull(reg.schema());
@@ -110,7 +110,7 @@ public class SchemaRegistryImplTest {
new Column("valStringCol", STRING, true)
});
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> INITIAL_SCHEMA_VERSION);
assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
assertNull(reg.schema());
@@ -164,7 +164,7 @@ public class SchemaRegistryImplTest {
new Column("valStringCol", STRING, true)
});
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> INITIAL_SCHEMA_VERSION);
assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
@@ -232,7 +232,7 @@ public class SchemaRegistryImplTest {
new Column("valStringCol", STRING, true)
});
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> INITIAL_SCHEMA_VERSION);
assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
@@ -348,7 +348,7 @@ public class SchemaRegistryImplTest {
Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV1, schemaV2);
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(2, history::get);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(2, history::get, () -> INITIAL_SCHEMA_VERSION);
assertEquals(2, reg.lastSchemaVersion());
assertSameSchema(schemaV2, reg.schema());
@@ -414,7 +414,7 @@ public class SchemaRegistryImplTest {
Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV2, schemaV3);
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(3, history::get);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(3, history::get, () -> INITIAL_SCHEMA_VERSION);
assertEquals(3, reg.lastSchemaVersion());
assertSameSchema(schemaV3, reg.schema());
@@ -480,7 +480,7 @@ public class SchemaRegistryImplTest {
Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV2, schemaV3, schemaV4);
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(4, history::get);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(4, history::get, () -> INITIAL_SCHEMA_VERSION);
assertEquals(4, reg.lastSchemaVersion());
assertSameSchema(schemaV4, reg.schema());
@@ -546,7 +546,7 @@ public class SchemaRegistryImplTest {
schemaV4.columnMapping(createMapper(schemaV4).add(schemaV4.column("valBytesCol")));
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> INITIAL_SCHEMA_VERSION);
final Map<Long, ColumnMapper> cache = reg.mappingCache();
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
index 1f99a21..4d54319 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
@@ -29,6 +29,7 @@ import static org.apache.ignite.internal.schema.NativeTypes.STRING;
import static org.apache.ignite.internal.schema.NativeTypes.datetime;
import static org.apache.ignite.internal.schema.NativeTypes.time;
import static org.apache.ignite.internal.schema.NativeTypes.timestamp;
+import static org.apache.ignite.internal.schema.registry.SchemaRegistryImpl.INITIAL_SCHEMA_VERSION;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -150,12 +151,12 @@ public class UpgradingRowAdapterTest {
ByteBufferRow row = new ByteBufferRow(serializeValuesToRow(schema, values));
// Validate row.
- validateRow(values, new SchemaRegistryImpl(1, v -> v == 1 ? schema : schema2), row);
+ validateRow(values, new SchemaRegistryImpl(1, v -> v == 1 ? schema : schema2, () -> INITIAL_SCHEMA_VERSION), row);
// Validate upgraded row.
values.add(addedColumnIndex, null);
- validateRow(values, new SchemaRegistryImpl(2, v -> v == 1 ? schema : schema2), row);
+ validateRow(values, new SchemaRegistryImpl(2, v -> v == 1 ? schema : schema2, () -> INITIAL_SCHEMA_VERSION), row);
}
private void validateRow(List<Object> values, SchemaRegistryImpl schemaRegistry, ByteBufferRow binaryRow) {
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItDistributedTableTest.java
index f35534d..8f7f04b 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItDistributedTableTest.java
@@ -327,7 +327,12 @@ public class ItDistributedTableTest {
public SchemaDescriptor schema(int ver) {
return SCHEMA;
}
-
+
+ @Override
+ public SchemaDescriptor waitLatestSchema() {
+ return schema();
+ }
+
@Override
public int lastSchemaVersion() {
return SCHEMA.version();
@@ -337,6 +342,10 @@ public class ItDistributedTableTest {
public Row resolve(BinaryRow row) {
return new Row(SCHEMA, row);
}
+
+ @Override public Collection<Row> resolve(Collection<BinaryRow> rows) {
+ return rows.stream().map(this::resolve).collect(Collectors.toList());
+ }
}, null);
partitionedTableRecordView(tbl.recordView(), PARTS * 10);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshallerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshallerImpl.java
index 67842e7..7cac642 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshallerImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshallerImpl.java
@@ -82,22 +82,28 @@ public class TupleMarshallerImpl implements TupleMarshaller {
InternalTuple keyTuple0 = toInternalTuple(schema, tuple, true);
InternalTuple valTuple0 = toInternalTuple(schema, tuple, false);
-
+
while (valTuple0.knownColumns() + keyTuple0.knownColumns() != tuple.columnCount()) {
if (tbl.schemaMode() == SchemaManagementMode.STRICT) {
- throw new SchemaMismatchException("Value doesn't match schema.");
+ SchemaDescriptor newSchema = schemaReg.waitLatestSchema();
+
+ if (newSchema.version() == schema.version()) {
+ throw new SchemaMismatchException("Value doesn't match schema.");
+ }
+
+ return marshal(tuple);
}
-
+
createColumns(extractColumnsType(tuple, extraColumnNames(tuple, schema)));
-
+
assert schemaReg.lastSchemaVersion() > schema.version();
-
+
schema = schemaReg.schema();
-
+
keyTuple0 = toInternalTuple(schema, tuple, true);
valTuple0 = toInternalTuple(schema, tuple, false);
}
-
+
return buildRow(schema, keyTuple0, valTuple0);
} catch (Exception ex) {
throw new TupleMarshallerException("Failed to marshal tuple.", ex);
@@ -112,26 +118,34 @@ public class TupleMarshallerImpl implements TupleMarshaller {
InternalTuple keyTuple0 = toInternalTuple(schema, keyTuple, true);
InternalTuple valTuple0 = toInternalTuple(schema, valTuple, false);
-
+
while (true) {
if (keyTuple0.knownColumns() < keyTuple.columnCount()) {
- throw new SchemaMismatchException("Key tuple contains extra columns: " + extraColumnNames(keyTuple, true, schema));
+ throw new SchemaMismatchException(
+ "Key tuple contains extra columns: " + extraColumnNames(keyTuple, true,
+ schema));
}
-
+
if (valTuple == null || valTuple0.knownColumns() == valTuple.columnCount()) {
break; // Nothing to do.
}
-
+
if (tbl.schemaMode() == SchemaManagementMode.STRICT) {
- throw new SchemaMismatchException("Value doesn't match schema.");
+ SchemaDescriptor newSchema = schemaReg.waitLatestSchema();
+
+ if (newSchema.version() == schema.version()) {
+ throw new SchemaMismatchException("Value doesn't match schema.");
+ }
+
+ return marshal(keyTuple, valTuple);
}
-
+
createColumns(extractColumnsType(valTuple, extraColumnNames(valTuple, false, schema)));
-
+
assert schemaReg.lastSchemaVersion() > schema.version();
-
+
schema = schemaReg.schema();
-
+
keyTuple0 = toInternalTuple(schema, keyTuple, true);
valTuple0 = toInternalTuple(schema, valTuple, false);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index 667f8ef..e3d60d0 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
import java.io.Serializable;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
@@ -424,7 +425,15 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie
if (rows == null) {
return null;
}
-
- return rows.stream().filter(Objects::nonNull).map(this::wrap).collect(Collectors.toSet());
+
+ Collection<BinaryRow> nonEmptyRows = rows.stream().filter(Objects::nonNull)
+ .collect(Collectors.toList());
+
+ if (nonEmptyRows.isEmpty()) {
+ return Collections.EMPTY_LIST;
+ }
+
+ return schemaReg.resolve(nonEmptyRows).stream().map(TableRow::tuple)
+ .collect(Collectors.toList());
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 92b79c7..eceb762 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -32,6 +32,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.configuration.schema.ExtendedTableChange;
import org.apache.ignite.internal.configuration.schema.ExtendedTableConfiguration;
import org.apache.ignite.internal.configuration.schema.ExtendedTableView;
+import org.apache.ignite.internal.configuration.schema.SchemaConfiguration;
import org.apache.ignite.internal.configuration.schema.SchemaView;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
import org.apache.ignite.internal.manager.EventListener;
@@ -60,6 +62,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaException;
import org.apache.ignite.internal.schema.SchemaUtils;
import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
@@ -182,7 +185,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
@Override
public void start() {
tablesCfg.tables()
- .listenElements(new ConfigurationNamedListListener<>() {
+ .listenElements(new ConfigurationNamedListListener<TableView>() {
@Override
public @NotNull CompletableFuture<?> onCreate(@NotNull ConfigurationNotificationEvent<TableView> ctx) {
if (!busyLock.enterBusy()) {
@@ -503,11 +506,31 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
partitionMap.put(p, service);
}
-
- InternalTableImpl internalTable = new InternalTableImpl(name, tblId, partitionMap, partitions, netAddrResolver,
- tableStorage);
-
- var schemaRegistry = new SchemaRegistryImpl(v -> schemaDesc);
+
+ InternalTableImpl internalTable = new InternalTableImpl(name, tblId, partitionMap,
+ partitions, netAddrResolver, tableStorage);
+
+ var schemaRegistry = new SchemaRegistryImpl(v -> {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(new NodeStoppingException());
+ }
+
+ try {
+ return tableSchema(tblId, v);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }, () -> {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(new NodeStoppingException());
+ }
+
+ try {
+ return latestSchemaVersion(tblId);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
schemaRegistry.onSchemaRegistered(schemaDesc);
@@ -524,7 +547,76 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
} catch (Exception e) {
fireEvent(TableEvent.CREATE, new TableEventParameters(tblId, name), e);
}
- });
+ }).join();
+ }
+
+ /**
+  * Returns the table schema of a certain version from history.
+  *
+  * <p>If the requested version is not greater than the last schema version of the local table, the
+  * descriptor is read from the local configuration right away. Otherwise the method subscribes to
+  * {@code TableEvent.ALTER} and blocks until an event shows that the version has reached the local table.
+  * If the version is not configured in the metastorage at all, the wait ends with {@code null}.
+  *
+  * @param tblId Table id.
+  * @param schemaVer Schema version.
+  * @return Schema descriptor, or {@code null} if the version is not configured in the metastorage.
+  * @throws SchemaException If the wait fails or the waiting thread is interrupted; in the latter case
+  *      the interrupt flag of the thread is restored before throwing.
+  */
+ private SchemaDescriptor tableSchema(IgniteUuid tblId, int schemaVer) {
+     try {
+         TableImpl table = tablesById.get(tblId);
+
+         assert table != null : "Table is undefined [tblId=" + tblId + ']';
+
+         ExtendedTableConfiguration tblCfg = ((ExtendedTableConfiguration) tablesCfg.tables().get(table.tableName()));
+
+         if (schemaVer <= table.schemaView().lastSchemaVersion()) {
+             return getSchemaDescriptorLocally(schemaVer, tblCfg);
+         }
+
+         CompletableFuture<SchemaDescriptor> fur = new CompletableFuture<>();
+
+         var clo = new EventListener<TableEventParameters>() {
+             @Override
+             public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) {
+                 if (tblId.equals(parameters.tableId()) && schemaVer <= parameters.table().schemaView().lastSchemaVersion()) {
+                     fur.complete(getSchemaDescriptorLocally(schemaVer, tblCfg));
+
+                     return true;
+                 }
+
+                 return false;
+             }
+
+             @Override public void remove(@NotNull Throwable exception) {
+                 fur.completeExceptionally(exception);
+             }
+         };
+
+         listen(TableEvent.ALTER, clo);
+
+         // Re-check after subscribing: the version may have arrived between the first check and listen().
+         if (schemaVer <= table.schemaView().lastSchemaVersion()) {
+             fur.complete(getSchemaDescriptorLocally(schemaVer, tblCfg));
+         }
+
+         // Nothing to wait for when the version is not configured; the future is completed before the
+         // listener is removed.
+         if (!isSchemaExists(tblId, schemaVer) && fur.complete(null)) {
+             removeListener(TableEvent.ALTER, clo);
+         }
+
+         return fur.get();
+     } catch (InterruptedException e) {
+         // Restore the interrupt flag so that code up the stack can still observe the interruption.
+         Thread.currentThread().interrupt();
+
+         throw new SchemaException("Can't read schema from vault: ver=" + schemaVer, e);
+     } catch (ExecutionException e) {
+         throw new SchemaException("Can't read schema from vault: ver=" + schemaVer, e);
+     }
+ }
+
+ /**
+  * Reads a schema descriptor of the given version from the table configuration on the local node.
+  *
+  * @param schemaVer Schema version; its decimal string form is the key of the schema configuration.
+  * @param tblCfg Table configuration.
+  * @return Deserialized schema descriptor.
+  */
+ @NotNull private SchemaDescriptor getSchemaDescriptorLocally(int schemaVer, ExtendedTableConfiguration tblCfg) {
+     String schemaKey = String.valueOf(schemaVer);
+
+     SchemaConfiguration schemaCfg = tblCfg.schemas().get(schemaKey);
+
+     assert schemaCfg != null;
+
+     var serializedSchema = schemaCfg.schema().value();
+
+     return SchemaSerializerImpl.INSTANCE.deserialize(serializedSchema);
+ }
/**
@@ -979,7 +1071,56 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
private List<String> tableNamesConfigured() {
return ConfigurationUtil.directValue(tablesCfg.tables()).namedListKeys();
}
-
+
+ /**
+  * Checks whether the schema of the given version is configured in the Metastorage consensus.
+  *
+  * @param tblId Table id.
+  * @param schemaVer Schema version.
+  * @return {@code true} when the schema is configured, {@code false} otherwise.
+  */
+ // TODO: IGNITE-15412 Configuration manager will be used to retrieve distributed values
+ private boolean isSchemaExists(IgniteUuid tblId, int schemaVer) {
+     int latestVer = latestSchemaVersion(tblId);
+
+     return schemaVer <= latestVer;
+ }
+
+ /**
+  * Gets the latest version of the table schema which is available in Metastore.
+  *
+  * <p>The direct (non-cached) tables configuration is scanned for the table with the given id, and the
+  * greatest key of its schemas list is returned.
+  *
+  * @param tblId Table id.
+  * @return The latest schema version, or {@code INITIAL_SCHEMA_VERSION} when no configured table has the
+  *      given id or the table has no schema with a greater version.
+  */
+ private int latestSchemaVersion(IgniteUuid tblId) {
+     NamedListView<TableView> directTablesCfg = ((DirectConfigurationProperty<NamedListView<TableView>>) tablesCfg
+             .tables()).directValue();
+
+     ExtendedTableView viewForId = null;
+
+     // TODO: IGNITE-15721 Need to review this approach after the ticket would be fixed.
+     // Probably, it won't be required getting configuration of all tables from Metastore.
+     for (String name : directTablesCfg.namedListKeys()) {
+         ExtendedTableView tblView = (ExtendedTableView) directTablesCfg.get(name);
+
+         if (tblView != null && tblId.equals(IgniteUuid.fromString(tblView.id()))) {
+             viewForId = tblView;
+
+             break;
+         }
+     }
+
+     int lastVer = INITIAL_SCHEMA_VERSION;
+
+     if (viewForId == null) {
+         // No configured table has this id, so there is no schemas list to inspect.
+         // Report the initial version instead of failing with a NullPointerException.
+         return lastVer;
+     }
+
+     for (String schemaVerAsStr : viewForId.schemas().namedListKeys()) {
+         int ver = Integer.parseInt(schemaVerAsStr);
+
+         if (ver > lastVer) {
+             lastVer = ver;
+         }
+     }
+
+     return lastVer;
+ }
+
/** {@inheritDoc} */
@Override
public Table table(String name) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
index b9dd1e7..a7cd3ec 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.benchmarks;
+import static org.apache.ignite.internal.schema.registry.SchemaRegistryImpl.INITIAL_SCHEMA_VERSION;
+
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@@ -105,7 +107,7 @@ public class TupleMarshallerFixlenOnlyBenchmark {
.toArray(Column[]::new)
);
- marshaller = new TupleMarshallerImpl(null, null, new SchemaRegistryImpl(v -> null) {
+ marshaller = new TupleMarshallerImpl(null, null, new SchemaRegistryImpl(v -> null, () -> INITIAL_SCHEMA_VERSION) {
@Override
public SchemaDescriptor schema() {
return schema;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
index ec00aaf..d641d9b 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.benchmarks;
import static org.apache.ignite.internal.schema.NativeTypes.BYTES;
import static org.apache.ignite.internal.schema.NativeTypes.INT64;
import static org.apache.ignite.internal.schema.NativeTypes.STRING;
+import static org.apache.ignite.internal.schema.registry.SchemaRegistryImpl.INITIAL_SCHEMA_VERSION;
import java.io.Serializable;
import java.util.Random;
@@ -119,7 +120,7 @@ public class TupleMarshallerVarlenOnlyBenchmark {
.toArray(Column[]::new)
);
- marshaller = new TupleMarshallerImpl(null, null, new SchemaRegistryImpl(v -> null) {
+ marshaller = new TupleMarshallerImpl(null, null, new SchemaRegistryImpl(v -> null, () -> INITIAL_SCHEMA_VERSION) {
@Override
public SchemaDescriptor schema() {
return schema;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index 0d4a80d..1548a57 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -440,11 +440,11 @@ public class TableManagerTest extends IgniteAbstractTest {
if (!createTbl && !dropTbl) {
return CompletableFuture.completedFuture(null);
}
-
+
if (phaser != null) {
phaser.arriveAndAwaitAdvance();
}
-
+
return CompletableFuture.completedFuture(null);
});
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
index 986bd80..ceff3d6 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table.impl;
+import java.util.Collection;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -46,16 +48,21 @@ public class DummySchemaManagerImpl implements SchemaRegistry {
public SchemaDescriptor schema() {
return schema;
}
-
+
/** {@inheritDoc} */
@Override
public SchemaDescriptor schema(int ver) {
assert ver >= 0;
-
+
assert schema.version() == ver;
-
+
return schema;
}
+
+ /** {@inheritDoc} */
+ @Override public SchemaDescriptor waitLatestSchema() {
+ return schema();
+ }
/** {@inheritDoc} */
@Override
@@ -70,4 +77,10 @@ public class DummySchemaManagerImpl implements SchemaRegistry {
return new Row(schema, row);
}
+
+ /** {@inheritDoc} */
+ @Override
+ public Collection<Row> resolve(Collection<BinaryRow> rows) {
+ return rows.stream().map(this::resolve).collect(Collectors.toList());
+ }
}