You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/03/22 10:00:17 UTC

[ignite-3] 02/02: WIP.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-19080
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit a746b771aa5d07d7fb75132cc812a659bf2312b4
Author: amashenkov <an...@gmail.com>
AuthorDate: Wed Mar 22 12:44:00 2023 +0300

    WIP.
---
 modules/catalog/build.gradle                       |   1 +
 .../ignite/internal/catalog/CatalogService.java    |   1 -
 .../internal/catalog/CatalogServiceImpl.java       | 158 +++++++++++++++------
 .../internal/catalog/commands/CatalogUtils.java    |   4 +
 .../catalog/descriptors/TableDescriptor.java       |  15 +-
 .../internal/catalog/CatalogServiceSelfTest.java   |  50 +++++--
 6 files changed, 165 insertions(+), 64 deletions(-)

diff --git a/modules/catalog/build.gradle b/modules/catalog/build.gradle
index 7d9f945e71..01fa3474ce 100644
--- a/modules/catalog/build.gradle
+++ b/modules/catalog/build.gradle
@@ -29,6 +29,7 @@ dependencies {
     implementation libs.jetbrains.annotations
 
     testImplementation(testFixtures(project(':ignite-core')))
+    testImplementation(testFixtures(project(':ignite-metastorage')))
     testImplementation libs.mockito.junit
     testImplementation libs.mockito.core
     testImplementation libs.hamcrest.core
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index 507eefde03..9a7ae7c1dd 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
  * <p>TBD: events
  */
 public interface CatalogService {
-    String PUBLIC = "PUBLIC";
 
     //TODO: IGNITE-19082 Drop this stuff when all versioned schema stuff will be moved to Catalog.
     @Deprecated(forRemoval = true)
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index 2d9ce8c768..6599fb60a7 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -21,12 +21,15 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 
 import java.util.Collection;
-import java.util.Map.Entry;
+import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
 import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnParams;
 import org.apache.ignite.internal.catalog.commands.AlterTableDropColumnParams;
 import org.apache.ignite.internal.catalog.commands.CatalogUtils;
@@ -38,10 +41,16 @@ import org.apache.ignite.internal.catalog.descriptors.SchemaDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.Conditions;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.internal.metastorage.dsl.Statements;
 import org.apache.ignite.internal.util.ArrayUtils;
+import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.jetbrains.annotations.Nullable;
@@ -51,10 +60,9 @@ import org.jetbrains.annotations.Nullable;
  * TODO: IGNITE-19081 Introduce catalog events and make CatalogServiceImpl extends Producer.
  */
 public class CatalogServiceImpl implements CatalogService, CatalogManager {
-    private static final AtomicInteger TABLE_ID_GEN = new AtomicInteger();
-
     /** The logger. */
     private static final IgniteLogger LOG = Loggers.forClass(CatalogServiceImpl.class);
+    public static final String CATALOG_VER_PREFIX = "catalog.ver.";
 
     /** Versioned catalog descriptors. */
     private final NavigableMap<Integer, CatalogDescriptor> catalogByVer = new ConcurrentSkipListMap<>();
@@ -66,6 +74,10 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
 
     private final WatchListener catalogVersionsListener;
 
+    private final ExecutorService executorService = ForkJoinPool.commonPool();
+
+    private final ConcurrentMap<Integer, CompletableFuture<Boolean>> futMap = new ConcurrentHashMap<>();
+
     /**
      * Constructor.
      */
@@ -78,7 +90,7 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
     @Override
     public void start() {
         if (CatalogService.useCatalogService()) {
-            metaStorageMgr.registerPrefixWatch(ByteArray.fromString("catalog-"), catalogVersionsListener);
+            metaStorageMgr.registerPrefixWatch(ByteArray.fromString(CATALOG_VER_PREFIX), catalogVersionsListener);
         }
 
         //TODO: IGNITE-19080 restore state.
@@ -95,7 +107,7 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
     /** {@inheritDoc} */
     @Override
     public TableDescriptor table(String tableName, long timestamp) {
-        return catalogAt(timestamp).schema(CatalogService.PUBLIC).table(tableName);
+        return catalogAt(timestamp).schema(CatalogUtils.DEFAULT_SCHEMA).table(tableName);
     }
 
     /** {@inheritDoc} */
@@ -125,13 +137,13 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
             return null;
         }
 
-        return catalog.schema(CatalogService.PUBLIC);
+        return catalog.schema(CatalogUtils.DEFAULT_SCHEMA);
     }
 
     /** {@inheritDoc} */
     @Override
     public @Nullable SchemaDescriptor activeSchema(long timestamp) {
-        return catalogAt(timestamp).schema(CatalogService.PUBLIC);
+        return catalogAt(timestamp).schema(CatalogUtils.DEFAULT_SCHEMA);
     }
 
     private CatalogDescriptor catalog(int version) {
@@ -139,7 +151,7 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
     }
 
     private CatalogDescriptor catalogAt(long timestamp) {
-        Entry<Long, CatalogDescriptor> entry = catalogByTs.floorEntry(timestamp);
+        Map.Entry<Long, CatalogDescriptor> entry = catalogByTs.floorEntry(timestamp);
 
         if (entry == null) {
             throw new IllegalStateException("No valid schema found for given timestamp: " + timestamp);
@@ -151,7 +163,7 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
     /**
      * MetaStorage event listener for catalog metadata updates.
      */
-    private static class CatalogEventListener implements WatchListener {
+    private class CatalogEventListener implements WatchListener {
         /** {@inheritDoc} */
         @Override
         public String id() {
@@ -161,6 +173,44 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
         /** {@inheritDoc} */
         @Override
         public CompletableFuture<Void> onUpdate(WatchEvent event) {
+            assert event.single();
+
+            EntryEvent entryEvent = event.entryEvent();
+
+            if (entryEvent.newEntry() != null) {
+                Object obj = ByteUtils.fromBytes(entryEvent.newEntry().value());
+
+                assert obj instanceof TableDescriptor;
+
+                TableDescriptor tableDesc = (TableDescriptor) obj;
+
+                // TODO: Add catalog version to event.
+                int ver = catalogByVer.lastKey() + 1;
+
+                CatalogDescriptor catalog = catalogByVer.get(ver - 1);
+
+                SchemaDescriptor schema = catalog.schema(tableDesc.schemaName());
+
+                CatalogDescriptor newCatalog = new CatalogDescriptor(
+                        ver,
+                        System.currentTimeMillis(),
+                        new SchemaDescriptor(
+                                schema.id(),
+                                schema.name(),
+                                ver,
+                                ArrayUtils.concat(schema.tables(), tableDesc),
+                                schema.indexes()
+                        )
+                );
+
+                registerCatalog(newCatalog);
+
+                CompletableFuture<Boolean> rmv = futMap.remove(ver);
+                if (rmv != null) {
+                    rmv.complete(true);
+                }
+            }
+
             return completedFuture(null);
         }
 
@@ -173,53 +223,69 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<?> createTable(CreateTableParams params) {
-        // Creates TableDescriptor and saves it to MetaStorage.
+    public CompletableFuture<Boolean> createTable(CreateTableParams params) {
+        // Creates diff from params, then save to metastorage.
         // Atomically:
-        //        int id = metaStorage.get("lastId") + 1;
-        //        TableDescriptor table = new TableDescriptor(tableId, params)
+        //        int newVer = metaStorage.get("lastVer") + 1;
         //
-        //        Catalog newCatalog = catalogByVer.get(id -1).copy()
-        //        newCatalog.setId(id).addTable(table);
+        //        validate(params);
+        //        Object diff = createDiff(catalog, params);
         //
-        //        metaStorage.put("catalog-"+id, newCatalog);
-        //        metaStorage.put("lastId", id);
+        //        metaStorage.put("catalog.ver." + newVer, diff);
+        //        metaStorage.put("lastVer", newVer);
 
-        // Dummy implementation.
-        synchronized (this) {
-            CatalogDescriptor catalog = catalogByVer.lastEntry().getValue();
+        ByteArray LAST_VER_KEY = ByteArray.fromString("catalog.lastVer");
+        ByteArray TABLE_ID_KEY = ByteArray.fromString("catalog.tableId");
 
-            //TODO: IGNITE-19081 Add validation.
-            String schemaName = Objects.requireNonNullElse(params.schemaName(), CatalogService.PUBLIC);
+        return metaStorageMgr.getAll(Set.of(LAST_VER_KEY, TABLE_ID_KEY))
+                .thenCompose(entries -> {
+                    Entry lastVerEntry = entries.get(LAST_VER_KEY);
+                    Entry tableIdEntry = entries.get(TABLE_ID_KEY);
 
-            SchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName), "No schema found: " + schemaName);
+                    int lastVer = lastVerEntry.empty() ? 0 : ByteUtils.bytesToInt(lastVerEntry.value());
+                    int tableId = tableIdEntry.empty() ? 0 : ByteUtils.bytesToInt(tableIdEntry.value());
 
-            if (schema.table(params.tableName()) != null) {
-                return params.ifTableExists()
-                        ? completedFuture(false)
-                        : failedFuture(new TableAlreadyExistsException(schemaName, params.tableName()));
-            }
+                    int newVer = lastVer + 1;
+                    int newTableId = tableId + 1;
 
-            int newVersion = catalogByVer.lastKey() + 1;
+                    CatalogDescriptor catalog = catalogByVer.get(lastVer);
 
-            TableDescriptor table = CatalogUtils.fromParams(TABLE_ID_GEN.incrementAndGet(), params);
+                    assert catalog.table(newTableId) == null;
 
-            CatalogDescriptor newCatalog = new CatalogDescriptor(
-                    newVersion,
-                    System.currentTimeMillis(),
-                    new SchemaDescriptor(
-                            schema.id(),
-                            schemaName,
-                            newVersion,
-                            ArrayUtils.concat(schema.tables(), table),
-                            schema.indexes()
-                    )
-            );
+                    TableDescriptor tableDesc = CatalogUtils.fromParams(newTableId, params);
 
-            registerCatalog(newCatalog);
-        }
+                    // params.validate(catalog); ???
+                    // validate(catalog, table);
+
+                    SchemaDescriptor schema = catalog.schema(tableDesc.schemaName());
+
+                    if (schema.table(tableDesc.name()) != null) {
+                        return params.ifTableExists()
+                                ? completedFuture(false)
+                                : failedFuture(new TableAlreadyExistsException(tableDesc.schemaName(), tableDesc.name()));
+                    }
+
+                    CompletableFuture<Boolean> opFut = new CompletableFuture<>();
+
+                    if (futMap.putIfAbsent(newVer, opFut) != null) {
+                        return completedFuture(null).thenComposeAsync(ignore -> createTable(params), executorService);
+                    }
 
-        return completedFuture(true);
+                    return metaStorageMgr.invoke(
+                            Statements.iif(
+                                    Conditions.value(LAST_VER_KEY).eq(lastVerEntry.value()),
+                                    Operations.ops(
+                                            Operations.put(LAST_VER_KEY, ByteUtils.intToBytes(newVer)),
+                                            Operations.put(TABLE_ID_KEY, ByteUtils.intToBytes(newTableId)),
+                                            Operations.put(ByteArray.fromString(CATALOG_VER_PREFIX + newVer), ByteUtils.toBytes(tableDesc))
+                                    ).yield(true),
+                                    Operations.ops().yield(false)
+                            )
+                    ).thenComposeAsync(
+                            res -> res.getAsBoolean() ? opFut.thenApply(ignore -> true) : createTable(params),
+                            executorService
+                    );
+                });
     }
 
     /** {@inheritDoc} */
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
index 1ead944291..eff8c1e1cc 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.catalog.commands;
 
+import java.util.Objects;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
@@ -25,6 +26,8 @@ import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
  * Catalog utils.
  */
 public class CatalogUtils {
+    public static final String DEFAULT_SCHEMA = "PUBLIC";
+
     /**
      * Converts CreateTable command params to descriptor.
      *
@@ -34,6 +37,7 @@ public class CatalogUtils {
      */
     public static TableDescriptor fromParams(int id, CreateTableParams params) {
         return new TableDescriptor(id,
+                Objects.requireNonNullElse(params.schemaName(), DEFAULT_SCHEMA),
                 params.tableName(),
                 params.columns().stream().map(CatalogUtils::fromParams).collect(Collectors.toList()),
                 params.primaryKeyColumns(),
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java
index a9bfd56168..719cf6a1a0 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java
@@ -38,7 +38,7 @@ public class TableDescriptor extends ObjectDescriptor {
 
     private final int zoneId = 0;
     private final int engineId = 0;
-
+    private final String schemaName;
     private final TableColumnDescriptor[] columns;
     private final String[] primaryKeyColumns;
     private final String[] colocationColumns;
@@ -50,20 +50,23 @@ public class TableDescriptor extends ObjectDescriptor {
      * Constructor.
      *
      * @param id Table id.
-     * @param name Table name.
+     * @param schemaName Schema name.
+     * @param tableName Table name.
      * @param columns Table column descriptors.
      * @param pkCols Primary key column names.
      * @param colocationCols Colocation column names.
      */
     public TableDescriptor(
             int id,
-            String name,
+            String schemaName,
+            String tableName,
             List<TableColumnDescriptor> columns,
             List<String> pkCols,
             @Nullable List<String> colocationCols
     ) {
-        super(id, Type.TABLE, name);
+        super(id, Type.TABLE, tableName);
 
+        this.schemaName = Objects.requireNonNull(schemaName);
         this.columns = Objects.requireNonNull(columns, "No columns defined.").toArray(TableColumnDescriptor[]::new);
         primaryKeyColumns = Objects.requireNonNull(pkCols, "No primary key columns.").toArray(String[]::new);
         colocationColumns = colocationCols == null ? primaryKeyColumns : colocationCols.toArray(String[]::new);
@@ -80,6 +83,10 @@ public class TableDescriptor extends ObjectDescriptor {
         assert primaryKeyColumns == colocationColumns || Set.of(primaryKeyColumns).containsAll(List.of(colocationColumns));
     }
 
+    public String schemaName(){
+        return schemaName;
+    }
+
     public int zoneId() {
         return zoneId;
     }
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
index 8953e5737c..2c0495e0e3 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
@@ -23,33 +23,63 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.CatalogUtils;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
 import org.apache.ignite.internal.catalog.commands.CreateTableParams;
 import org.apache.ignite.internal.catalog.commands.DefaultValue;
 import org.apache.ignite.internal.catalog.descriptors.SchemaDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.impl.StandaloneMetastorageManager;
+import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher;
+import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Catalog service self test.
  */
+@ExtendWith(SystemPropertiesExtension.class)
+@WithSystemProperty(key = CatalogService.IGNITE_USE_CATALOG_PROPERTY, value = "true")
 public class CatalogServiceSelfTest {
     private static final String TABLE_NAME = "myTable";
+    private MetaStorageManager metaStorageManager;
+    private CatalogServiceImpl service;
 
-    @Test
-    public void testEmptyCatalog() {
-        CatalogServiceImpl service = new CatalogServiceImpl(Mockito.mock(MetaStorageManager.class));
+    @BeforeEach
+    public void initCatalogService() {
+        metaStorageManager = StandaloneMetastorageManager.create();
+        service = new CatalogServiceImpl(metaStorageManager);
+
+        metaStorageManager.start();
         service.start();
 
+        try {
+            metaStorageManager.deployWatches();
+        } catch (NodeStoppingException e) {
+            fail(e);
+        }
+    }
+
+    @AfterEach
+    public void cleanupResources() throws Exception {
+        service.stop();
+        metaStorageManager.stop();
+    }
+
+    @Test
+    public void testEmptyCatalog() {
         assertNotNull(service.activeSchema(System.currentTimeMillis()));
         assertNotNull(service.schema(0));
 
@@ -60,7 +90,7 @@ public class CatalogServiceSelfTest {
         assertNull(service.index(0, System.currentTimeMillis()));
 
         SchemaDescriptor schema = service.schema(0);
-        assertEquals(CatalogService.PUBLIC, schema.name());
+        assertEquals(CatalogUtils.DEFAULT_SCHEMA, schema.name());
 
         assertEquals(0, schema.version());
         assertEquals(0, schema.tables().length);
@@ -69,9 +99,6 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testCreateTable() {
-        CatalogServiceImpl service = new CatalogServiceImpl(Mockito.mock(MetaStorageManager.class));
-        service.start();
-
         CreateTableParams params = CreateTableParams.builder()
                 .schemaName("PUBLIC")
                 .tableName(TABLE_NAME)
@@ -99,7 +126,7 @@ public class CatalogServiceSelfTest {
 
         assertNotNull(schema);
         assertEquals(0, schema.id());
-        assertEquals(CatalogService.PUBLIC, schema.name());
+        assertEquals(CatalogUtils.DEFAULT_SCHEMA, schema.name());
         assertSame(schema, service.activeSchema(0L));
         assertSame(schema, service.activeSchema(123L));
 
@@ -112,7 +139,7 @@ public class CatalogServiceSelfTest {
 
         assertNotNull(schema);
         assertEquals(0, schema.id());
-        assertEquals(CatalogService.PUBLIC, schema.name());
+        assertEquals(CatalogUtils.DEFAULT_SCHEMA, schema.name());
         assertSame(schema, service.activeSchema(System.currentTimeMillis()));
 
         assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, System.currentTimeMillis()));
@@ -129,9 +156,6 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testCreateTableIfExistsFlag() {
-        CatalogServiceImpl service = new CatalogServiceImpl(Mockito.mock(MetaStorageManager.class));
-        service.start();
-
         CreateTableParams params = CreateTableParams.builder()
                 .tableName("table1")
                 .columns(List.of(