You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pp...@apache.org on 2023/10/19 10:54:50 UTC

[ignite-3] branch main updated: IGNITE-20454 Sql. Added a callback that is notified when data prefetching is complete (#2674)

This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 87792091f1 IGNITE-20454 Sql. Added a callback that is notified when data prefetching is complete (#2674)
87792091f1 is described below

commit 87792091f1fef453c191dd93e5ea24a47f82dfea
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Thu Oct 19 13:54:44 2023 +0300

    IGNITE-20454 Sql. Added a callback that is notified when data prefetching is complete (#2674)
---
 .../internal/sql/engine/QueryPrefetchCallback.java |  46 ++++++++
 .../sql/engine/exec/ExecutionServiceImpl.java      |  24 +++-
 .../sql/engine/exec/rel/AsyncRootNode.java         |  28 ++++-
 .../internal/sql/engine/util/BaseQueryContext.java |  20 +++-
 .../sql/engine/exec/ExecutionServiceImplTest.java  | 123 ++++++++++++++++++++-
 5 files changed, 230 insertions(+), 11 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryPrefetchCallback.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryPrefetchCallback.java
new file mode 100644
index 0000000000..7ef6a7604c
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryPrefetchCallback.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
+import org.apache.ignite.internal.sql.engine.exec.rel.AsyncRootNode;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlCommand;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Callback that is invoked when the query finishes prefetching. It is designed
+ * to allow sequential execution of SQL statements that are dependent on each other.
+ *
+ * <ol>
+ *     <li>For {@code DML} queries, it is called after the cursor has finished prefetching
+ *     the initial batch of rows (see {@link AsyncRootNode#startPrefetch}).</li>
+ *     <li>For {@code DDL} queries, it is called after the corresponding DDL
+ *     command has completed (see {@link DdlCommandHandler#handle(DdlCommand)}.</li>
+ * </ol>
+ *
+ * <p>This callback is invoked asynchronously in the "{@code execution pool}".
+ */
+@FunctionalInterface
+public interface QueryPrefetchCallback {
+    /**
+     * Called when the query finishes prefetching.
+     *
+     * @param ex Exceptional completion cause, or {@code null} if prefetch completed successfully.
+     */
+    void onPrefetchComplete(@Nullable Throwable ex);
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index aa4c88cb73..ca0be42a80 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.sql.engine.NodeLeftException;
 import org.apache.ignite.internal.sql.engine.QueryCancelledException;
+import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
 import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
@@ -268,9 +269,9 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
             case QUERY:
                 return executeQuery(tx, ctx, (MultiStepPlan) plan);
             case EXPLAIN:
-                return executeExplain((ExplainPlan) plan);
+                return executeExplain((ExplainPlan) plan, ctx.prefetchCallback());
             case DDL:
-                return executeDdl((DdlPlan) plan);
+                return executeDdl((DdlPlan) plan, ctx.prefetchCallback());
 
             default:
                 throw new AssertionError("Unexpected query type: " + plan);
@@ -288,13 +289,17 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
         return mgr.close(true);
     }
 
-    private AsyncCursor<List<Object>> executeDdl(DdlPlan plan) {
+    private AsyncCursor<List<Object>> executeDdl(DdlPlan plan, @Nullable QueryPrefetchCallback callback) {
         CompletableFuture<Iterator<List<Object>>> ret = ddlCmdHnd.handle(plan.command())
                 .thenApply(applied -> List.of(List.<Object>of(applied)).iterator())
                 .exceptionally(th -> {
                     throw convertDdlException(th);
                 });
 
+        if (callback != null) {
+            ret.whenCompleteAsync((res, err) -> callback.onPrefetchComplete(err), taskExecutor);
+        }
+
         return new AsyncWrapper<>(ret, Runnable::run);
     }
 
@@ -315,9 +320,13 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
         return (e instanceof RuntimeException) ? (RuntimeException) e : new IgniteInternalException(INTERNAL_ERR, e);
     }
 
-    private AsyncCursor<List<Object>> executeExplain(ExplainPlan plan) {
+    private AsyncCursor<List<Object>> executeExplain(ExplainPlan plan, @Nullable QueryPrefetchCallback callback) {
         List<List<Object>> res = List.of(List.of(plan.plan()));
 
+        if (callback != null) {
+            taskExecutor.execute(() -> callback.onPrefetchComplete(null));
+        }
+
         return new AsyncWrapper<>(res.iterator());
     }
 
@@ -555,7 +564,12 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
                 });
                 node.onRegister(rootNode);
 
-                rootNode.prefetch();
+                CompletableFuture<Void> prefetchFut = rootNode.startPrefetch();
+                QueryPrefetchCallback callback = ctx.prefetchCallback();
+
+                if (callback != null) {
+                    prefetchFut.whenCompleteAsync((res, err) -> callback.onPrefetchComplete(err), taskExecutor);
+                }
 
                 root.complete(rootNode);
             }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
index d21c541c79..b1350a8227 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
@@ -31,6 +31,7 @@ import java.util.function.Function;
 import org.apache.ignite.internal.sql.engine.QueryCancelledException;
 import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.sql.CursorClosedException;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * An async iterator over the execution tree.
@@ -56,6 +57,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
 
     private final Queue<PendingRequest<OutRowT>> pendingRequests = new ConcurrentLinkedQueue<>();
 
+    private final CompletableFuture<Void> prefetchFut = new CompletableFuture<>();
+
     private volatile boolean closed = false;
 
     /**
@@ -169,6 +172,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
                         }
                     }, source::onError);
 
+                    completePrefetchFuture(th);
+
                     closed = true;
                 }
             }
@@ -181,8 +186,10 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
      * Starts the execution of the fragment and keeps the result in the intermediate buffer.
      *
      * <p>Note: this method must be called by the same thread that will execute the whole fragment.
+     *
+     * @return Future representing pending completion of the prefetch operation.
      */
-    public void prefetch() {
+    public CompletableFuture<Void> startPrefetch() {
         assert source.context().description().prefetch();
 
         if (waiting == 0) {
@@ -192,9 +199,13 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
                 onError(ex);
             }
         }
+
+        return prefetchFut;
     }
 
     private void flush() throws Exception {
+        completePrefetchFuture(null);
+
         // flush may be triggered by prefetching, so let's do nothing in this case
         if (pendingRequests.isEmpty()) {
             return;
@@ -234,6 +245,21 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
         }
     }
 
+    /**
+     * Completes prefetch future if it has not already been completed.
+     *
+     * @param ex Exceptional completion cause or {@code null} if the future must complete successfully.
+     */
+    private void completePrefetchFuture(@Nullable Throwable ex) {
+        if (!prefetchFut.isDone()) {
+            if (ex != null) {
+                prefetchFut.completeExceptionally(ex);
+            } else {
+                prefetchFut.complete(null);
+            }
+        }
+    }
+
     private static class PendingRequest<OutRowT> {
         /**
          * A future to complete when {@link #buff buffer} will be filled.
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
index b6f46a7e3b..7c2d517c85 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
@@ -50,6 +50,7 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
+import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
 import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.sql.engine.rex.IgniteRexBuilder;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
@@ -152,6 +153,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
 
     private final Object[] parameters;
 
+    private final QueryPrefetchCallback prefetchCallback;
+
     private CalciteCatalogReader catalogReader;
 
     /**
@@ -162,7 +165,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
             FrameworkConfig cfg,
             QueryCancel cancel,
             Object[] parameters,
-            IgniteLogger log
+            IgniteLogger log,
+            QueryPrefetchCallback prefetchCallback
     ) {
         super(Contexts.chain(cfg.getContext()));
 
@@ -173,6 +177,7 @@ public final class BaseQueryContext extends AbstractQueryContext {
         this.log = log;
         this.cancel = cancel;
         this.parameters = parameters;
+        this.prefetchCallback = prefetchCallback;
 
         typeFactory = TYPE_FACTORY;
 
@@ -225,6 +230,10 @@ public final class BaseQueryContext extends AbstractQueryContext {
         return Objects.requireNonNull(schema().unwrap(IgniteSchema.class)).version();
     }
 
+    public QueryPrefetchCallback prefetchCallback() {
+        return prefetchCallback;
+    }
+
     /**
      * Returns calcite catalog reader.
      */
@@ -285,6 +294,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
 
         private Object[] parameters = ArrayUtils.OBJECT_EMPTY_ARRAY;
 
+        private QueryPrefetchCallback prefetchCallback;
+
         public Builder frameworkConfig(FrameworkConfig frameworkCfg) {
             this.frameworkCfg = Objects.requireNonNull(frameworkCfg);
             return this;
@@ -305,13 +316,18 @@ public final class BaseQueryContext extends AbstractQueryContext {
             return this;
         }
 
+        public Builder prefetchCallback(QueryPrefetchCallback prefetchCallback) {
+            this.prefetchCallback = prefetchCallback;
+            return this;
+        }
+
         public Builder parameters(Object... parameters) {
             this.parameters = Objects.requireNonNull(parameters);
             return this;
         }
 
         public BaseQueryContext build() {
-            return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log);
+            return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log, prefetchCallback);
         }
     }
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 5ade4b6ffd..fb1524a9b3 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -22,12 +22,15 @@ import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_LEFT_ERR;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -42,13 +45,14 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
@@ -57,7 +61,10 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.Frameworks;
@@ -68,6 +75,7 @@ import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.sql.engine.NodeLeftException;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
 import org.apache.ignite.internal.sql.engine.QueryCancelledException;
+import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode;
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
@@ -579,6 +587,115 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest {
         assertThat(stopFuture, willSucceedIn(10, TimeUnit.SECONDS));
     }
 
+    /**
+     * Tests the ability to run multiple statements using {@link QueryPrefetchCallback}. Each subsequent
+     * statement begins execution after the prefetching for the previous statement is completed.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPrefetchCallbackInvocation() throws Exception {
+        String query = "SELECT * FROM test_tbl";
+        int totalStatements = 20;
+        Collection<AsyncCursor<List<Object>>> resultCursors = new ArrayBlockingQueue<>(totalStatements);
+        List<String> queries = IntStream.range(0, totalStatements).boxed().map(n -> query).collect(Collectors.toList());
+        ArrayBlockingQueue<String> queriesQueue = new ArrayBlockingQueue<>(totalStatements, false, queries);
+        AtomicReference<AssertionError> errHolder = new AtomicReference<>();
+        ExecutionService execService = executionServices.get(0);
+
+        Function<QueryPrefetchCallback, BaseQueryContext> createCtx = (callback) -> BaseQueryContext.builder()
+                .cancel(new QueryCancel())
+                .prefetchCallback(callback)
+                .frameworkConfig(
+                        Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+                                .defaultSchema(wrap(schema))
+                                .build()
+                )
+                .logger(log)
+                .build();
+
+        QueryPrefetchCallback prefetchListener = new QueryPrefetchCallback() {
+            @Override
+            public void onPrefetchComplete(@Nullable Throwable err) {
+                try {
+                    assertThat(err, nullValue());
+
+                    String sql = queriesQueue.poll();
+
+                    assertThat(sql, notNullValue());
+
+                    BaseQueryContext ctx = createCtx.apply(queriesQueue.isEmpty() ? null : this);
+                    InternalTransaction tx = new NoOpTransaction(nodeNames.get(0));
+                    QueryPlan plan = prepare(sql, ctx);
+
+                    resultCursors.add(
+                            execService.executePlan(tx, plan, ctx)
+                    );
+                } catch (AssertionError e) {
+                    errHolder.set(e);
+                } catch (Throwable t) {
+                    errHolder.set(new AssertionError(t));
+                }
+            }
+        };
+
+        // Start statements execution.
+        prefetchListener.onPrefetchComplete(null);
+
+        waitForCondition(() -> resultCursors.size() == queries.size(), TIMEOUT_IN_MS);
+
+        if (errHolder.get() != null) {
+            throw errHolder.get();
+        }
+
+        assertEquals(queries.size(), resultCursors.size());
+
+        resultCursors.forEach(AsyncCursor::closeAsync);
+    }
+
+    /**
+     * Test ensures that an exception during data prefetching is propagated to the callback.
+     */
+    @Test
+    public void testErrorIsPropagatedToPrefetchCallback() {
+        ExecutionService execService = executionServices.get(0);
+        CompletableFuture<Void> prefetchFut = new CompletableFuture<>();
+        IgniteInternalException expectedException = new IgniteInternalException(Common.INTERNAL_ERR, "Expected exception");
+
+        BaseQueryContext ctx = BaseQueryContext.builder()
+                .cancel(new QueryCancel())
+                .prefetchCallback(prefetchFut::completeExceptionally)
+                .frameworkConfig(
+                        Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+                                .defaultSchema(wrap(schema))
+                                .build()
+                )
+                .logger(log)
+                .build();
+
+        testCluster.node(nodeNames.get(2)).interceptor((nodeName, msg, original) -> {
+            if (msg instanceof QueryStartRequest) {
+                testCluster.node(nodeNames.get(2)).messageService().send(nodeName, new SqlQueryMessagesFactory().queryStartResponse()
+                        .queryId(((QueryStartRequest) msg).queryId())
+                        .fragmentId(((QueryStartRequest) msg).fragmentId())
+                        .error(expectedException)
+                        .build()
+                );
+            } else {
+                original.onMessage(nodeName, msg);
+            }
+
+            return CompletableFuture.completedFuture(null);
+        });
+
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
+        AsyncCursor<List<Object>> cursor = execService.executePlan(new NoOpTransaction(nodeNames.get(0)), plan, ctx);
+
+        assertThat(prefetchFut, willThrow(equalTo(expectedException)));
+
+        cursor.closeAsync();
+    }
+
     /** Creates an execution service instance for the node with given consistent id. */
     public ExecutionServiceImpl<Object[]> create(String nodeName) {
         if (!nodeNames.contains(nodeName)) {
@@ -918,8 +1035,8 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest {
     private static class CapturingMailboxRegistry implements MailboxRegistry {
         private final MailboxRegistry delegate;
 
-        private final Set<Inbox<?>> inboxes = Collections.newSetFromMap(new IdentityHashMap<>());
-        private final Set<Outbox<?>> outboxes = Collections.newSetFromMap(new IdentityHashMap<>());
+        private final Set<Inbox<?>> inboxes = Collections.newSetFromMap(new ConcurrentHashMap<>());
+        private final Set<Outbox<?>> outboxes = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
         CapturingMailboxRegistry(MailboxRegistry delegate) {
             this.delegate = delegate;