You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/10/03 13:42:31 UTC

[ignite-3] branch main updated: IGNITE-17686 Revisit and fix exception handling in futures chains in sql-engine module (#1118)

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e7df5e28ec IGNITE-17686 Revisit and fix exception handling in futures chains in sql-engine module (#1118)
e7df5e28ec is described below

commit e7df5e28ec228e93af9eba831c1f495071a3a42b
Author: Andrew V. Mashenkov <AM...@users.noreply.github.com>
AuthorDate: Mon Oct 3 16:42:26 2022 +0300

    IGNITE-17686 Revisit and fix exception handling in futures chains in sql-engine module (#1118)
---
 .../internal/sql/engine/SqlQueryProcessor.java     |  1 +
 .../sql/engine/exec/ddl/DdlCommandHandler.java     |  4 +++
 .../sql/engine/schema/SqlSchemaManagerImpl.java    | 30 ++++++++++------------
 3 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 7009e39cb2..d487f57ada 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -473,6 +473,7 @@ public class SqlQueryProcessor implements QueryProcessor {
                     .parameters(params)
                     .build();
 
+            // TODO https://issues.apache.org/jira/browse/IGNITE-17746 Fix query execution flow.
             CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start.thenCompose(none -> prepareSvc.prepareAsync(sqlNode, ctx))
                     .thenApply(plan -> {
                         context.maybeUnwrap(QueryValidator.class)
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index d0a2739d31..ca84b5d3b0 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -308,6 +308,8 @@ public class DdlCommandHandler {
         return tableManager.alterTableAsync(
                 fullName,
                 chng -> chng.changeColumns(cols -> {
+                    ret.set(true); // Reset state if closure have been restarted.
+
                     Map<String, String> colNamesToOrders = columnOrdersToNames(chng.columns());
 
                     List<ColumnDefinition> colsDef0;
@@ -388,6 +390,8 @@ public class DdlCommandHandler {
         return tableManager.alterTableAsync(
                         fullName,
                         chng -> chng.changeColumns(cols -> {
+                            ret.set(true); // Reset state if closure have been restarted.
+
                             PrimaryKeyView priKey = chng.primaryKey();
 
                             Map<String, String> colNamesToOrders = columnOrdersToNames(chng.columns());
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index ff3822adf7..503721920b 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -227,7 +227,7 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
                                     Map<UUID, InternalIgniteTable> resTbls = new HashMap<>(tables);
 
                                     return igniteTableFuture
-                                            .thenApply(igniteTable -> inBusyLock(busyLock, () -> {
+                                            .thenApply(igniteTable -> {
                                                 InternalIgniteTable oldTable = resTbls.put(igniteTable.id(), igniteTable);
 
                                                 // looks like this is UPDATE operation
@@ -238,17 +238,13 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
                                                 }
 
                                                 return resTbls;
-                                            }));
+                                            });
                                 }))
-                        .thenCombine(
-                                igniteTableFuture,
-                                (v, igniteTable) -> inBusyLock(busyLock, () -> {
-                                            schema.addTable(objectSimpleName(schemaName, table.name()), igniteTable);
-
-                                            return null;
-                                        }
-                                )).thenCompose(v -> inBusyLock(busyLock, () -> completedFuture(res)));
+                        .thenCombine(igniteTableFuture, (v, igniteTable) -> {
+                            schema.addTable(objectSimpleName(schemaName, table.name()), igniteTable);
 
+                            return res;
+                        });
             }));
 
             return calciteSchemaVv.get(causalityToken);
@@ -309,7 +305,7 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
                         resTbls.remove(table.id());
 
                         return completedFuture(resTbls);
-                    })).thenCompose(tables -> inBusyLock(busyLock, () -> completedFuture(res)));
+                    })).thenCompose(tables -> completedFuture(res));
                 }
 
                 return completedFuture(res);
@@ -436,15 +432,17 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
 
                                         return CompletableFuture.completedFuture(resIdxs);
                                     })
-                            ).thenRun(() -> inBusyLock(busyLock, () -> {
+                            ).thenCompose(ignore -> {
                                 String tblName = tableNameById(schema, index.tableId());
 
                                 table.addIndex(schemaIndex);
                                 schema.addTable(tblName, table);
                                 schema.addIndex(index.id(), schemaIndex);
-                            })).thenCompose(ignored -> inBusyLock(busyLock, () -> completedFuture(resTbls)));
+
+                                return completedFuture(resTbls);
+                            });
                         })
-                ).thenCompose(v -> inBusyLock(busyLock, () -> completedFuture(res)));
+                ).thenCompose(v -> completedFuture(res));
             }));
 
             return calciteSchemaVv.get(causalityToken);
@@ -518,9 +516,9 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
 
                                             return completedFuture(resIdxs);
                                         }
-                                )).thenCompose(v -> inBusyLock(busyLock, () -> completedFuture(resTbls)));
+                                )).thenCompose(v -> completedFuture(resTbls));
                             })
-                    ).thenCompose(v -> inBusyLock(busyLock, () -> completedFuture(res)));
+                    ).thenCompose(v -> completedFuture(res));
                 }
 
                 return completedFuture(res);