You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pp...@apache.org on 2023/10/19 10:54:50 UTC
[ignite-3] branch main updated: IGNITE-20454 Sql. Added a callback that is notified when data prefetching is complete (#2674)
This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 87792091f1 IGNITE-20454 Sql. Added a callback that is notified when data prefetching is complete (#2674)
87792091f1 is described below
commit 87792091f1fef453c191dd93e5ea24a47f82dfea
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Thu Oct 19 13:54:44 2023 +0300
IGNITE-20454 Sql. Added a callback that is notified when data prefetching is complete (#2674)
---
.../internal/sql/engine/QueryPrefetchCallback.java | 46 ++++++++
.../sql/engine/exec/ExecutionServiceImpl.java | 24 +++-
.../sql/engine/exec/rel/AsyncRootNode.java | 28 ++++-
.../internal/sql/engine/util/BaseQueryContext.java | 20 +++-
.../sql/engine/exec/ExecutionServiceImplTest.java | 123 ++++++++++++++++++++-
5 files changed, 230 insertions(+), 11 deletions(-)
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryPrefetchCallback.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryPrefetchCallback.java
new file mode 100644
index 0000000000..7ef6a7604c
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryPrefetchCallback.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
+import org.apache.ignite.internal.sql.engine.exec.rel.AsyncRootNode;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlCommand;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Callback that is invoked when the query finishes prefetching. It is designed
+ * to allow sequential execution of SQL statements that are dependent on each other.
+ *
+ * <ol>
+ * <li>For {@code DML} queries, it is called after the cursor has finished prefetching
+ * the initial batch of rows (see {@link AsyncRootNode#startPrefetch}).</li>
+ * <li>For {@code DDL} queries, it is called after the corresponding DDL
+ * command has completed (see {@link DdlCommandHandler#handle(DdlCommand)}.</li>
+ * </ol>
+ *
+ * <p>This callback is invoked asynchronously in the "{@code execution pool}".
+ */
+@FunctionalInterface
+public interface QueryPrefetchCallback {
+ /**
+ * Called when the query finishes prefetching.
+ *
+ * @param ex Exceptional completion cause, or {@code null} if prefetch completed successfully.
+ */
+ void onPrefetchComplete(@Nullable Throwable ex);
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index aa4c88cb73..ca0be42a80 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.sql.engine.NodeLeftException;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
+import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
@@ -268,9 +269,9 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
case QUERY:
return executeQuery(tx, ctx, (MultiStepPlan) plan);
case EXPLAIN:
- return executeExplain((ExplainPlan) plan);
+ return executeExplain((ExplainPlan) plan, ctx.prefetchCallback());
case DDL:
- return executeDdl((DdlPlan) plan);
+ return executeDdl((DdlPlan) plan, ctx.prefetchCallback());
default:
throw new AssertionError("Unexpected query type: " + plan);
@@ -288,13 +289,17 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
return mgr.close(true);
}
- private AsyncCursor<List<Object>> executeDdl(DdlPlan plan) {
+ private AsyncCursor<List<Object>> executeDdl(DdlPlan plan, @Nullable QueryPrefetchCallback callback) {
CompletableFuture<Iterator<List<Object>>> ret = ddlCmdHnd.handle(plan.command())
.thenApply(applied -> List.of(List.<Object>of(applied)).iterator())
.exceptionally(th -> {
throw convertDdlException(th);
});
+ if (callback != null) {
+ ret.whenCompleteAsync((res, err) -> callback.onPrefetchComplete(err), taskExecutor);
+ }
+
return new AsyncWrapper<>(ret, Runnable::run);
}
@@ -315,9 +320,13 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
return (e instanceof RuntimeException) ? (RuntimeException) e : new IgniteInternalException(INTERNAL_ERR, e);
}
- private AsyncCursor<List<Object>> executeExplain(ExplainPlan plan) {
+ private AsyncCursor<List<Object>> executeExplain(ExplainPlan plan, @Nullable QueryPrefetchCallback callback) {
List<List<Object>> res = List.of(List.of(plan.plan()));
+ if (callback != null) {
+ taskExecutor.execute(() -> callback.onPrefetchComplete(null));
+ }
+
return new AsyncWrapper<>(res.iterator());
}
@@ -555,7 +564,12 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
});
node.onRegister(rootNode);
- rootNode.prefetch();
+ CompletableFuture<Void> prefetchFut = rootNode.startPrefetch();
+ QueryPrefetchCallback callback = ctx.prefetchCallback();
+
+ if (callback != null) {
+ prefetchFut.whenCompleteAsync((res, err) -> callback.onPrefetchComplete(err), taskExecutor);
+ }
root.complete(rootNode);
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
index d21c541c79..b1350a8227 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
@@ -31,6 +31,7 @@ import java.util.function.Function;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.sql.CursorClosedException;
+import org.jetbrains.annotations.Nullable;
/**
* An async iterator over the execution tree.
@@ -56,6 +57,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
private final Queue<PendingRequest<OutRowT>> pendingRequests = new ConcurrentLinkedQueue<>();
+ private final CompletableFuture<Void> prefetchFut = new CompletableFuture<>();
+
private volatile boolean closed = false;
/**
@@ -169,6 +172,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
}
}, source::onError);
+ completePrefetchFuture(th);
+
closed = true;
}
}
@@ -181,8 +186,10 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
* Starts the execution of the fragment and keeps the result in the intermediate buffer.
*
* <p>Note: this method must be called by the same thread that will execute the whole fragment.
+ *
+ * @return Future representing pending completion of the prefetch operation.
*/
- public void prefetch() {
+ public CompletableFuture<Void> startPrefetch() {
assert source.context().description().prefetch();
if (waiting == 0) {
@@ -192,9 +199,13 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
onError(ex);
}
}
+
+ return prefetchFut;
}
private void flush() throws Exception {
+ completePrefetchFuture(null);
+
// flush may be triggered by prefetching, so let's do nothing in this case
if (pendingRequests.isEmpty()) {
return;
@@ -234,6 +245,21 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
}
}
+ /**
+ * Completes prefetch future if it has not already been completed.
+ *
+ * @param ex Exceptional completion cause or {@code null} if the future must complete successfully.
+ */
+ private void completePrefetchFuture(@Nullable Throwable ex) {
+ if (!prefetchFut.isDone()) {
+ if (ex != null) {
+ prefetchFut.completeExceptionally(ex);
+ } else {
+ prefetchFut.complete(null);
+ }
+ }
+ }
+
private static class PendingRequest<OutRowT> {
/**
* A future to complete when {@link #buff buffer} will be filled.
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
index b6f46a7e3b..7c2d517c85 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
@@ -50,6 +50,7 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.QueryCancel;
+import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
import org.apache.ignite.internal.sql.engine.rex.IgniteRexBuilder;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
@@ -152,6 +153,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
private final Object[] parameters;
+ private final QueryPrefetchCallback prefetchCallback;
+
private CalciteCatalogReader catalogReader;
/**
@@ -162,7 +165,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
FrameworkConfig cfg,
QueryCancel cancel,
Object[] parameters,
- IgniteLogger log
+ IgniteLogger log,
+ QueryPrefetchCallback prefetchCallback
) {
super(Contexts.chain(cfg.getContext()));
@@ -173,6 +177,7 @@ public final class BaseQueryContext extends AbstractQueryContext {
this.log = log;
this.cancel = cancel;
this.parameters = parameters;
+ this.prefetchCallback = prefetchCallback;
typeFactory = TYPE_FACTORY;
@@ -225,6 +230,10 @@ public final class BaseQueryContext extends AbstractQueryContext {
return Objects.requireNonNull(schema().unwrap(IgniteSchema.class)).version();
}
+ public QueryPrefetchCallback prefetchCallback() {
+ return prefetchCallback;
+ }
+
/**
* Returns calcite catalog reader.
*/
@@ -285,6 +294,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
private Object[] parameters = ArrayUtils.OBJECT_EMPTY_ARRAY;
+ private QueryPrefetchCallback prefetchCallback;
+
public Builder frameworkConfig(FrameworkConfig frameworkCfg) {
this.frameworkCfg = Objects.requireNonNull(frameworkCfg);
return this;
@@ -305,13 +316,18 @@ public final class BaseQueryContext extends AbstractQueryContext {
return this;
}
+ public Builder prefetchCallback(QueryPrefetchCallback prefetchCallback) {
+ this.prefetchCallback = prefetchCallback;
+ return this;
+ }
+
public Builder parameters(Object... parameters) {
this.parameters = Objects.requireNonNull(parameters);
return this;
}
public BaseQueryContext build() {
- return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log);
+ return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log, prefetchCallback);
}
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 5ade4b6ffd..fb1524a9b3 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -22,12 +22,15 @@ import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_LEFT_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -42,13 +45,14 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
-import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
@@ -57,7 +61,10 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.Frameworks;
@@ -68,6 +75,7 @@ import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.sql.engine.NodeLeftException;
import org.apache.ignite.internal.sql.engine.QueryCancel;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
+import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
@@ -579,6 +587,115 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest {
assertThat(stopFuture, willSucceedIn(10, TimeUnit.SECONDS));
}
+ /**
+ * Tests the ability to run multiple statements using {@link QueryPrefetchCallback}. Each subsequent
+ * statement begins execution after the prefetching for the previous statement is completed.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPrefetchCallbackInvocation() throws Exception {
+ String query = "SELECT * FROM test_tbl";
+ int totalStatements = 20;
+ Collection<AsyncCursor<List<Object>>> resultCursors = new ArrayBlockingQueue<>(totalStatements);
+ List<String> queries = IntStream.range(0, totalStatements).boxed().map(n -> query).collect(Collectors.toList());
+ ArrayBlockingQueue<String> queriesQueue = new ArrayBlockingQueue<>(totalStatements, false, queries);
+ AtomicReference<AssertionError> errHolder = new AtomicReference<>();
+ ExecutionService execService = executionServices.get(0);
+
+ Function<QueryPrefetchCallback, BaseQueryContext> createCtx = (callback) -> BaseQueryContext.builder()
+ .cancel(new QueryCancel())
+ .prefetchCallback(callback)
+ .frameworkConfig(
+ Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(wrap(schema))
+ .build()
+ )
+ .logger(log)
+ .build();
+
+ QueryPrefetchCallback prefetchListener = new QueryPrefetchCallback() {
+ @Override
+ public void onPrefetchComplete(@Nullable Throwable err) {
+ try {
+ assertThat(err, nullValue());
+
+ String sql = queriesQueue.poll();
+
+ assertThat(sql, notNullValue());
+
+ BaseQueryContext ctx = createCtx.apply(queriesQueue.isEmpty() ? null : this);
+ InternalTransaction tx = new NoOpTransaction(nodeNames.get(0));
+ QueryPlan plan = prepare(sql, ctx);
+
+ resultCursors.add(
+ execService.executePlan(tx, plan, ctx)
+ );
+ } catch (AssertionError e) {
+ errHolder.set(e);
+ } catch (Throwable t) {
+ errHolder.set(new AssertionError(t));
+ }
+ }
+ };
+
+ // Start statements execution.
+ prefetchListener.onPrefetchComplete(null);
+
+ waitForCondition(() -> resultCursors.size() == queries.size(), TIMEOUT_IN_MS);
+
+ if (errHolder.get() != null) {
+ throw errHolder.get();
+ }
+
+ assertEquals(queries.size(), resultCursors.size());
+
+ resultCursors.forEach(AsyncCursor::closeAsync);
+ }
+
+ /**
+ * Test ensures that an exception during data prefetching is propagated to the callback.
+ */
+ @Test
+ public void testErrorIsPropagatedToPrefetchCallback() {
+ ExecutionService execService = executionServices.get(0);
+ CompletableFuture<Void> prefetchFut = new CompletableFuture<>();
+ IgniteInternalException expectedException = new IgniteInternalException(Common.INTERNAL_ERR, "Expected exception");
+
+ BaseQueryContext ctx = BaseQueryContext.builder()
+ .cancel(new QueryCancel())
+ .prefetchCallback(prefetchFut::completeExceptionally)
+ .frameworkConfig(
+ Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(wrap(schema))
+ .build()
+ )
+ .logger(log)
+ .build();
+
+ testCluster.node(nodeNames.get(2)).interceptor((nodeName, msg, original) -> {
+ if (msg instanceof QueryStartRequest) {
+ testCluster.node(nodeNames.get(2)).messageService().send(nodeName, new SqlQueryMessagesFactory().queryStartResponse()
+ .queryId(((QueryStartRequest) msg).queryId())
+ .fragmentId(((QueryStartRequest) msg).fragmentId())
+ .error(expectedException)
+ .build()
+ );
+ } else {
+ original.onMessage(nodeName, msg);
+ }
+
+ return CompletableFuture.completedFuture(null);
+ });
+
+ QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
+ AsyncCursor<List<Object>> cursor = execService.executePlan(new NoOpTransaction(nodeNames.get(0)), plan, ctx);
+
+ assertThat(prefetchFut, willThrow(equalTo(expectedException)));
+
+ cursor.closeAsync();
+ }
+
/** Creates an execution service instance for the node with given consistent id. */
public ExecutionServiceImpl<Object[]> create(String nodeName) {
if (!nodeNames.contains(nodeName)) {
@@ -918,8 +1035,8 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest {
private static class CapturingMailboxRegistry implements MailboxRegistry {
private final MailboxRegistry delegate;
- private final Set<Inbox<?>> inboxes = Collections.newSetFromMap(new IdentityHashMap<>());
- private final Set<Outbox<?>> outboxes = Collections.newSetFromMap(new IdentityHashMap<>());
+ private final Set<Inbox<?>> inboxes = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final Set<Outbox<?>> outboxes = Collections.newSetFromMap(new ConcurrentHashMap<>());
CapturingMailboxRegistry(MailboxRegistry delegate) {
this.delegate = delegate;