You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/03/22 10:00:17 UTC
[ignite-3] 02/02: WIP.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-19080
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit a746b771aa5d07d7fb75132cc812a659bf2312b4
Author: amashenkov <an...@gmail.com>
AuthorDate: Wed Mar 22 12:44:00 2023 +0300
WIP.
---
modules/catalog/build.gradle | 1 +
.../ignite/internal/catalog/CatalogService.java | 1 -
.../internal/catalog/CatalogServiceImpl.java | 158 +++++++++++++++------
.../internal/catalog/commands/CatalogUtils.java | 4 +
.../catalog/descriptors/TableDescriptor.java | 15 +-
.../internal/catalog/CatalogServiceSelfTest.java | 50 +++++--
6 files changed, 165 insertions(+), 64 deletions(-)
diff --git a/modules/catalog/build.gradle b/modules/catalog/build.gradle
index 7d9f945e71..01fa3474ce 100644
--- a/modules/catalog/build.gradle
+++ b/modules/catalog/build.gradle
@@ -29,6 +29,7 @@ dependencies {
implementation libs.jetbrains.annotations
testImplementation(testFixtures(project(':ignite-core')))
+ testImplementation(testFixtures(project(':ignite-metastorage')))
testImplementation libs.mockito.junit
testImplementation libs.mockito.core
testImplementation libs.hamcrest.core
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index 507eefde03..9a7ae7c1dd 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
* <p>TBD: events
*/
public interface CatalogService {
- String PUBLIC = "PUBLIC";
//TODO: IGNITE-19082 Drop this stuff when all versioned schema stuff will be moved to Catalog.
@Deprecated(forRemoval = true)
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index 2d9ce8c768..6599fb60a7 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -21,12 +21,15 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import java.util.Collection;
-import java.util.Map.Entry;
+import java.util.Map;
import java.util.NavigableMap;
-import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnParams;
import org.apache.ignite.internal.catalog.commands.AlterTableDropColumnParams;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
@@ -38,10 +41,16 @@ import org.apache.ignite.internal.catalog.descriptors.SchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.Conditions;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.internal.metastorage.dsl.Statements;
import org.apache.ignite.internal.util.ArrayUtils;
+import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.jetbrains.annotations.Nullable;
@@ -51,10 +60,9 @@ import org.jetbrains.annotations.Nullable;
* TODO: IGNITE-19081 Introduce catalog events and make CatalogServiceImpl extends Producer.
*/
public class CatalogServiceImpl implements CatalogService, CatalogManager {
- private static final AtomicInteger TABLE_ID_GEN = new AtomicInteger();
-
/** The logger. */
private static final IgniteLogger LOG = Loggers.forClass(CatalogServiceImpl.class);
+ public static final String CATALOG_VER_PREFIX = "catalog.ver.";
/** Versioned catalog descriptors. */
private final NavigableMap<Integer, CatalogDescriptor> catalogByVer = new ConcurrentSkipListMap<>();
@@ -66,6 +74,10 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
private final WatchListener catalogVersionsListener;
+ private final ExecutorService executorService = ForkJoinPool.commonPool();
+
+ private final ConcurrentMap<Integer, CompletableFuture<Boolean>> futMap = new ConcurrentHashMap<>();
+
/**
* Constructor.
*/
@@ -78,7 +90,7 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
@Override
public void start() {
if (CatalogService.useCatalogService()) {
- metaStorageMgr.registerPrefixWatch(ByteArray.fromString("catalog-"), catalogVersionsListener);
+ metaStorageMgr.registerPrefixWatch(ByteArray.fromString(CATALOG_VER_PREFIX), catalogVersionsListener);
}
//TODO: IGNITE-19080 restore state.
@@ -95,7 +107,7 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
/** {@inheritDoc} */
@Override
public TableDescriptor table(String tableName, long timestamp) {
- return catalogAt(timestamp).schema(CatalogService.PUBLIC).table(tableName);
+ return catalogAt(timestamp).schema(CatalogUtils.DEFAULT_SCHEMA).table(tableName);
}
/** {@inheritDoc} */
@@ -125,13 +137,13 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
return null;
}
- return catalog.schema(CatalogService.PUBLIC);
+ return catalog.schema(CatalogUtils.DEFAULT_SCHEMA);
}
/** {@inheritDoc} */
@Override
public @Nullable SchemaDescriptor activeSchema(long timestamp) {
- return catalogAt(timestamp).schema(CatalogService.PUBLIC);
+ return catalogAt(timestamp).schema(CatalogUtils.DEFAULT_SCHEMA);
}
private CatalogDescriptor catalog(int version) {
@@ -139,7 +151,7 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
}
private CatalogDescriptor catalogAt(long timestamp) {
- Entry<Long, CatalogDescriptor> entry = catalogByTs.floorEntry(timestamp);
+ Map.Entry<Long, CatalogDescriptor> entry = catalogByTs.floorEntry(timestamp);
if (entry == null) {
throw new IllegalStateException("No valid schema found for given timestamp: " + timestamp);
@@ -151,7 +163,7 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
/**
* MetaStorage event listener for catalog metadata updates.
*/
- private static class CatalogEventListener implements WatchListener {
+ private class CatalogEventListener implements WatchListener {
/** {@inheritDoc} */
@Override
public String id() {
@@ -161,6 +173,44 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
+ assert event.single();
+
+ EntryEvent entryEvent = event.entryEvent();
+
+ if (entryEvent.newEntry() != null) {
+ Object obj = ByteUtils.fromBytes(entryEvent.newEntry().value());
+
+ assert obj instanceof TableDescriptor;
+
+ TableDescriptor tableDesc = (TableDescriptor) obj;
+
+ // TODO: Add catalog version to event.
+ int ver = catalogByVer.lastKey() + 1;
+
+ CatalogDescriptor catalog = catalogByVer.get(ver - 1);
+
+ SchemaDescriptor schema = catalog.schema(tableDesc.schemaName());
+
+ CatalogDescriptor newCatalog = new CatalogDescriptor(
+ ver,
+ System.currentTimeMillis(),
+ new SchemaDescriptor(
+ schema.id(),
+ schema.name(),
+ ver,
+ ArrayUtils.concat(schema.tables(), tableDesc),
+ schema.indexes()
+ )
+ );
+
+ registerCatalog(newCatalog);
+
+ CompletableFuture<Boolean> rmv = futMap.remove(ver);
+ if (rmv != null) {
+ rmv.complete(true);
+ }
+ }
+
return completedFuture(null);
}
@@ -173,53 +223,69 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
/** {@inheritDoc} */
@Override
- public CompletableFuture<?> createTable(CreateTableParams params) {
- // Creates TableDescriptor and saves it to MetaStorage.
+ public CompletableFuture<Boolean> createTable(CreateTableParams params) {
+ // Creates a diff from params, then saves it to MetaStorage.
// Atomically:
- // int id = metaStorage.get("lastId") + 1;
- // TableDescriptor table = new TableDescriptor(tableId, params)
+ // int newVer = metaStorage.get("lastVer") + 1;
//
- // Catalog newCatalog = catalogByVer.get(id -1).copy()
- // newCatalog.setId(id).addTable(table);
+ // validate(params);
+ // Object diff = createDiff(catalog, params);
//
- // metaStorage.put("catalog-"+id, newCatalog);
- // metaStorage.put("lastId", id);
+ // metaStorage.put("catalog.ver." + newVer, diff);
+ // metaStorage.put("lastVer", newVer);
- // Dummy implementation.
- synchronized (this) {
- CatalogDescriptor catalog = catalogByVer.lastEntry().getValue();
+ ByteArray LAST_VER_KEY = ByteArray.fromString("catalog.lastVer");
+ ByteArray TABLE_ID_KEY = ByteArray.fromString("catalog.tableId");
- //TODO: IGNITE-19081 Add validation.
- String schemaName = Objects.requireNonNullElse(params.schemaName(), CatalogService.PUBLIC);
+ return metaStorageMgr.getAll(Set.of(LAST_VER_KEY, TABLE_ID_KEY))
+ .thenCompose(entries -> {
+ Entry lastVerEntry = entries.get(LAST_VER_KEY);
+ Entry tableIdEntry = entries.get(TABLE_ID_KEY);
- SchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName), "No schema found: " + schemaName);
+ int lastVer = lastVerEntry.empty() ? 0 : ByteUtils.bytesToInt(lastVerEntry.value());
+ int tableId = tableIdEntry.empty() ? 0 : ByteUtils.bytesToInt(tableIdEntry.value());
- if (schema.table(params.tableName()) != null) {
- return params.ifTableExists()
- ? completedFuture(false)
- : failedFuture(new TableAlreadyExistsException(schemaName, params.tableName()));
- }
+ int newVer = lastVer + 1;
+ int newTableId = tableId + 1;
- int newVersion = catalogByVer.lastKey() + 1;
+ CatalogDescriptor catalog = catalogByVer.get(lastVer);
- TableDescriptor table = CatalogUtils.fromParams(TABLE_ID_GEN.incrementAndGet(), params);
+ assert catalog.table(newTableId) == null;
- CatalogDescriptor newCatalog = new CatalogDescriptor(
- newVersion,
- System.currentTimeMillis(),
- new SchemaDescriptor(
- schema.id(),
- schemaName,
- newVersion,
- ArrayUtils.concat(schema.tables(), table),
- schema.indexes()
- )
- );
+ TableDescriptor tableDesc = CatalogUtils.fromParams(newTableId, params);
- registerCatalog(newCatalog);
- }
+ // params.validate(catalog); ???
+ // validate(catalog, table);
+
+ SchemaDescriptor schema = catalog.schema(tableDesc.schemaName());
+
+ if (schema.table(tableDesc.name()) != null) {
+ return params.ifTableExists()
+ ? completedFuture(false)
+ : failedFuture(new TableAlreadyExistsException(tableDesc.schemaName(), tableDesc.name()));
+ }
+
+ CompletableFuture<Boolean> opFut = new CompletableFuture<>();
+
+ if (futMap.putIfAbsent(newVer, opFut) != null) {
+ return completedFuture(null).thenComposeAsync(ignore -> createTable(params), executorService);
+ }
- return completedFuture(true);
+ return metaStorageMgr.invoke(
+ Statements.iif(
+ Conditions.value(LAST_VER_KEY).eq(lastVerEntry.value()),
+ Operations.ops(
+ Operations.put(LAST_VER_KEY, ByteUtils.intToBytes(newVer)),
+ Operations.put(TABLE_ID_KEY, ByteUtils.intToBytes(newTableId)),
+ Operations.put(ByteArray.fromString(CATALOG_VER_PREFIX + newVer), ByteUtils.toBytes(tableDesc))
+ ).yield(true),
+ Operations.ops().yield(false)
+ )
+ ).thenComposeAsync(
+ res -> res.getAsBoolean() ? opFut.thenApply(ignore -> true) : createTable(params),
+ executorService
+ );
+ });
}
/** {@inheritDoc} */
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
index 1ead944291..eff8c1e1cc 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.catalog.commands;
+import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
@@ -25,6 +26,8 @@ import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
* Catalog utils.
*/
public class CatalogUtils {
+ public static final String DEFAULT_SCHEMA = "PUBLIC";
+
/**
* Converts CreateTable command params to descriptor.
*
@@ -34,6 +37,7 @@ public class CatalogUtils {
*/
public static TableDescriptor fromParams(int id, CreateTableParams params) {
return new TableDescriptor(id,
+ Objects.requireNonNullElse(params.schemaName(), DEFAULT_SCHEMA),
params.tableName(),
params.columns().stream().map(CatalogUtils::fromParams).collect(Collectors.toList()),
params.primaryKeyColumns(),
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java
index a9bfd56168..719cf6a1a0 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java
@@ -38,7 +38,7 @@ public class TableDescriptor extends ObjectDescriptor {
private final int zoneId = 0;
private final int engineId = 0;
-
+ private final String schemaName;
private final TableColumnDescriptor[] columns;
private final String[] primaryKeyColumns;
private final String[] colocationColumns;
@@ -50,20 +50,23 @@ public class TableDescriptor extends ObjectDescriptor {
* Constructor.
*
* @param id Table id.
- * @param name Table name.
+ * @param schemaName Schema name.
+ * @param tableName Table name.
* @param columns Table column descriptors.
* @param pkCols Primary key column names.
* @param colocationCols Colocation column names.
*/
public TableDescriptor(
int id,
- String name,
+ String schemaName,
+ String tableName,
List<TableColumnDescriptor> columns,
List<String> pkCols,
@Nullable List<String> colocationCols
) {
- super(id, Type.TABLE, name);
+ super(id, Type.TABLE, tableName);
+ this.schemaName = Objects.requireNonNull(schemaName);
this.columns = Objects.requireNonNull(columns, "No columns defined.").toArray(TableColumnDescriptor[]::new);
primaryKeyColumns = Objects.requireNonNull(pkCols, "No primary key columns.").toArray(String[]::new);
colocationColumns = colocationCols == null ? primaryKeyColumns : colocationCols.toArray(String[]::new);
@@ -80,6 +83,10 @@ public class TableDescriptor extends ObjectDescriptor {
assert primaryKeyColumns == colocationColumns || Set.of(primaryKeyColumns).containsAll(List.of(colocationColumns));
}
+ public String schemaName(){
+ return schemaName;
+ }
+
public int zoneId() {
return zoneId;
}
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
index 8953e5737c..2c0495e0e3 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
@@ -23,33 +23,63 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.commands.CreateTableParams;
import org.apache.ignite.internal.catalog.commands.DefaultValue;
import org.apache.ignite.internal.catalog.descriptors.SchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.impl.StandaloneMetastorageManager;
+import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher;
+import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
+import org.junit.jupiter.api.extension.ExtendWith;
/**
* Catalog service self test.
*/
+@ExtendWith(SystemPropertiesExtension.class)
+@WithSystemProperty(key = CatalogService.IGNITE_USE_CATALOG_PROPERTY, value = "true")
public class CatalogServiceSelfTest {
private static final String TABLE_NAME = "myTable";
+ private MetaStorageManager metaStorageManager;
+ private CatalogServiceImpl service;
- @Test
- public void testEmptyCatalog() {
- CatalogServiceImpl service = new CatalogServiceImpl(Mockito.mock(MetaStorageManager.class));
+ @BeforeEach
+ public void initCatalogService() {
+ metaStorageManager = StandaloneMetastorageManager.create();
+ service = new CatalogServiceImpl(metaStorageManager);
+
+ metaStorageManager.start();
service.start();
+ try {
+ metaStorageManager.deployWatches();
+ } catch (NodeStoppingException e) {
+ fail(e);
+ }
+ }
+
+ @AfterEach
+ public void cleanupResources() throws Exception {
+ service.stop();
+ metaStorageManager.stop();
+ }
+
+ @Test
+ public void testEmptyCatalog() {
assertNotNull(service.activeSchema(System.currentTimeMillis()));
assertNotNull(service.schema(0));
@@ -60,7 +90,7 @@ public class CatalogServiceSelfTest {
assertNull(service.index(0, System.currentTimeMillis()));
SchemaDescriptor schema = service.schema(0);
- assertEquals(CatalogService.PUBLIC, schema.name());
+ assertEquals(CatalogUtils.DEFAULT_SCHEMA, schema.name());
assertEquals(0, schema.version());
assertEquals(0, schema.tables().length);
@@ -69,9 +99,6 @@ public class CatalogServiceSelfTest {
@Test
public void testCreateTable() {
- CatalogServiceImpl service = new CatalogServiceImpl(Mockito.mock(MetaStorageManager.class));
- service.start();
-
CreateTableParams params = CreateTableParams.builder()
.schemaName("PUBLIC")
.tableName(TABLE_NAME)
@@ -99,7 +126,7 @@ public class CatalogServiceSelfTest {
assertNotNull(schema);
assertEquals(0, schema.id());
- assertEquals(CatalogService.PUBLIC, schema.name());
+ assertEquals(CatalogUtils.DEFAULT_SCHEMA, schema.name());
assertSame(schema, service.activeSchema(0L));
assertSame(schema, service.activeSchema(123L));
@@ -112,7 +139,7 @@ public class CatalogServiceSelfTest {
assertNotNull(schema);
assertEquals(0, schema.id());
- assertEquals(CatalogService.PUBLIC, schema.name());
+ assertEquals(CatalogUtils.DEFAULT_SCHEMA, schema.name());
assertSame(schema, service.activeSchema(System.currentTimeMillis()));
assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, System.currentTimeMillis()));
@@ -129,9 +156,6 @@ public class CatalogServiceSelfTest {
@Test
public void testCreateTableIfExistsFlag() {
- CatalogServiceImpl service = new CatalogServiceImpl(Mockito.mock(MetaStorageManager.class));
- service.start();
-
CreateTableParams params = CreateTableParams.builder()
.tableName("table1")
.columns(List.of(