You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/08/26 14:03:17 UTC
[ignite-3] 02/02: Fix DdlCommandHandler, TableManager, IndexManager async methods usage.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-17431
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit b543cd9c7a7138c063005451460a3b02d1dacf7c
Author: amashenkov <an...@gmail.com>
AuthorDate: Fri Aug 26 17:02:17 2022 +0300
Fix DdlCommandHandler, TableManager, IndexManager async methods usage.
---
.../apache/ignite/internal/index/IndexManager.java | 38 ----------
.../sql/engine/exec/ExecutionServiceImpl.java | 21 ++++--
.../sql/engine/exec/ddl/DdlCommandHandler.java | 88 +++++++++++-----------
3 files changed, 58 insertions(+), 89 deletions(-)
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 322ca04573..8bd34d4f82 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -122,27 +122,6 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
LOG.info("Index manager stopped");
}
- /**
- * Creates index from provided configuration changer.
- *
- * @param schemaName A name of the schema to create index in.
- * @param indexName A name of the index to create.
- * @param tableName A name of the table to create index for.
- * @param indexChange A consumer that suppose to change the configuration in order to provide description of an index.
- * @param failIfExists Flag indicates whether exception be thrown if index exists or not.
- * @return {@code True} if index was created successfully, {@code false} otherwise.
- * @throws IndexAlreadyExistsException If index already exists and
- */
- public boolean createIndex(
- String schemaName,
- String indexName,
- String tableName,
- boolean failIfExists,
- Consumer<TableIndexChange> indexChange
- ) {
- return join(createIndexAsync(schemaName, indexName, tableName, failIfExists, indexChange)) != null;
- }
-
/**
* Creates index from provided configuration changer.
*
@@ -251,23 +230,6 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
}
}
- /**
- * Drops the index with a given name.
- *
- * @param schemaName A name of the schema the index belong to.
- * @param indexName A name of the index to drop.
- * @param failIfNotExist Flag, which force failure, when {@code trues} if index doen't not exists.
- * @return {@code True} if index was removed, {@code false} otherwise.
- * @throws IndexNotFoundException If index doesn't exist and {@code failIfNotExist} param was {@code true}.
- */
- public boolean dropIndex(
- String schemaName,
- String indexName,
- boolean failIfNotExist
- ) {
- return join(dropIndexAsync(schemaName, indexName, failIfNotExist));
- }
-
/**
* Drops the index with a given name asynchronously.
*
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index c3109c1862..c5c6c9939e 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -22,8 +22,8 @@ import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFI
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -32,6 +32,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
@@ -249,14 +250,18 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
}
private AsyncCursor<List<Object>> executeDdl(DdlPlan plan) {
- try {
- boolean ret = ddlCmdHnd.handle(plan.command());
+ CompletableFuture<Iterator<List<Object>>> res = ddlCmdHnd.handle(plan.command())
+ .handle((v, err) -> {
+ if (err == null) {
+ return List.of(List.of((Object) v)).iterator();
+ }
- return new AsyncWrapper<>(Collections.singletonList(Collections.<Object>singletonList(ret)).iterator());
- } catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalException("Failed to execute DDL statement [stmt=" /*+ qry.sql()*/
- + ", err=" + e.getMessage() + ']', e);
- }
+ throw new IgniteInternalException(
+ "Failed to execute DDL statement [stmt=" /*+ qry.sql()*/
+ + ", err=" + err.getMessage() + ']', err);
+ });
+
+ return new AsyncWrapper<>(res, ForkJoinPool.commonPool());
}
private AsyncCursor<List<Object>> executeExplain(ExplainPlan plan) {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index 1f8cbd52bd..b671771142 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.sql.engine.exec.ddl;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convertDefaultToConfiguration;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
@@ -25,7 +27,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -89,7 +93,7 @@ public class DdlCommandHandler {
}
/** Handles ddl commands. */
- public boolean handle(DdlCommand cmd) throws IgniteInternalCheckedException {
+ public CompletableFuture<Boolean> handle(DdlCommand cmd) {
validateCommand(cmd);
if (cmd instanceof CreateTableCommand) {
@@ -105,9 +109,9 @@ public class DdlCommandHandler {
} else if (cmd instanceof DropIndexCommand) {
return handleDropIndex((DropIndexCommand) cmd);
} else {
- throw new IgniteInternalCheckedException("Unsupported DDL operation ["
+ return failedFuture(new IgniteInternalCheckedException("Unsupported DDL operation ["
+ "cmdName=" + (cmd == null ? null : cmd.getClass().getSimpleName()) + "; "
- + "cmd=\"" + cmd + "\"]");
+ + "cmd=\"" + cmd + "\"]"));
}
}
@@ -123,7 +127,7 @@ public class DdlCommandHandler {
}
/** Handles create table command. */
- private boolean handleCreateTable(CreateTableCommand cmd) {
+ private CompletableFuture<Boolean> handleCreateTable(CreateTableCommand cmd) {
Consumer<TableChange> tblChanger = tableChange -> {
tableChange.changeColumns(columnsChange -> {
for (var col : cmd.columns()) {
@@ -159,41 +163,39 @@ public class DdlCommandHandler {
);
try {
- tableManager.createTable(fullName, tblChanger);
-
- return true;
+ return tableManager.createTableAsync(fullName, tblChanger)
+ .thenApply(t -> Boolean.TRUE);
} catch (TableAlreadyExistsException ex) {
if (!cmd.ifTableExists()) {
- throw ex;
+ return CompletableFuture.failedFuture(ex);
} else {
- return false;
+ return completedFuture(Boolean.FALSE);
}
}
}
/** Handles drop table command. */
- private boolean handleDropTable(DropTableCommand cmd) {
+ private CompletableFuture<Boolean> handleDropTable(DropTableCommand cmd) {
String fullName = TableDefinitionImpl.canonicalName(
IgniteObjectName.quote(cmd.schemaName()),
IgniteObjectName.quote(cmd.tableName())
);
try {
- tableManager.dropTable(fullName);
-
- return true;
+ return tableManager.dropTableAsync(fullName)
+ .thenApply(v -> Boolean.TRUE);
} catch (TableNotFoundException ex) {
if (!cmd.ifTableExists()) {
- throw ex;
+ return CompletableFuture.failedFuture(ex);
} else {
- return false;
+ return completedFuture(Boolean.FALSE);
}
}
}
/** Handles add column command. */
- private boolean handleAlterAddColumn(AlterTableAddCommand cmd) {
+ private CompletableFuture<Boolean> handleAlterAddColumn(AlterTableAddCommand cmd) {
if (nullOrEmpty(cmd.columns())) {
- return false;
+ return completedFuture(Boolean.FALSE);
}
String fullName = TableDefinitionImpl.canonicalName(
@@ -202,20 +204,20 @@ public class DdlCommandHandler {
);
try {
- return addColumnInternal(fullName, cmd.columns(), cmd.ifColumnNotExists());
+ return addColumnInternal(fullName, cmd.columns(), !cmd.ifColumnNotExists());
} catch (TableNotFoundException ex) {
if (!cmd.ifTableExists()) {
- throw ex;
+ return CompletableFuture.failedFuture(ex);
} else {
- return false;
+ return completedFuture(Boolean.FALSE);
}
}
}
/** Handles drop column command. */
- private boolean handleAlterDropColumn(AlterTableDropCommand cmd) {
+ private CompletableFuture<Boolean> handleAlterDropColumn(AlterTableDropCommand cmd) {
if (nullOrEmpty(cmd.columns())) {
- return false;
+ return completedFuture(Boolean.FALSE);
}
String fullName = TableDefinitionImpl.canonicalName(
@@ -227,15 +229,15 @@ public class DdlCommandHandler {
return dropColumnInternal(fullName, cmd.columns(), cmd.ifColumnExists());
} catch (TableNotFoundException ex) {
if (!cmd.ifTableExists()) {
- throw ex;
+ return CompletableFuture.failedFuture(ex);
} else {
- return false;
+ return completedFuture(Boolean.FALSE);
}
}
}
/** Handles create index command. */
- private boolean handleCreateIndex(CreateIndexCommand cmd) {
+ private CompletableFuture<Boolean> handleCreateIndex(CreateIndexCommand cmd) {
// Only sorted idx for now.
//TODO: https://issues.apache.org/jira/browse/IGNITE-17563 Pass null ordering for columns.
SortedIndexDefinitionBuilder idx = SchemaBuilders.sortedIndex(cmd.indexName());
@@ -250,17 +252,18 @@ public class DdlCommandHandler {
idx0.done();
}
- return indexManager.createIndex(
+ return indexManager.createIndexAsync(
cmd.schemaName(),
cmd.indexName(),
cmd.tableName(),
!cmd.ifIndexNotExists(),
- tableIndexChange -> convert(idx.build(), tableIndexChange));
+ tableIndexChange -> convert(idx.build(), tableIndexChange))
+ .thenApply(Objects::nonNull);
}
/** Handles drop index command. */
- private boolean handleDropIndex(DropIndexCommand cmd) {
- return indexManager.dropIndex(cmd.schemaName(), cmd.indexName(), !cmd.ifNotExists());
+ private CompletableFuture<Boolean> handleDropIndex(DropIndexCommand cmd) {
+ return indexManager.dropIndexAsync(cmd.schemaName(), cmd.indexName(), !cmd.ifNotExists());
}
/**
@@ -268,20 +271,21 @@ public class DdlCommandHandler {
*
* @param fullName Table with schema name.
* @param colsDef Columns defenitions.
- * @param colNotExist Flag indicates exceptionally behavior in case of already existing column.
+ * @param failIfExists Flag indicating whether to fail exceptionally if a column already exists.
*
* @return {@code true} if the full columns set is applied successfully. Otherwise, returns {@code false}.
*/
- private boolean addColumnInternal(String fullName, List<ColumnDefinition> colsDef, boolean colNotExist) {
+ private CompletableFuture<Boolean> addColumnInternal(String fullName, List<ColumnDefinition> colsDef, boolean failIfExists) {
AtomicBoolean ret = new AtomicBoolean(true);
- tableManager.alterTable(
+
+ return tableManager.alterTableAsync(
fullName,
chng -> chng.changeColumns(cols -> {
Map<String, String> colNamesToOrders = columnOrdersToNames(chng.columns());
List<ColumnDefinition> colsDef0;
- if (!colNotExist) {
+ if (failIfExists) {
colsDef.stream()
.filter(k -> colNamesToOrders.containsKey(k.name()))
.findAny()
@@ -305,9 +309,8 @@ public class DdlCommandHandler {
for (ColumnDefinition col : colsDef0) {
cols.create(col.name(), colChg -> convertColumnDefinition(col, colChg));
}
- }));
-
- return ret.get();
+ }))
+ .thenApply(v -> ret.get());
}
private void convertColumnDefinition(ColumnDefinition definition, ColumnChange columnChange) {
@@ -349,13 +352,13 @@ public class DdlCommandHandler {
*
* @param fullName Table with schema name.
* @param colNames Columns definitions.
- * @param colExist Flag indicates exceptionally behavior in case of already existing column.
+ * @param failIfNotExists Flag governing the missing-column case; NOTE(review): the body throws when this is {@code false} - confirm polarity.
* @return {@code true} if the full columns set is applied successfully. Otherwise, returns {@code false}.
*/
- private boolean dropColumnInternal(String fullName, Set<String> colNames, boolean colExist) {
+ private CompletableFuture<Boolean> dropColumnInternal(String fullName, Set<String> colNames, boolean failIfNotExists) {
AtomicBoolean ret = new AtomicBoolean(true);
- tableManager.alterTable(
+ return tableManager.alterTableAsync(
fullName,
chng -> chng.changeColumns(cols -> {
PrimaryKeyView priKey = chng.primaryKey();
@@ -370,7 +373,7 @@ public class DdlCommandHandler {
if (!colNamesToOrders.containsKey(colName)) {
ret.set(false);
- if (!colExist) {
+ if (!failIfNotExists) {
throw new ColumnNotFoundException(colName, fullName);
}
} else {
@@ -384,9 +387,8 @@ public class DdlCommandHandler {
}
colNames0.forEach(k -> cols.delete(colNamesToOrders.get(k)));
- }));
-
- return ret.get();
+ }))
+ .thenApply(v -> ret.get());
}
/** Map column name to order. */