You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/08/26 14:03:17 UTC

[ignite-3] 02/02: Fix DdlCommandHandler, TableManager, IndexManager async methods usage.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-17431
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit b543cd9c7a7138c063005451460a3b02d1dacf7c
Author: amashenkov <an...@gmail.com>
AuthorDate: Fri Aug 26 17:02:17 2022 +0300

    Fix DdlCommandHandler, TableManager, IndexManager async methods usage.
---
 .../apache/ignite/internal/index/IndexManager.java | 38 ----------
 .../sql/engine/exec/ExecutionServiceImpl.java      | 21 ++++--
 .../sql/engine/exec/ddl/DdlCommandHandler.java     | 88 +++++++++++-----------
 3 files changed, 58 insertions(+), 89 deletions(-)

diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 322ca04573..8bd34d4f82 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -122,27 +122,6 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
         LOG.info("Index manager stopped");
     }
 
-    /**
-     * Creates index from provided configuration changer.
-     *
-     * @param schemaName A name of the schema to create index in.
-     * @param indexName A name of the index to create.
-     * @param tableName A name of the table to create index for.
-     * @param indexChange A consumer that suppose to change the configuration in order to provide description of an index.
-     * @param failIfExists Flag indicates whether exception be thrown if index exists or not.
-     * @return {@code True} if index was created successfully, {@code false} otherwise.
-     * @throws IndexAlreadyExistsException If index already exists and
-     */
-    public boolean createIndex(
-            String schemaName,
-            String indexName,
-            String tableName,
-            boolean failIfExists,
-            Consumer<TableIndexChange> indexChange
-    ) {
-        return join(createIndexAsync(schemaName, indexName, tableName, failIfExists, indexChange)) != null;
-    }
-
     /**
      * Creates index from provided configuration changer.
      *
@@ -251,23 +230,6 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
         }
     }
 
-    /**
-     * Drops the index with a given name.
-     *
-     * @param schemaName A name of the schema the index belong to.
-     * @param indexName A name of the index to drop.
-     * @param failIfNotExist Flag, which force failure, when {@code trues} if index doen't not exists.
-     * @return {@code True} if index was removed, {@code false} otherwise.
-     * @throws IndexNotFoundException If index doesn't exist and {@code failIfNotExist} param was {@code true}.
-     */
-    public boolean dropIndex(
-            String schemaName,
-            String indexName,
-            boolean failIfNotExist
-    ) {
-        return join(dropIndexAsync(schemaName, indexName, failIfNotExist));
-    }
-
     /**
      * Drops the index with a given name asynchronously.
      *
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index c3109c1862..c5c6c9939e 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -22,8 +22,8 @@ import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFI
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -32,6 +32,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
@@ -249,14 +250,18 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
     }
 
     private AsyncCursor<List<Object>> executeDdl(DdlPlan plan) {
-        try {
-            boolean ret = ddlCmdHnd.handle(plan.command());
+        CompletableFuture<Iterator<List<Object>>> res = ddlCmdHnd.handle(plan.command())
+                .handle((v, err) -> {
+                    if (err == null) {
+                        return List.of(List.of((Object) v)).iterator();
+                    }
 
-            return new AsyncWrapper<>(Collections.singletonList(Collections.<Object>singletonList(ret)).iterator());
-        } catch (IgniteInternalCheckedException e) {
-            throw new IgniteInternalException("Failed to execute DDL statement [stmt=" /*+ qry.sql()*/
-                    + ", err=" + e.getMessage() + ']', e);
-        }
+                    throw new IgniteInternalException(
+                            "Failed to execute DDL statement [stmt=" /*+ qry.sql()*/
+                                    + ", err=" + err.getMessage() + ']', err);
+                });
+
+        return new AsyncWrapper<>(res, ForkJoinPool.commonPool());
     }
 
     private AsyncCursor<List<Object>> executeExplain(ExplainPlan plan) {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index 1f8cbd52bd..b671771142 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.exec.ddl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
 import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convertDefaultToConfiguration;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
@@ -25,7 +27,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -89,7 +93,7 @@ public class DdlCommandHandler {
     }
 
     /** Handles ddl commands. */
-    public boolean handle(DdlCommand cmd) throws IgniteInternalCheckedException {
+    public CompletableFuture<Boolean> handle(DdlCommand cmd) {
         validateCommand(cmd);
 
         if (cmd instanceof CreateTableCommand) {
@@ -105,9 +109,9 @@ public class DdlCommandHandler {
         } else if (cmd instanceof DropIndexCommand) {
             return handleDropIndex((DropIndexCommand) cmd);
         } else {
-            throw new IgniteInternalCheckedException("Unsupported DDL operation ["
+            return failedFuture(new IgniteInternalCheckedException("Unsupported DDL operation ["
                     + "cmdName=" + (cmd == null ? null : cmd.getClass().getSimpleName()) + "; "
-                    + "cmd=\"" + cmd + "\"]");
+                    + "cmd=\"" + cmd + "\"]"));
         }
     }
 
@@ -123,7 +127,7 @@ public class DdlCommandHandler {
     }
 
     /** Handles create table command. */
-    private boolean handleCreateTable(CreateTableCommand cmd) {
+    private CompletableFuture<Boolean> handleCreateTable(CreateTableCommand cmd) {
         Consumer<TableChange> tblChanger = tableChange -> {
             tableChange.changeColumns(columnsChange -> {
                 for (var col : cmd.columns()) {
@@ -159,41 +163,39 @@ public class DdlCommandHandler {
         );
 
         try {
-            tableManager.createTable(fullName, tblChanger);
-
-            return true;
+            return tableManager.createTableAsync(fullName, tblChanger)
+                    .thenApply(t -> Boolean.TRUE);
         } catch (TableAlreadyExistsException ex) {
             if (!cmd.ifTableExists()) {
-                throw ex;
+                return CompletableFuture.failedFuture(ex);
             } else {
-                return false;
+                return completedFuture(Boolean.FALSE);
             }
         }
     }
 
     /** Handles drop table command. */
-    private boolean handleDropTable(DropTableCommand cmd) {
+    private CompletableFuture<Boolean> handleDropTable(DropTableCommand cmd) {
         String fullName = TableDefinitionImpl.canonicalName(
                 IgniteObjectName.quote(cmd.schemaName()),
                 IgniteObjectName.quote(cmd.tableName())
         );
         try {
-            tableManager.dropTable(fullName);
-
-            return true;
+            return tableManager.dropTableAsync(fullName)
+                    .thenApply(v -> Boolean.TRUE);
         } catch (TableNotFoundException ex) {
             if (!cmd.ifTableExists()) {
-                throw ex;
+                return CompletableFuture.failedFuture(ex);
             } else {
-                return false;
+                return completedFuture(Boolean.FALSE);
             }
         }
     }
 
     /** Handles add column command. */
-    private boolean handleAlterAddColumn(AlterTableAddCommand cmd) {
+    private CompletableFuture<Boolean> handleAlterAddColumn(AlterTableAddCommand cmd) {
         if (nullOrEmpty(cmd.columns())) {
-            return false;
+            return completedFuture(Boolean.FALSE);
         }
 
         String fullName = TableDefinitionImpl.canonicalName(
@@ -202,20 +204,20 @@ public class DdlCommandHandler {
         );
 
         try {
-            return addColumnInternal(fullName, cmd.columns(), cmd.ifColumnNotExists());
+            return addColumnInternal(fullName, cmd.columns(), !cmd.ifColumnNotExists());
         } catch (TableNotFoundException ex) {
             if (!cmd.ifTableExists()) {
-                throw ex;
+                return CompletableFuture.failedFuture(ex);
             } else {
-                return false;
+                return completedFuture(Boolean.FALSE);
             }
         }
     }
 
     /** Handles drop column command. */
-    private boolean handleAlterDropColumn(AlterTableDropCommand cmd) {
+    private CompletableFuture<Boolean> handleAlterDropColumn(AlterTableDropCommand cmd) {
         if (nullOrEmpty(cmd.columns())) {
-            return false;
+            return completedFuture(Boolean.FALSE);
         }
 
         String fullName = TableDefinitionImpl.canonicalName(
@@ -227,15 +229,15 @@ public class DdlCommandHandler {
             return dropColumnInternal(fullName, cmd.columns(), cmd.ifColumnExists());
         } catch (TableNotFoundException ex) {
             if (!cmd.ifTableExists()) {
-                throw ex;
+                return CompletableFuture.failedFuture(ex);
             } else {
-                return false;
+                return completedFuture(Boolean.FALSE);
             }
         }
     }
 
     /** Handles create index command. */
-    private boolean handleCreateIndex(CreateIndexCommand cmd) {
+    private CompletableFuture<Boolean> handleCreateIndex(CreateIndexCommand cmd) {
         // Only sorted idx for now.
         //TODO: https://issues.apache.org/jira/browse/IGNITE-17563 Pass null ordering for columns.
         SortedIndexDefinitionBuilder idx = SchemaBuilders.sortedIndex(cmd.indexName());
@@ -250,17 +252,18 @@ public class DdlCommandHandler {
             idx0.done();
         }
 
-        return indexManager.createIndex(
+        return indexManager.createIndexAsync(
                 cmd.schemaName(),
                 cmd.indexName(),
                 cmd.tableName(),
                 !cmd.ifIndexNotExists(),
-                tableIndexChange -> convert(idx.build(), tableIndexChange));
+                tableIndexChange -> convert(idx.build(), tableIndexChange))
+                .thenApply(Objects::nonNull);
     }
 
     /** Handles drop index command. */
-    private boolean handleDropIndex(DropIndexCommand cmd) {
-        return indexManager.dropIndex(cmd.schemaName(), cmd.indexName(), !cmd.ifNotExists());
+    private CompletableFuture<Boolean> handleDropIndex(DropIndexCommand cmd) {
+        return indexManager.dropIndexAsync(cmd.schemaName(), cmd.indexName(), !cmd.ifNotExists());
     }
 
     /**
@@ -268,20 +271,21 @@ public class DdlCommandHandler {
      *
      * @param fullName Table with schema name.
      * @param colsDef  Columns defenitions.
-     * @param colNotExist Flag indicates exceptionally behavior in case of already existing column.
+     * @param failIfExists Flag indicating exceptional behavior in case of an already existing column.
      *
      * @return {@code true} if the full columns set is applied successfully. Otherwise, returns {@code false}.
      */
-    private boolean addColumnInternal(String fullName, List<ColumnDefinition> colsDef, boolean colNotExist) {
+    private CompletableFuture<Boolean> addColumnInternal(String fullName, List<ColumnDefinition> colsDef, boolean failIfExists) {
         AtomicBoolean ret = new AtomicBoolean(true);
-        tableManager.alterTable(
+
+        return tableManager.alterTableAsync(
                 fullName,
                 chng -> chng.changeColumns(cols -> {
                     Map<String, String> colNamesToOrders = columnOrdersToNames(chng.columns());
 
                     List<ColumnDefinition> colsDef0;
 
-                    if (!colNotExist) {
+                    if (failIfExists) {
                         colsDef.stream()
                                 .filter(k -> colNamesToOrders.containsKey(k.name()))
                                 .findAny()
@@ -305,9 +309,8 @@ public class DdlCommandHandler {
                     for (ColumnDefinition col : colsDef0) {
                         cols.create(col.name(), colChg -> convertColumnDefinition(col, colChg));
                     }
-                }));
-
-        return ret.get();
+                }))
+                .thenApply(v -> ret.get());
     }
 
     private void convertColumnDefinition(ColumnDefinition definition, ColumnChange columnChange) {
@@ -349,13 +352,13 @@ public class DdlCommandHandler {
      *
      * @param fullName Table with schema name.
      * @param colNames Columns definitions.
-     * @param colExist Flag indicates exceptionally behavior in case of already existing column.
+     * @param failIfNotExists Flag indicating exceptional behavior in case of a non-existing column.
      * @return {@code true} if the full columns set is applied successfully. Otherwise, returns {@code false}.
      */
-    private boolean dropColumnInternal(String fullName, Set<String> colNames, boolean colExist) {
+    private CompletableFuture<Boolean> dropColumnInternal(String fullName, Set<String> colNames, boolean failIfNotExists) {
         AtomicBoolean ret = new AtomicBoolean(true);
 
-        tableManager.alterTable(
+        return tableManager.alterTableAsync(
                 fullName,
                 chng -> chng.changeColumns(cols -> {
                     PrimaryKeyView priKey = chng.primaryKey();
@@ -370,7 +373,7 @@ public class DdlCommandHandler {
                         if (!colNamesToOrders.containsKey(colName)) {
                             ret.set(false);
 
-                            if (!colExist) {
+                            if (!failIfNotExists) {
                                 throw new ColumnNotFoundException(colName, fullName);
                             }
                         } else {
@@ -384,9 +387,8 @@ public class DdlCommandHandler {
                     }
 
                     colNames0.forEach(k -> cols.delete(colNamesToOrders.get(k)));
-                }));
-
-        return ret.get();
+                }))
+                .thenApply(v -> ret.get());
     }
 
     /** Map column name to order. */