You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2021/04/12 16:31:09 UTC

[ignite-3] branch ignite-14077 updated (50e4507 -> 57e9d4c)

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a change to branch ignite-14077
in repository https://gitbox.apache.org/repos/asf/ignite-3.git.


 discard 50e4507  Initial schema manager implementation. Minor refactoring.
     new 57e9d4c  Initial schema manager implementation. Minor refactoring.

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (50e4507)
            \
             N -- N -- N   refs/heads/ignite-14077 (57e9d4c)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../internal/table/schema/SchemaRegistry.java      |  31 +-
 .../internal/table/schema/SchemaManagerTest.java   | 479 +++++++++++++++++++--
 2 files changed, 461 insertions(+), 49 deletions(-)

[ignite-3] 01/01: Initial schema manager implementation. Minor refactoring.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-14077
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 57e9d4c31a6b693c52dfa58ebf43392d995497e9
Author: Andrew Mashenkov <an...@gmail.com>
AuthorDate: Mon Apr 12 16:47:14 2021 +0300

    Initial schema manager implementation.
    Minor refactoring.
---
 .../SchemaRegistrationConflictException.java}      |  36 +-
 .../internal/table/schema/SchemaRegistry.java      | 177 +++++++
 .../table/schema/SchemaRegistryException.java}     |  40 +-
 .../table/schema/TableSchemaManagerImpl.java       | 123 +++++
 .../table/impl/DummyInternalTableImpl.java         |   2 +-
 .../table/impl/DummySchemaManagerImpl.java         |   2 +-
 .../internal/table/schema/SchemaManagerTest.java   | 507 +++++++++++++++++++++
 .../test/java/org/apache/ignite/table/Example.java |   2 +-
 .../apache/ignite/table/KVViewOperationsTest.java  |   4 +-
 .../table/TableBinaryViewOperationsTest.java       |   4 +-
 10 files changed, 836 insertions(+), 61 deletions(-)

diff --git a/modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/schema/SchemaRegistrationConflictException.java
similarity index 51%
copy from modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/schema/SchemaRegistrationConflictException.java
index 01df2fe..adcc185 100644
--- a/modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/schema/SchemaRegistrationConflictException.java
@@ -15,41 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.table.impl;
+package org.apache.ignite.internal.table.schema;
 
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.table.TableSchemaManager;
-import org.jetbrains.annotations.NotNull;
+import org.apache.ignite.lang.IgniteInternalException;
 
 /**
- * Dummy schema manager for tests.
+ * Schema registration conflict exception is thrown if
+ * a schema with the same version number was already registered earlier.
  */
-public class DummySchemaManagerImpl implements TableSchemaManager {
-    /** Schema. */
-    private final SchemaDescriptor schema;
-
+public class SchemaRegistrationConflictException extends IgniteInternalException {
     /**
      * Constructor.
      *
-     * @param schema Schema descriptor.
+     * @param msg Message.
      */
-    public DummySchemaManagerImpl(@NotNull SchemaDescriptor schema) {
-        assert schema != null;
-
-        this.schema = schema;
-    }
-
-    /** {@inheritDoc} */
-    @Override public SchemaDescriptor schema() {
-        return schema;
-    }
-
-    /** {@inheritDoc} */
-    @Override public SchemaDescriptor schema(int ver) {
-        assert ver >= 0;
-
-        assert schema.version() == ver;
-
-        return schema;
+    public SchemaRegistrationConflictException(String msg) {
+        super(msg);
     }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/schema/SchemaRegistry.java b/modules/table/src/main/java/org/apache/ignite/internal/table/schema/SchemaRegistry.java
new file mode 100644
index 0000000..3fa9bdf
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/schema/SchemaRegistry.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.schema;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+
+/**
+ * Holds schema descriptors for actual schema versions.
+ * <p>
+ * Schemas MUST be registered in a version ascending order incrementing by {@code 1} with NO gaps,
+ * otherwise an exception will be thrown. Version numbering starts from {@code 1}.
+ * <p>
+ * After some table maintenance process some first versions may become outdated and can be safely cleaned up
+ * if the process guarantees the table no longer has data of these versions.
+ *
+ * @implSpec The changes in between two arbitrary actual versions MUST NOT be lost.
+ * Thus, schema versions can only be removed from the beginning.
+ * @implSpec Initial schema history MAY be registered without the first outdated versions
+ * that could have been cleaned up earlier.
+ */
+public class SchemaRegistry {
+    /** Sentinel version meaning that no schema has been registered yet. */
+    static final int INITIAL_SCHEMA_VERSION = -1;
+
+    /** Latest actual schemas. */
+    private final ConcurrentSkipListMap<Integer, SchemaDescriptor> history = new ConcurrentSkipListMap<>();
+
+    /** Last registered version. */
+    private volatile int lastVer;
+
+    /**
+     * Default constructor.
+     */
+    public SchemaRegistry() {
+        lastVer = INITIAL_SCHEMA_VERSION;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param history Schema history.
+     */
+    public SchemaRegistry(List<SchemaDescriptor> history) {
+        if (history.isEmpty())
+            lastVer = INITIAL_SCHEMA_VERSION;
+        else {
+            validateSchemaHistory(history);
+
+            history.forEach(d -> this.history.put(d.version(), d));
+
+            lastVer = history.get(history.size() - 1).version();
+        }
+    }
+
+    /**
+     * @param history Schema history.
+     * @throws SchemaRegistryException If history is invalid.
+     */
+    private void validateSchemaHistory(List<SchemaDescriptor> history) {
+        if (history.isEmpty())
+            return;
+
+        int prevVer = Objects.requireNonNull(history.get(0), "Schema descriptor can't be null.").version();
+
+        assert prevVer > 0;
+
+        for (int i = 1; i < history.size(); i++) {
+            final SchemaDescriptor desc = Objects.requireNonNull(history.get(i), "Schema descriptor can't be null.");
+
+            if (desc.version() != (++prevVer))
+                throw new SchemaRegistryException("Unexpected schema version: expected=" + prevVer + ", actual=" + desc.version());
+        }
+    }
+
+    /**
+     * Gets schema descriptor for given version.
+     *
+     * @param ver Schema version to get descriptor for.
+     * @return Schema descriptor.
+     */
+    public SchemaDescriptor schema(int ver) {
+        final SchemaDescriptor desc = history.get(ver);
+
+        if (desc != null)
+            return desc;
+
+        if (lastVer < ver || ver <= 0)
+            throw new SchemaRegistryException("Incorrect schema version requested: " + ver);
+
+        assert history.isEmpty() || ver < history.firstKey();
+
+        throw new SchemaRegistryException("Outdated schema version requested: " + ver);
+    }
+
+    /**
+     * Gets schema descriptor for the latest version.
+     *
+     * @return Schema descriptor.
+     */
+    public SchemaDescriptor schema() {
+        final int lastVer0 = lastVer;
+
+        final SchemaDescriptor desc = history.get(lastVer0);
+
+        if (desc != null)
+            return desc;
+
+        if (lastVer0 == INITIAL_SCHEMA_VERSION)
+            throw new SchemaRegistryException("Table schema was not initialized yet.");
+
+        assert lastVer0 < history.firstKey();
+
+        throw new SchemaRegistryException("Outdated schema version requested: " + lastVer0);
+    }
+
+    /**
+     * @return Last known schema version.
+     */
+    public int lastSchemaVersion() {
+        return lastVer;
+    }
+
+    /**
+     * Register new schema.
+     *
+     * @param desc Schema descriptor.
+     * @throws SchemaRegistrationConflictException If schema of provided version was already registered.
+     * @throws SchemaRegistryException If schema of incorrect version provided.
+     */
+    public void registerSchema(SchemaDescriptor desc) {
+        if (lastVer == INITIAL_SCHEMA_VERSION) {
+            if (desc.version() != 1)
+                throw new SchemaRegistryException("Try to register schema of wrong version: ver=" + desc.version() + ", lastVer=" + lastVer);
+        }
+        else if (desc.version() != lastVer + 1) {
+            if (desc.version() > 0 && desc.version() <= lastVer)
+                throw new SchemaRegistrationConflictException("Schema with given version has been already registered: " + desc.version());
+
+            throw new SchemaRegistryException("Try to register schema of wrong version: ver=" + desc.version() + ", lastVer=" + lastVer);
+        }
+
+        history.put(desc.version(), desc);
+
+        lastVer = desc.version();
+    }
+
+    /**
+     * Cleanup history prior to given schema version.
+     *
+     * @param ver First actual schema version.
+     * @throws SchemaRegistryException If incorrect schema version provided.
+     */
+    public void cleanupBefore(int ver) {
+        if (ver > lastVer || ver <= 0)
+            throw new SchemaRegistryException("Incorrect schema version to clean up to: " + ver);
+
+        history.keySet().stream().takeWhile(k -> k < ver).forEach(history::remove);
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/schema/SchemaRegistryException.java
similarity index 51%
copy from modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/schema/SchemaRegistryException.java
index 01df2fe..5b41260 100644
--- a/modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/schema/SchemaRegistryException.java
@@ -15,41 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.table.impl;
+package org.apache.ignite.internal.table.schema;
 
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.table.TableSchemaManager;
-import org.jetbrains.annotations.NotNull;
+import org.apache.ignite.lang.IgniteInternalException;
 
 /**
- * Dummy schema manager for tests.
+ * Schema registration exception.
  */
-public class DummySchemaManagerImpl implements TableSchemaManager {
-    /** Schema. */
-    private final SchemaDescriptor schema;
-
+public class SchemaRegistryException extends IgniteInternalException {
     /**
      * Constructor.
      *
-     * @param schema Schema descriptor.
+     * @param msg Message.
      */
-    public DummySchemaManagerImpl(@NotNull SchemaDescriptor schema) {
-        assert schema != null;
-
-        this.schema = schema;
+    public SchemaRegistryException(String msg) {
+        super(msg);
     }
 
-    /** {@inheritDoc} */
-    @Override public SchemaDescriptor schema() {
-        return schema;
-    }
-
-    /** {@inheritDoc} */
-    @Override public SchemaDescriptor schema(int ver) {
-        assert ver >= 0;
-
-        assert schema.version() == ver;
-
-        return schema;
+    /**
+     * Constructor.
+     *
+     * @param cause Cause.
+     */
+    public SchemaRegistryException(Throwable cause) {
+        super(cause);
     }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/schema/TableSchemaManagerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/schema/TableSchemaManagerImpl.java
new file mode 100644
index 0000000..1ca3ed1
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/schema/TableSchemaManagerImpl.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.schema;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.TableSchemaManager;
+
+import static org.apache.ignite.internal.table.schema.SchemaRegistry.INITIAL_SCHEMA_VERSION;
+
+/**
+ * Table schema manager component.
+ */
+public class TableSchemaManagerImpl implements TableSchemaManager {
+    /** Empty column array passed to {@code List.toArray} to get a typed {@code Column[]}. */
+    private static final Column[] EMPTY_COLS_ARR = new Column[0];
+
+    /** Local registry for schema. */
+    private final SchemaRegistry schemaReg;
+
+    private final Map<UUID, Future<SchemaDescriptor>> pendingRegistrations = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     */
+    public TableSchemaManagerImpl() {
+        this.schemaReg = new SchemaRegistry();
+    }
+
+    /**
+     * Gets schema description for version.
+     *
+     * @param ver Schema version to get descriptor for.
+     * @return Schema descriptor.
+     */
+    @Override public SchemaDescriptor schema(int ver) {
+        return schemaReg.schema(ver);
+    }
+
+    /**
+     * Gets schema description for the latest version.
+     *
+     * @return Schema descriptor.
+     */
+    @Override public SchemaDescriptor schema() {
+        return schemaReg.schema();
+    }
+
+    /**
+     * Registers schema in local registry.
+     *
+     * @param desc Schema descriptor.
+     */
+    private void registerSchemaLocal(SchemaDescriptor desc) {
+        schemaReg.registerSchema(desc);
+    }
+
+    /**
+     * Metastore callback that is triggered when a new schema has been registered in the grid.
+     *
+     * @param schemaDesc Schema descriptor.
+     */
+    public void onSchemaUpdated(SchemaDescriptor schemaDesc) {
+        registerSchemaLocal(schemaDesc);
+    }
+
+    /**
+     * Compares schemas.
+     *
+     * @param expected Expected schema.
+     * @param actual Actual schema.
+     * @return {@code True} if schemas are equal, {@code false} otherwise.
+     */
+    static boolean equalSchemas(SchemaDescriptor expected, SchemaDescriptor actual) {
+        if (expected.keyColumns().length() != actual.keyColumns().length() ||
+            expected.valueColumns().length() != actual.valueColumns().length())
+            return false;
+
+        for (int i = 0; i < expected.length(); i++) {
+            if (!expected.column(i).equals(actual.column(i)))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Schema descriptor factory method.
+     *
+     * @param ver Version.
+     * @param keyCols Key columns.
+     * @param valCols Value columns.
+     * @return Schema descriptor.
+     */
+    static SchemaDescriptor createDescriptor(int ver, List<Column> keyCols, List<Column> valCols) {
+        return new SchemaDescriptor(
+            ver,
+            keyCols.toArray(EMPTY_COLS_ARR),
+            valCols.toArray(EMPTY_COLS_ARR));
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
similarity index 99%
rename from modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java
rename to modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 5e9a352..f41b025 100644
--- a/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.table.impl;
+package org.apache.ignite.internal.table.impl;
 
 import java.util.Arrays;
 import java.util.Collection;
diff --git a/modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
similarity index 97%
rename from modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java
rename to modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
index 01df2fe..3c19cf6 100644
--- a/modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.table.impl;
+package org.apache.ignite.internal.table.impl;
 
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.table.TableSchemaManager;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/schema/SchemaManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/schema/SchemaManagerTest.java
new file mode 100644
index 0000000..aff58bd
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/schema/SchemaManagerTest.java
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.schema;
+
+import java.util.List;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.ignite.internal.schema.NativeType.BYTES;
+import static org.apache.ignite.internal.schema.NativeType.LONG;
+import static org.apache.ignite.internal.schema.NativeType.STRING;
+import static org.apache.ignite.internal.table.schema.SchemaRegistry.INITIAL_SCHEMA_VERSION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Schema manager test.
+ */
+public class SchemaManagerTest {
+    /**
+     * Check registration of schema with wrong versions.
+     */
+    @Test
+    public void testWrongSchemaVersionRegistration() {
+        final SchemaDescriptor schemaV0 = new SchemaDescriptor(INITIAL_SCHEMA_VERSION,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)});
+
+        final SchemaDescriptor schemaV1 = new SchemaDescriptor(0,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)});
+
+        final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)});
+
+        final SchemaRegistry reg = new SchemaRegistry();
+
+        assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+        assertThrows(SchemaRegistryException.class, () -> reg.schema());
+
+        // Try to register schema with initial version.
+        assertThrows(SchemaRegistryException.class, () -> reg.registerSchema(schemaV0));
+        assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+
+        assertThrows(SchemaRegistryException.class, () -> reg.schema());
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(INITIAL_SCHEMA_VERSION));
+
+        // Try to register schema with version of zero.
+        assertThrows(SchemaRegistryException.class, () -> reg.registerSchema(schemaV1));
+        assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(INITIAL_SCHEMA_VERSION));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(0));
+
+        // Try to register schema with version of 2.
+        assertThrows(SchemaRegistryException.class, () -> reg.registerSchema(schemaV2));
+        assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(INITIAL_SCHEMA_VERSION));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(0));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
+    }
+
+    /**
+     * Check initial schema registration.
+     */
+    @Test
+    public void testSchemaRegistration() {
+        final SchemaDescriptor schemaV1 = new SchemaDescriptor(1,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)});
+
+        final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaRegistry reg = new SchemaRegistry();
+
+        assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+        assertThrows(SchemaRegistryException.class, () -> reg.schema());
+
+        // Register schema with very first version.
+        reg.registerSchema(schemaV1);
+
+        assertEquals(1, reg.lastSchemaVersion());
+        assertSameSchema(schemaV1, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+
+        // Register schema with next version.
+        reg.registerSchema(schemaV2);
+
+        assertEquals(2, reg.lastSchemaVersion());
+        assertSameSchema(schemaV2, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+        assertSameSchema(schemaV2, reg.schema(2));
+
+        // Try to register schema with version of 4.
+        assertThrows(SchemaRegistryException.class, () -> reg.registerSchema(schemaV4));
+
+        assertEquals(2, reg.lastSchemaVersion());
+        assertSameSchema(schemaV2, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+        assertSameSchema(schemaV2, reg.schema(2));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(4));
+    }
+
+    /**
+     * Check duplicate schema registration.
+     */
+    @Test
+    public void testDuplucateSchemaRegistration() {
+        final SchemaDescriptor schemaV1 = new SchemaDescriptor(1,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)});
+
+        final SchemaDescriptor wrongSchema = new SchemaDescriptor(1,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaRegistry reg = new SchemaRegistry();
+
+        assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+
+        // Register schema with very first version.
+        reg.registerSchema(schemaV1);
+
+        assertEquals(1, reg.lastSchemaVersion());
+        assertSameSchema(schemaV1, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+
+        // Try to register same schema once again.
+        assertThrows(SchemaRegistrationConflictException.class, () -> reg.registerSchema(schemaV1));
+
+        assertEquals(1, reg.lastSchemaVersion());
+        assertSameSchema(schemaV1, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
+
+        // Try to register another schema with same version and check nothing was registered.
+        assertThrows(SchemaRegistrationConflictException.class, () -> reg.registerSchema(wrongSchema));
+
+        assertEquals(1, reg.lastSchemaVersion());
+        assertEquals(1, reg.schema().version());
+
+        assertSameSchema(schemaV1, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
+
+        // Register schema with next version.
+        reg.registerSchema(schemaV2);
+
+        assertEquals(2, reg.lastSchemaVersion());
+
+        assertSameSchema(schemaV2, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+        assertSameSchema(schemaV2, reg.schema(2));
+    }
+
+    /**
+     * Check schema cleanup.
+     */
+    @Test
+    public void testSchemaCleanup() {
+        final SchemaDescriptor schemaV1 = new SchemaDescriptor(1,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)});
+
+        final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaRegistry reg = new SchemaRegistry();
+
+        assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+
+        // Fail to cleanup initial schema
+        assertThrows(SchemaRegistryException.class, () -> reg.cleanupBefore(INITIAL_SCHEMA_VERSION));
+        assertThrows(SchemaRegistryException.class, () -> reg.cleanupBefore(1));
+
+        // Register schema with very first version.
+        reg.registerSchema(schemaV1);
+
+        assertEquals(1, reg.lastSchemaVersion());
+        assertNotNull(reg.schema());
+        assertNotNull(reg.schema(1));
+
+        // Remove non-existent schemas.
+        reg.cleanupBefore(1);
+
+        assertEquals(1, reg.lastSchemaVersion());
+        assertNotNull(reg.schema());
+        assertNotNull(reg.schema(1));
+
+        // Register new schema with next version.
+        reg.registerSchema(schemaV2);
+        reg.registerSchema(schemaV3);
+
+        assertEquals(3, reg.lastSchemaVersion());
+        assertNotNull(reg.schema(1));
+        assertNotNull(reg.schema(2));
+        assertNotNull(reg.schema(3));
+
+        // Remove outdated schema 1.
+        reg.cleanupBefore(2);
+
+        assertEquals(3, reg.lastSchemaVersion());
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+        assertNotNull(reg.schema(2));
+        assertNotNull(reg.schema(3));
+
+        // Remove non-existent schemas.
+        reg.cleanupBefore(2);
+
+        assertEquals(3, reg.lastSchemaVersion());
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+        assertNotNull(reg.schema(2));
+        assertNotNull(reg.schema(3));
+
+        // Register new schema with next version.
+        reg.registerSchema(schemaV4);
+
+        assertEquals(4, reg.lastSchemaVersion());
+        assertNotNull(reg.schema(2));
+        assertNotNull(reg.schema(3));
+        assertNotNull(reg.schema(4));
+
+        // Remove non-existed schemas.
+        reg.cleanupBefore(2);
+
+        assertEquals(4, reg.lastSchemaVersion());
+        assertSameSchema(schemaV4, reg.schema());
+        assertSameSchema(schemaV2, reg.schema(2));
+        assertSameSchema(schemaV3, reg.schema(3));
+        assertSameSchema(schemaV4, reg.schema(4));
+
+        // Multiple remove.
+        reg.cleanupBefore(4);
+
+        assertEquals(4, reg.lastSchemaVersion());
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
+        assertSameSchema(schemaV4, reg.schema());
+        assertSameSchema(schemaV4, reg.schema(4));
+
+        // Once again.
+        reg.cleanupBefore(4);
+
+        assertEquals(4, reg.lastSchemaVersion());
+        assertSameSchema(schemaV4, reg.schema(4));
+    }
+
+    /**
+     * Checks schema registration on a registry that was created with the full schema history.
+     * The registry starts with versions 1 and 2; the test expects duplicate and out-of-order
+     * versions to be rejected and the next sequential version to be accepted.
+     */
+    @Test
+    public void testInitialSchemaWithFullHistory() {
+        final SchemaDescriptor schema1 = new SchemaDescriptor(
+            1,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)}
+        );
+
+        final SchemaDescriptor schema2 = new SchemaDescriptor(
+            2,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true), new Column("valStringCol", STRING, true)}
+        );
+
+        final SchemaDescriptor schema3 = new SchemaDescriptor(
+            3,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valStringCol", STRING, true)}
+        );
+
+        final SchemaDescriptor schema4 = new SchemaDescriptor(
+            4,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true), new Column("valStringCol", STRING, true)}
+        );
+
+        final SchemaRegistry registry = new SchemaRegistry(List.of(schema1, schema2));
+
+        // Both initial versions are readable and the highest one is reported as the last version.
+        assertEquals(2, registry.lastSchemaVersion());
+        assertSameSchema(schema2, registry.schema());
+        assertSameSchema(schema1, registry.schema(1));
+        assertSameSchema(schema2, registry.schema(2));
+
+        // A version that is already registered must be rejected as a conflict.
+        assertThrows(SchemaRegistrationConflictException.class, () -> registry.registerSchema(schema1));
+
+        // The failed attempt must leave the registry untouched.
+        assertEquals(2, registry.lastSchemaVersion());
+        assertSameSchema(schema2, registry.schema());
+        assertSameSchema(schema1, registry.schema(1));
+        assertSameSchema(schema2, registry.schema(2));
+        assertThrows(SchemaRegistryException.class, () -> registry.schema(3));
+
+        // Version 4 skips version 3, so it must be rejected as out-of-order.
+        assertThrows(SchemaRegistryException.class, () -> registry.registerSchema(schema4));
+
+        assertEquals(2, registry.lastSchemaVersion());
+        assertSameSchema(schema2, registry.schema());
+        assertThrows(SchemaRegistryException.class, () -> registry.schema(3));
+
+        // Version 3 directly follows the last registered version and must be accepted.
+        registry.registerSchema(schema3);
+
+        assertEquals(3, registry.lastSchemaVersion());
+        assertSameSchema(schema3, registry.schema());
+        assertSameSchema(schema1, registry.schema(1));
+        assertSameSchema(schema2, registry.schema(2));
+        assertSameSchema(schema3, registry.schema(3));
+    }
+
+    /**
+     * Checks schema registration on a registry that was created with only the tail of the schema history.
+     * The registry starts with versions 2 and 3; the test expects duplicate, out-of-order and outdated
+     * versions to be rejected and the next sequential version to be accepted.
+     */
+    @Test
+    public void testInitialSchemaWithTailHistory() {
+        final SchemaDescriptor schema1 = new SchemaDescriptor(
+            1,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)}
+        );
+
+        final SchemaDescriptor schema2 = new SchemaDescriptor(
+            2,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true), new Column("valStringCol", STRING, true)}
+        );
+
+        final SchemaDescriptor schema3 = new SchemaDescriptor(
+            3,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valStringCol", STRING, true)}
+        );
+
+        final SchemaDescriptor schema4 = new SchemaDescriptor(
+            4,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true), new Column("valStringCol", STRING, true)}
+        );
+
+        final SchemaDescriptor schema5 = new SchemaDescriptor(
+            5,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valStringCol", STRING, true)}
+        );
+
+        final SchemaRegistry registry = new SchemaRegistry(List.of(schema2, schema3));
+
+        // Only the supplied tail (versions 2 and 3) is readable; version 1 was never provided.
+        assertEquals(3, registry.lastSchemaVersion());
+        assertSameSchema(schema3, registry.schema());
+        assertThrows(SchemaRegistryException.class, () -> registry.schema(1));
+        assertSameSchema(schema2, registry.schema(2));
+        assertSameSchema(schema3, registry.schema(3));
+
+        // A version that is already registered must be rejected as a conflict.
+        assertThrows(SchemaRegistrationConflictException.class, () -> registry.registerSchema(schema2));
+
+        assertEquals(3, registry.lastSchemaVersion());
+        assertSameSchema(schema3, registry.schema());
+
+        // Version 5 skips version 4, so it must be rejected as out-of-order.
+        assertThrows(SchemaRegistryException.class, () -> registry.registerSchema(schema5));
+
+        assertEquals(3, registry.lastSchemaVersion());
+        assertSameSchema(schema3, registry.schema());
+
+        // Version 1 is older than the stored history and must be rejected as a conflict.
+        assertThrows(SchemaRegistrationConflictException.class, () -> registry.registerSchema(schema1));
+
+        // None of the failed attempts may have changed the registry.
+        assertEquals(3, registry.lastSchemaVersion());
+        assertSameSchema(schema3, registry.schema());
+        assertThrows(SchemaRegistryException.class, () -> registry.schema(1));
+        assertSameSchema(schema2, registry.schema(2));
+        assertSameSchema(schema3, registry.schema(3));
+
+        // Version 4 directly follows the last registered version and must be accepted.
+        registry.registerSchema(schema4);
+
+        assertEquals(4, registry.lastSchemaVersion());
+        assertSameSchema(schema4, registry.schema());
+        assertSameSchema(schema2, registry.schema(2));
+        assertSameSchema(schema3, registry.schema(3));
+        assertSameSchema(schema4, registry.schema(4));
+    }
+
+    /**
+     * Checks history cleanup on a registry that was created with schema versions 2, 3 and 4.
+     * The test expects cleanup calls with a bound at or below the oldest stored version to keep
+     * everything, and a cleanup up to the last version to drop all older versions.
+     */
+    @Test
+    public void testSchemaWithHistoryCleanup() {
+        final SchemaDescriptor schema2 = new SchemaDescriptor(
+            2,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true), new Column("valStringCol", STRING, true)}
+        );
+
+        final SchemaDescriptor schema3 = new SchemaDescriptor(
+            3,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valStringCol", STRING, true)}
+        );
+
+        final SchemaDescriptor schema4 = new SchemaDescriptor(
+            4,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true), new Column("valStringCol", STRING, true)}
+        );
+
+        final SchemaRegistry registry = new SchemaRegistry(List.of(schema2, schema3, schema4));
+
+        // Initial state: versions 2..4 are readable, version 1 is not.
+        assertEquals(4, registry.lastSchemaVersion());
+        assertSameSchema(schema4, registry.schema());
+        assertThrows(SchemaRegistryException.class, () -> registry.schema(1));
+        assertSameSchema(schema2, registry.schema(2));
+        assertSameSchema(schema3, registry.schema(3));
+        assertSameSchema(schema4, registry.schema(4));
+
+        // Cleanup bound below the oldest stored version: all stored versions must survive.
+        registry.cleanupBefore(1);
+
+        assertEquals(4, registry.lastSchemaVersion());
+        assertNotNull(registry.schema(2));
+        assertNotNull(registry.schema(3));
+        assertNotNull(registry.schema(4));
+
+        // Cleanup bound equal to the oldest stored version: all stored versions must survive.
+        registry.cleanupBefore(2);
+
+        assertEquals(4, registry.lastSchemaVersion());
+        assertNotNull(registry.schema(2));
+        assertNotNull(registry.schema(3));
+        assertNotNull(registry.schema(4));
+
+        // Cleanup up to the last version: versions 2 and 3 must be gone, version 4 must remain.
+        registry.cleanupBefore(4);
+
+        assertEquals(4, registry.lastSchemaVersion());
+        assertThrows(SchemaRegistryException.class, () -> registry.schema(2));
+        assertThrows(SchemaRegistryException.class, () -> registry.schema(3));
+        assertNotNull(registry.schema(4));
+    }
+
+    /**
+     * Validate schemas are equals.
+     *
+     * @param schemaDesc1 Schema descriptor to compare with.
+     * @param schemaDesc2 Schema descriptor to compare.
+     */
+    private void assertSameSchema(SchemaDescriptor schemaDesc1, SchemaDescriptor schemaDesc2) {
+        assertEquals(schemaDesc1.version(), schemaDesc2.version(), "Descriptors of different versions.");
+
+        assertTrue(TableSchemaManagerImpl.equalSchemas(schemaDesc1, schemaDesc2), "Schemas are not equals.");
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/table/Example.java b/modules/table/src/test/java/org/apache/ignite/table/Example.java
index 242a185..b83b75c 100644
--- a/modules/table/src/test/java/org/apache/ignite/table/Example.java
+++ b/modules/table/src/test/java/org/apache/ignite/table/Example.java
@@ -23,7 +23,7 @@ import java.util.List;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjects;
 import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.table.impl.DummyInternalTableImpl;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.table.mapper.Mappers;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.params.ParameterizedTest;
diff --git a/modules/table/src/test/java/org/apache/ignite/table/KVViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/table/KVViewOperationsTest.java
index 33f1650..a249a8a 100644
--- a/modules/table/src/test/java/org/apache/ignite/table/KVViewOperationsTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/table/KVViewOperationsTest.java
@@ -21,8 +21,8 @@ import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.table.KVBinaryViewImpl;
-import org.apache.ignite.table.impl.DummyInternalTableImpl;
-import org.apache.ignite.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
+import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
diff --git a/modules/table/src/test/java/org/apache/ignite/table/TableBinaryViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/table/TableBinaryViewOperationsTest.java
index 6b011b0..efbb2a3 100644
--- a/modules/table/src/test/java/org/apache/ignite/table/TableBinaryViewOperationsTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/table/TableBinaryViewOperationsTest.java
@@ -21,8 +21,8 @@ import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.table.impl.DummyInternalTableImpl;
-import org.apache.ignite.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
+import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;